diff -r 685c9e05a6ab -r fd8bb7527f7b src/Pure/Concurrent/schedule.ML --- a/src/Pure/Concurrent/schedule.ML Tue Dec 16 09:44:59 2008 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,85 +0,0 @@ -(* Title: Pure/Concurrent/schedule.ML - ID: $Id$ - Author: Makarius - -Scheduling -- multiple threads working on a queue of tasks. -*) - -signature SCHEDULE = -sig - datatype 'a task = - Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate; - val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list -end; - -structure Schedule: SCHEDULE = -struct - -datatype 'a task = - Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate; - -fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks => - let - (*synchronized execution*) - val lock = Mutex.mutex (); - fun SYNCHRONIZED e = - let - val _ = Mutex.lock lock; - val res = Exn.capture e (); - val _ = Mutex.unlock lock; - in Exn.release res end; - - (*wakeup condition*) - val wakeup = ConditionVar.conditionVar (); - fun wakeup_all () = ConditionVar.broadcast wakeup; - fun wait () = ConditionVar.wait (wakeup, lock); - fun wait_timeout () = - ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1)); - - (*queue of tasks*) - val queue = ref tasks; - val active = ref 0; - fun trace_active () = Multithreading.tracing 1 (fn () => - "SCHEDULE: " ^ string_of_int (! active) ^ " active"); - fun dequeue () = - (case change_result queue next_task of - Wait => - (dec active; trace_active (); - wait (); - inc active; trace_active (); - dequeue ()) - | next => next); - - (*pool of running threads*) - val status = ref ([]: exn list); - val running = ref ([]: Thread.thread list); - fun start f = (inc active; change running (cons (SimpleThread.fork false f))); - fun stop () = (dec active; change running (remove Thread.equal (Thread.self ()))); - - (*worker thread*) - fun worker () = - (case SYNCHRONIZED dequeue of - Task {body, cont, fail} => - (case Exn.capture (restore_attributes body) () of - Exn.Result () => - (SYNCHRONIZED (fn () => (change queue cont; wakeup_all ())); worker ()) - | Exn.Exn exn => - SYNCHRONIZED (fn () => - (change status (cons exn); change queue fail; stop (); wakeup_all ()))) - | Terminate => SYNCHRONIZED (fn () => (stop (); wakeup_all ()))); - - (*main control: fork and wait*) - fun fork 0 = () - | fork k = (start worker; fork (k - 1)); - val _ = SYNCHRONIZED (fn () => - (fork (Int.max (n, 1)); - while not (null (! running)) do - (trace_active (); - if not (null (! status)) - then (List.app SimpleThread.interrupt (! running)) - else (); - wait_timeout ()))); - - in ! status end); - -end;