moved Multithreading.task/schedule to Concurrent/schedule.ML;
authorwenzelm
Thu, 04 Sep 2008 16:03:47 +0200
changeset 28124 10a1f1f4c6ae
parent 28123 53cd972d7db9
child 28125 e99427bcf7f3
moved Multithreading.task/schedule to Concurrent/schedule.ML;
src/Pure/ML-Systems/multithreading_polyml.ML
--- a/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Sep 04 16:03:46 2008 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Sep 04 16:03:47 2008 +0200
@@ -2,11 +2,9 @@
     ID:         $Id$
     Author:     Makarius
 
-Multithreading in Poly/ML 5.1 (cf. polyml/basis/Thread.sml).
+Multithreading in Poly/ML 5.1, 5.2 (cf. polyml/basis/Thread.sml).
 *)
 
-open Thread;
-
 signature MULTITHREADING_POLYML =
 sig
   val interruptible: ('a -> 'b) -> 'a -> 'b
@@ -50,13 +48,6 @@
 
 (* misc utils *)
 
-fun cons x xs = x :: xs;
-
-fun change r f = r := f (! r);
-
-fun inc i = (i := ! i + 1; ! i);
-fun dec i = (i := ! i - 1; ! i);
-
 fun show "" = "" | show name = " " ^ name;
 fun show' "" = "" | show' name = " [" ^ name ^ "]";
 
@@ -238,93 +229,6 @@
 end;
 
 
-(* scheduling -- multiple threads working on a queue of tasks *)
-
-datatype 'a task =
-  Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-
-fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
-  let
-    (*protected execution*)
-    val lock = Mutex.mutex ();
-    val protected_name = ref "";
-    fun PROTECTED name e =
-      let
-        val name' = ! protected_name;
-        val _ =
-          if Mutex.trylock lock then ()
-          else
-            let
-              val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": waiting");
-              val _ = Mutex.lock lock;
-              val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": passed");
-            in () end;
-        val _ = protected_name := name;
-        val res = Exn.capture e ();
-        val _ = protected_name := "";
-        val _ = Mutex.unlock lock;
-      in Exn.release res end;
-
-    (*wakeup condition*)
-    val wakeup = ConditionVar.conditionVar ();
-    fun wakeup_all () = ConditionVar.broadcast wakeup;
-    fun wait () = ConditionVar.wait (wakeup, lock);
-    fun wait_timeout () = ConditionVar.waitUntil (wakeup, lock, Time.now () + Time.fromSeconds 1);
-
-    (*queue of tasks*)
-    val queue = ref tasks;
-    val active = ref 0;
-    fun trace_active () = tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
-    fun dequeue () =
-      let
-        val (next, tasks') = next_task (! queue);
-        val _ = queue := tasks';
-      in
-        (case next of Wait =>
-          (dec active; trace_active ();
-            wait ();
-            inc active; trace_active ();
-            dequeue ())
-        | _ => next)
-      end;
-
-    (*pool of running threads*)
-    val status = ref ([]: exn list);
-    val running = ref ([]: Thread.thread list);
-    fun start f =
-      (inc active;
-       change running (cons (Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer]))));
-    fun stop () =
-      (dec active;
-       change running (List.filter (fn t => not (Thread.equal (t, Thread.self ())))));
-
-   (*worker thread*)
-    fun worker () =
-      (case PROTECTED "dequeue" dequeue of
-        Task {body, cont, fail} =>
-          (case Exn.capture (restore_attributes body) () of
-            Exn.Result () =>
-              (PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ())
-          | Exn.Exn exn =>
-              PROTECTED "fail" (fn () =>
-                (change status (cons exn); change queue fail; stop (); wakeup_all ())))
-      | Terminate => PROTECTED "terminate" (fn () => (stop (); wakeup_all ())));
-
-    (*main control: fork and wait*)
-    fun fork 0 = ()
-      | fork k = (start worker; fork (k - 1));
-    val _ = PROTECTED "main" (fn () =>
-     (fork (Int.max (n, 1));
-      while not (List.null (! running)) do
-      (trace_active ();
-       if not (List.null (! status))
-       then (List.app (fn t => Thread.interrupt t handle Thread _ => ()) (! running))
-       else ();
-       wait_timeout ())));
-
-  in ! status end);
-
-
 (* profiling *)
 
 local val profile_orig = profile in
@@ -347,18 +251,13 @@
 val serial = uninterruptible (fn _ => fn () =>
   let
     val _ = Mutex.lock serial_lock;
-    val res = inc serial_count;
+    val _ = serial_count := ! serial_count + 1;
+    val res = ! serial_count;
     val _ = Mutex.unlock serial_lock;
   in res end);
 
 end;
 
-
-(* thread data *)
-
-val get_data = Thread.getLocal;
-val put_data = Thread.setLocal;
-
 end;
 
 structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;