renamed number_of_threads to max_threads;
authorwenzelm
Tue, 24 Jul 2007 22:53:48 +0200
changeset 23973 b6ce6de5b700
parent 23972 9c418fa38f7e
child 23974 16ecf0a5a6bb
renamed number_of_threads to max_threads; added schedule operator;
src/Pure/ML-Systems/multithreading_dummy.ML
src/Pure/ML-Systems/multithreading_polyml.ML
--- a/src/Pure/ML-Systems/multithreading_dummy.ML	Tue Jul 24 22:53:46 2007 +0200
+++ b/src/Pure/ML-Systems/multithreading_dummy.ML	Tue Jul 24 22:53:48 2007 +0200
@@ -7,19 +7,22 @@
 
 signature MULTITHREADING =
 sig
-  val number_of_threads: int ref
+  val max_threads: int ref
   val self_critical: unit -> bool
   val CRITICAL: (unit -> 'a) -> 'a
+  val schedule: int -> ('a -> (unit -> unit) option * 'a) -> 'a -> exn list
 end;
 
 structure Multithreading: MULTITHREADING =
 struct
 
-val number_of_threads = ref 0;
+val max_threads = ref 1;
 
 fun self_critical () = false;
 fun CRITICAL e = e ();
 
+fun schedule _ _ _ = raise Fail ("No multithreading support for " ^ ml_system);
+
 end;
 
 val CRITICAL = Multithreading.CRITICAL;
--- a/src/Pure/ML-Systems/multithreading_polyml.ML	Tue Jul 24 22:53:46 2007 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Tue Jul 24 22:53:48 2007 +0200
@@ -10,14 +10,14 @@
 structure Multithreading: MULTITHREADING =
 struct
 
-val number_of_threads = ref 0;
-
-
 (* FIXME tmp *)
 fun message s =
   (TextIO.output (TextIO.stdErr, (">>> " ^ s ^ "\n")); TextIO.flushOut TextIO.stdErr);
 
 
+val max_threads = ref 1;
+
+
 (* critical section -- may be nested within the same thread *)
 
 local
@@ -39,9 +39,9 @@
       val _ =
         if Mutex.trylock critical_lock then ()
         else
-          (message "Waiting for critical lock";
+          (message "CRITICAL: waiting for lock";
            Mutex.lock critical_lock;
-           message "Obtained critical lock");
+           message "CRITICAL: obtained lock");
       val _ = critical_thread := SOME (Thread.self ());
       val result = Exn.capture e ();
       val _ = critical_thread := NONE;
@@ -50,6 +50,55 @@
 
 end;
 
+
+(* scheduling -- non-interruptible threads working on a queue of tasks *)
+
+fun schedule n next_task tasks =
+  let
+    (*protected execution*)
+    val lock = Mutex.mutex ();
+    fun PROTECTED e =
+      let
+        val _ = Mutex.lock lock;
+        val res = Exn.capture e ();
+        val _ = Mutex.unlock lock;
+      in Exn.release res end;
+
+    (*queue of tasks*)
+    val queue = ref tasks;
+    fun dequeue () = PROTECTED (fn () =>
+      let
+        val (task, tasks') = next_task (! queue);
+        val _ = queue := tasks';
+      in task end);
+
+    (*worker threads*)
+    val running = ref 0;
+    val status = ref ([]: exn list);
+    val finished = ConditionVar.conditionVar ();
+    fun work k () =
+      (message ("WORKER THREAD " ^ Int.toString k);
+       case dequeue () of
+        SOME f =>
+          (case Exn.capture f () of
+            Exn.Result () => work k ()
+          | Exn.Exn exn => (PROTECTED (fn () => status := exn :: ! status); work k ()))
+      | NONE =>
+         (PROTECTED (fn () => running := ! running - 1);
+          ConditionVar.signal finished));
+
+    (*main control: fork and wait*)
+    fun fork 0 = ()
+      | fork k =
+         (running := ! running + 1;
+          Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
+          fork (k - 1));
+    val _ = PROTECTED (fn () =>
+     (fork (Int.max (n, 1));
+      while ! running <> 0 do ConditionVar.wait (finished, lock)));
+
+  in ! status end;
+
 end;
 
 val CRITICAL = Multithreading.CRITICAL;