src/Pure/Concurrent/schedule.ML
changeset 29252 ea97aa6aeba2
parent 29251 8f84a608883d
parent 29205 7dc7a75033ea
child 29253 3c6cd80a4854
child 29254 ef3e2c3399d7
child 29332 edc1e2a56398
--- a/src/Pure/Concurrent/schedule.ML	Tue Dec 30 08:18:54 2008 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,85 +0,0 @@
-(*  Title:      Pure/Concurrent/schedule.ML
-    ID:         $Id$
-    Author:     Makarius
-
-Scheduling -- multiple threads working on a queue of tasks.
-*)
-
-signature SCHEDULE =
-sig
-  datatype 'a task =
-    Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-  val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list
-end;
-
-structure Schedule: SCHEDULE =
-struct
-
-datatype 'a task =
-  Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-
-fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
-  let
-    (*synchronized execution*)
-    val lock = Mutex.mutex ();
-    fun SYNCHRONIZED e =
-      let
-        val _ = Mutex.lock lock;
-        val res = Exn.capture e ();
-        val _ = Mutex.unlock lock;
-      in Exn.release res end;
-
-    (*wakeup condition*)
-    val wakeup = ConditionVar.conditionVar ();
-    fun wakeup_all () = ConditionVar.broadcast wakeup;
-    fun wait () = ConditionVar.wait (wakeup, lock);
-    fun wait_timeout () =
-      ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1));
-
-    (*queue of tasks*)
-    val queue = ref tasks;
-    val active = ref 0;
-    fun trace_active () = Multithreading.tracing 1 (fn () =>
-      "SCHEDULE: " ^ string_of_int (! active) ^ " active");
-    fun dequeue () =
-      (case change_result queue next_task of
-        Wait =>
-          (dec active; trace_active ();
-            wait ();
-            inc active; trace_active ();
-            dequeue ())
-      | next => next);
-
-    (*pool of running threads*)
-    val status = ref ([]: exn list);
-    val running = ref ([]: Thread.thread list);
-    fun start f = (inc active; change running (cons (SimpleThread.fork false f)));
-    fun stop () = (dec active; change running (remove Thread.equal (Thread.self ())));
-
-    (*worker thread*)
-    fun worker () =
-      (case SYNCHRONIZED dequeue of
-        Task {body, cont, fail} =>
-          (case Exn.capture (restore_attributes body) () of
-            Exn.Result () =>
-              (SYNCHRONIZED (fn () => (change queue cont; wakeup_all ())); worker ())
-          | Exn.Exn exn =>
-              SYNCHRONIZED (fn () =>
-                (change status (cons exn); change queue fail; stop (); wakeup_all ())))
-      | Terminate => SYNCHRONIZED (fn () => (stop (); wakeup_all ())));
-
-    (*main control: fork and wait*)
-    fun fork 0 = ()
-      | fork k = (start worker; fork (k - 1));
-    val _ = SYNCHRONIZED (fn () =>
-     (fork (Int.max (n, 1));
-      while not (null (! running)) do
-      (trace_active ();
-       if not (null (! status))
-       then (List.app SimpleThread.interrupt (! running))
-       else ();
-       wait_timeout ())));
-
-  in ! status end);
-
-end;