dequeue_towards: always return active tasks;
authorwenzelm
Mon, 27 Jul 2009 15:06:33 +0200
changeset 32224 a46f5e9b1718
parent 32223 f5f46d6eb95b
child 32225 d5d6f47fb018
dequeue_towards: always return active tasks; join_work: imitate worker more closely, keep active if queue appears to be blocked for the moment -- it may become free again after some worker_finished event;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Mon Jul 27 13:32:29 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Mon Jul 27 15:06:33 2009 +0200
@@ -328,14 +328,24 @@
       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   | SOME res => res);
 
+fun join_wait x =
+  if SYNCHRONIZED "join_wait" (fn () =>
+    is_finished x orelse (wait work_finished; false))
+  then () else join_wait x;
+
 fun join_next deps = (*requires SYNCHRONIZED*)
-  if overloaded () then (worker_wait scheduler_event; join_next deps)
-  else change_result queue (Task_Queue.dequeue_towards deps);
+  if null deps then NONE
+  else if overloaded () then (worker_wait scheduler_event; join_next deps)
+  else
+    (case change_result queue (Task_Queue.dequeue_towards deps) of
+      (NONE, []) => NONE
+    | (NONE, deps') => (worker_wait work_finished; join_next deps')
+    | (SOME work, deps') => SOME (work, deps'));
 
-fun join_deps deps =
+fun join_work deps =
   (case SYNCHRONIZED "join" (fn () => join_next deps) of
     NONE => ()
-  | SOME (work, deps') => (execute "join" work; join_deps deps'));
+  | SOME (work, deps') => (execute "join" work; join_work deps'));
 
 in
 
@@ -346,20 +356,9 @@
       val _ = scheduler_check "join check";
       val _ = Multithreading.self_critical () andalso
         error "Cannot join future values within critical section";
-
-      val worker = is_worker ();
-      val _ = if worker then join_deps (map task_of xs) else ();
-
-      fun join_wait x =
-        if SYNCHRONIZED "join_wait" (fn () =>
-          is_finished x orelse ((if worker then worker_wait else wait) work_finished; false))
-        then () else join_wait x;
-
-      val _ = xs |> List.app (fn x =>
-        let val time = Multithreading.real_time join_wait x in
-          Multithreading.tracing_time true time
-            (fn () => "joined after " ^ Time.toString time)
-        end);
+      val _ =
+        if is_worker () then join_work (map task_of xs)
+        else List.app join_wait xs;
     in map get_result xs end) ();
 
 end;
--- a/src/Pure/Concurrent/task_queue.ML	Mon Jul 27 13:32:29 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Mon Jul 27 15:06:33 2009 +0200
@@ -28,7 +28,7 @@
   val extend: task -> (bool -> bool) -> queue -> queue option
   val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
   val dequeue_towards: task list -> queue ->
-    (((task * group * (bool -> bool) list) * task list) option * queue)
+    (((task * group * (bool -> bool) list) option * task list) * queue)
   val finish: task -> queue -> bool * queue
 end;
 
@@ -215,14 +215,14 @@
       let
         val jobs' = set_job task (Running (Thread.self ())) jobs;
         val cache' = Unknown;
-      in (SOME (res, tasks), make_queue groups jobs' cache') end;
+      in ((SOME res, tasks), make_queue groups jobs' cache') end;
   in
     (case get_first ready tasks of
       SOME res => result res
     | NONE =>
         (case get_first (get_first ready o Task_Graph.imm_preds jobs) tasks of
           SOME res => result res
-        | NONE => (NONE, queue)))
+        | NONE => ((NONE, tasks), queue)))
   end;