clarified worker_wait;
authorwenzelm
Thu, 29 Jan 2015 13:49:03 +0100
changeset 59465 c21b65a6834b
parent 59464 df5dc24ca712
child 59466 6fab87db556c
clarified worker_wait; clarified join_next/join_loop, with proper thread attributes to allow interrupting of join (e.g. via Lazy.force);
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Wed Jan 28 22:50:00 2015 +0100
+++ b/src/Pure/Concurrent/future.ML	Thu Jan 29 13:49:03 2015 +0100
@@ -236,13 +236,10 @@
       in () end);
   in () end;
 
-fun worker_wait active cond = (*requires SYNCHRONIZED*)
+fun worker_wait worker_state cond = (*requires SYNCHRONIZED*)
   (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
-    SOME state =>
-     (state := (if active then Waiting else Sleeping);
-      wait cond;
-      state := Working)
-  | NONE => ignore (wait cond));
+    SOME state => Unsynchronized.setmp state worker_state wait cond
+  | NONE => wait cond);
 
 fun worker_next () = (*requires SYNCHRONIZED*)
   if length (! workers) > ! max_workers then
@@ -250,10 +247,10 @@
      signal work_available;
      NONE)
   else if count_workers Working > ! max_active then
-    (worker_wait false work_available; worker_next ())
+    (worker_wait Sleeping work_available; worker_next ())
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
-      NONE => (worker_wait false work_available; worker_next ())
+      NONE => (worker_wait Sleeping work_available; worker_next ())
     | some => (signal work_available; some));
 
 fun worker_loop name =
@@ -487,21 +484,22 @@
 
 local
 
-fun join_next deps = (*requires SYNCHRONIZED*)
+fun join_next atts deps = (*requires SYNCHRONIZED*)
   if null deps then NONE
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
       (NONE, []) => NONE
     | (NONE, deps') =>
-        (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
+       (worker_waiting deps' (fn () =>
+          Multithreading.with_attributes atts (fn _ =>
+            Exn.release (worker_wait Waiting work_finished)));
+        join_next atts deps')
     | (SOME work, deps') => SOME (work, deps'));
 
-fun execute_work NONE = ()
-  | execute_work (SOME (work, deps')) =
-      (worker_joining (fn () => worker_exec work); join_work deps')
-and join_work deps =
-  Multithreading.with_attributes Multithreading.no_interrupts
-    (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
+fun join_loop atts deps =
+  (case SYNCHRONIZED "join" (fn () => join_next atts deps) of
+    NONE => ()
+  | SOME (work, deps') => (worker_joining (fn () => worker_exec work); join_loop atts deps'));
 
 in
 
@@ -509,7 +507,9 @@
   let
     val _ =
       if forall is_finished xs then ()
-      else if is_some (worker_task ()) then join_work (map task_of xs)
+      else if is_some (worker_task ()) then
+        Multithreading.with_attributes Multithreading.no_interrupts
+          (fn orig_atts => join_loop orig_atts (map task_of xs))
       else List.app (ignore o Single_Assignment.await o result_of) xs;
   in map get_result xs end;