dequeue: wait loop while PROTECTED -- avoids race condition;
authorwenzelm
Mon, 30 Jul 2007 19:22:27 +0200
changeset 24072 8b9e5d776ef3
parent 24071 82873bc360c2
child 24073 373727835757
dequeue: wait loop while PROTECTED -- avoids race condition;
src/Pure/ML-Systems/multithreading_polyml.ML
--- a/src/Pure/ML-Systems/multithreading_polyml.ML	Mon Jul 30 11:12:28 2007 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Mon Jul 30 19:22:27 2007 +0200
@@ -10,8 +10,7 @@
 structure Multithreading: MULTITHREADING =
 struct
 
-
-(* flags *)
+(* options *)
 
 val trace = ref false;
 fun tracing msg =
@@ -105,40 +104,43 @@
         val _ = Mutex.unlock lock;
       in Exn.release res end;
 
+    (*wakeup condition*)
+    val wakeup = ConditionVar.conditionVar ();
+    fun wakeup_all () = ConditionVar.broadcast wakeup;
+    fun wait () = ConditionVar.wait (wakeup, lock);
+
     (*the queue of tasks*)
     val queue = ref tasks;
-    fun dequeue () = PROTECTED "dequeue" (fn () =>
+    val active = ref 0;
+    fun trace_active () = tracing (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
+    fun dequeue () =
       let
         val (next, tasks') = next_task (! queue);
         val _ = queue := tasks';
-      in next end);
+      in
+        if Task.is_running (#1 next) then
+         (dec active; trace_active ();
+          wait ();
+          inc active; trace_active ();
+          dequeue ())
+        else next
+      end;
 
     (*worker threads*)
     val running = ref 0;
-    val active = ref 0;
-    fun trace_active () = tracing (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
     val status = ref ([]: exn list);
-    val wakeup = ConditionVar.conditionVar ();
-    fun wait () = ConditionVar.wait (wakeup, lock);
     fun work () =
-      (case dequeue () of
+      (case PROTECTED "dequeue" dequeue of
         (Task.Task f, cont) =>
           (case Exn.capture f () of
             Exn.Result () => continue cont
           | Exn.Exn exn =>
               (PROTECTED "status" (fn () => status := exn :: ! status); continue cont))
-      | (Task.Running, _) =>
-          (PROTECTED "wait" (fn () =>
-            (dec active; trace_active ();
-             wait ();
-             inc active; trace_active ()));
-           work ())
       | (Task.Finished, _) =>
          (PROTECTED "running" (fn () => (dec active; dec running));
-          ConditionVar.broadcast wakeup))
+          wakeup_all ()))
     and continue cont =
-      (PROTECTED "cont" (fn () => queue := cont (! queue));
-       ConditionVar.broadcast wakeup; work ());
+      (PROTECTED "cont" (fn () => queue := cont (! queue)); wakeup_all; work ());
 
     (*main control: fork and wait*)
     fun fork 0 = ()