--- a/src/Pure/Concurrent/schedule.ML Tue Dec 30 08:18:54 2008 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,85 +0,0 @@
-(* Title: Pure/Concurrent/schedule.ML
- ID: $Id$
- Author: Makarius
-
-Scheduling -- multiple threads working on a queue of tasks.
-*)
-
-signature SCHEDULE =
-sig
- datatype 'a task =
- Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
- val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list
-end;
-
-structure Schedule: SCHEDULE =
-struct
-
-datatype 'a task =
- Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-
-fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
- let
- (*synchronized execution*)
- val lock = Mutex.mutex ();
- fun SYNCHRONIZED e =
- let
- val _ = Mutex.lock lock;
- val res = Exn.capture e ();
- val _ = Mutex.unlock lock;
- in Exn.release res end;
-
- (*wakeup condition*)
- val wakeup = ConditionVar.conditionVar ();
- fun wakeup_all () = ConditionVar.broadcast wakeup;
- fun wait () = ConditionVar.wait (wakeup, lock);
- fun wait_timeout () =
- ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1));
-
- (*queue of tasks*)
- val queue = ref tasks;
- val active = ref 0;
- fun trace_active () = Multithreading.tracing 1 (fn () =>
- "SCHEDULE: " ^ string_of_int (! active) ^ " active");
- fun dequeue () =
- (case change_result queue next_task of
- Wait =>
- (dec active; trace_active ();
- wait ();
- inc active; trace_active ();
- dequeue ())
- | next => next);
-
- (*pool of running threads*)
- val status = ref ([]: exn list);
- val running = ref ([]: Thread.thread list);
- fun start f = (inc active; change running (cons (SimpleThread.fork false f)));
- fun stop () = (dec active; change running (remove Thread.equal (Thread.self ())));
-
- (*worker thread*)
- fun worker () =
- (case SYNCHRONIZED dequeue of
- Task {body, cont, fail} =>
- (case Exn.capture (restore_attributes body) () of
- Exn.Result () =>
- (SYNCHRONIZED (fn () => (change queue cont; wakeup_all ())); worker ())
- | Exn.Exn exn =>
- SYNCHRONIZED (fn () =>
- (change status (cons exn); change queue fail; stop (); wakeup_all ())))
- | Terminate => SYNCHRONIZED (fn () => (stop (); wakeup_all ())));
-
- (*main control: fork and wait*)
- fun fork 0 = ()
- | fork k = (start worker; fork (k - 1));
- val _ = SYNCHRONIZED (fn () =>
- (fork (Int.max (n, 1));
- while not (null (! running)) do
- (trace_active ();
- if not (null (! status))
- then (List.app SimpleThread.interrupt (! running))
- else ();
- wait_timeout ())));
-
- in ! status end);
-
-end;