(* Title: Pure/Concurrent/schedule.ML
ID: $Id$
Author: Makarius
Scheduling -- multiple threads working on a queue of tasks.
*)
signature SCHEDULE =
sig
datatype 'a task =
Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list
end;
structure Schedule: SCHEDULE =
struct
datatype 'a task =
Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
let
(*synchronized execution*)
val lock = Mutex.mutex ();
fun SYNCHRONIZED e =
let
val _ = Mutex.lock lock;
val res = Exn.capture e ();
val _ = Mutex.unlock lock;
in Exn.release res end;
(*wakeup condition*)
val wakeup = ConditionVar.conditionVar ();
fun wakeup_all () = ConditionVar.broadcast wakeup;
fun wait () = ConditionVar.wait (wakeup, lock);
fun wait_timeout () =
ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1));
(*queue of tasks*)
val queue = ref tasks;
val active = ref 0;
fun trace_active () = Multithreading.tracing 1 (fn () =>
"SCHEDULE: " ^ string_of_int (! active) ^ " active");
fun dequeue () =
(case change_result queue next_task of
Wait =>
(dec active; trace_active ();
wait ();
inc active; trace_active ();
dequeue ())
| next => next);
(*pool of running threads*)
val status = ref ([]: exn list);
val running = ref ([]: Thread.thread list);
fun start f = (inc active; change running (cons (SimpleThread.fork false f)));
fun stop () = (dec active; change running (remove Thread.equal (Thread.self ())));
(*worker thread*)
fun worker () =
(case SYNCHRONIZED dequeue of
Task {body, cont, fail} =>
(case Exn.capture (restore_attributes body) () of
Exn.Result () =>
(SYNCHRONIZED (fn () => (change queue cont; wakeup_all ())); worker ())
| Exn.Exn exn =>
SYNCHRONIZED (fn () =>
(change status (cons exn); change queue fail; stop (); wakeup_all ())))
| Terminate => SYNCHRONIZED (fn () => (stop (); wakeup_all ())));
(*main control: fork and wait*)
fun fork 0 = ()
| fork k = (start worker; fork (k - 1));
val _ = SYNCHRONIZED (fn () =>
(fork (Int.max (n, 1));
while not (null (! running)) do
(trace_active ();
if not (null (! status))
then (List.app SimpleThread.interrupt (! running))
else ();
wait_timeout ())));
in ! status end);
end;