added trace flag, official tracing operation;
authorwenzelm
Wed, 25 Jul 2007 17:05:49 +0200
changeset 23981 03b71bf91318
parent 23980 d35dc9344515
child 23982 e3c4c0b9ae05
added trace flag, official tracing operation; added named CRITICAL'; schedule: tuned signature, actually observe dependencies on running tasks;
src/Pure/ML-Systems/multithreading_polyml.ML
--- a/src/Pure/ML-Systems/multithreading_polyml.ML	Wed Jul 25 17:05:48 2007 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Wed Jul 25 17:05:49 2007 +0200
@@ -10,11 +10,13 @@
 structure Multithreading: MULTITHREADING =
 struct
 
-(* FIXME tmp *)
-fun message s =
-  (TextIO.output (TextIO.stdErr, (">>> " ^ s ^ "\n")); TextIO.flushOut TextIO.stdErr);
+val trace = ref false;
+fun tracing msg =
+  if ! trace
+  then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
+  else ();
 
-
+val available = true;
 val max_threads = ref 1;
 
 
@@ -32,22 +34,24 @@
     NONE => false
   | SOME id => Thread.equal (id, Thread.self ()));
 
-fun CRITICAL e =
+fun CRITICAL' name e =
   if self_critical () then e ()
   else
     let
       val _ =
         if Mutex.trylock critical_lock then ()
         else
-          (message "CRITICAL: waiting for lock";
-           Mutex.lock critical_lock;
-           message "CRITICAL: obtained lock");
+         (tracing (fn () => "CRITICAL " ^ name ^ ": waiting for lock");
+          Mutex.lock critical_lock;
+          tracing (fn () => "CRITICAL " ^ name ^ ": obtained lock"));
       val _ = critical_thread := SOME (Thread.self ());
       val result = Exn.capture e ();
       val _ = critical_thread := NONE;
       val _ = Mutex.unlock critical_lock;
     in Exn.release result end;
 
+fun CRITICAL e = CRITICAL' "" e;
+
 end;
 
 
@@ -57,34 +61,47 @@
   let
     (*protected execution*)
     val lock = Mutex.mutex ();
-    fun PROTECTED e =
+    fun PROTECTED k e =
       let
-        val _ = Mutex.lock lock;
+        val _ =
+          if Mutex.trylock lock then ()
+          else
+           (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
+            Mutex.lock lock;
+            tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
         val res = Exn.capture e ();
         val _ = Mutex.unlock lock;
       in Exn.release res end;
 
-    (*queue of tasks*)
+    (*the queue of tasks*)
     val queue = ref tasks;
-    fun dequeue () = PROTECTED (fn () =>
+    fun dequeue k = PROTECTED k (fn () =>
       let
-        val (task, tasks') = next_task (! queue);
+        val (next, tasks') = next_task (! queue);
         val _ = queue := tasks';
-      in task end);
+      in next end);
 
     (*worker threads*)
     val running = ref 0;
     val status = ref ([]: exn list);
     val finished = ConditionVar.conditionVar ();
-    fun work k () =
-      (message ("WORKER THREAD " ^ Int.toString k);
-       case dequeue () of
-        SOME f =>
-          (case Exn.capture f () of
-            Exn.Result () => work k ()
-          | Exn.Exn exn => (PROTECTED (fn () => status := exn :: ! status); work k ()))
-      | NONE =>
-         (PROTECTED (fn () => running := ! running - 1);
+    fun wait () = ConditionVar.waitUntil (finished, lock, Time.fromMilliseconds 500);
+    fun continue cont k =
+      (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.signal finished; work k ())
+    and work k () =
+      (case dequeue k of
+        (Task.Task f, cont) =>
+          (tracing (fn () => "TASK " ^ Int.toString k);
+           case Exn.capture f () of
+            Exn.Result () => continue cont k
+          | Exn.Exn exn =>
+              (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
+      | (Task.Running, _) =>
+          (tracing (fn () => "WAITING " ^ Int.toString k);
+           PROTECTED k wait; work k ())
+      | (Task.Finished, _) =>
+         (tracing (fn () => "TERMINATING " ^ Int.toString k);
+          PROTECTED k (fn () => running := ! running - 1);
           ConditionVar.signal finished));
 
     (*main control: fork and wait*)
@@ -93,12 +110,12 @@
          (running := ! running + 1;
           Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
           fork (k - 1));
-    val _ = PROTECTED (fn () =>
-     (fork (Int.max (n, 1));
-      while ! running <> 0 do ConditionVar.wait (finished, lock)));
+    val _ = PROTECTED 0 (fn () =>
+     (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));
 
   in ! status end;
 
 end;
 
+val CRITICAL' = Multithreading.CRITICAL';
 val CRITICAL = Multithreading.CRITICAL;