tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
authorwenzelm
Wed, 06 Jan 2010 15:07:56 +0100
changeset 34279 02936e77a07c
parent 34278 228f27469139
child 34280 16bf3e9786a3
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity); shutdown: back to synchronous wait, which means no asynchronous interrupts within the loop;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Wed Jan 06 13:14:28 2010 +0100
+++ b/src/Pure/Concurrent/future.ML	Wed Jan 06 15:07:56 2010 +0100
@@ -126,9 +126,6 @@
 
 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
 
-fun raw_wait cond = (*requires SYNCHRONIZED*)
-  ConditionVar.wait (cond, lock);
-
 fun wait cond = (*requires SYNCHRONIZED*)
   Multithreading.sync_wait NONE NONE cond lock;
 
@@ -181,7 +178,10 @@
       in assign_result group result res end;
   in (result, job) end;
 
-fun do_cancel group = (*requires SYNCHRONIZED*)
+fun cancel_now group = (*requires SYNCHRONIZED*)
+  Unsynchronized.change_result queue (Task_Queue.cancel group);
+
+fun cancel_later group = (*requires SYNCHRONIZED*)
  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   broadcast scheduler_event);
 
@@ -195,8 +195,8 @@
         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
         val _ =
           if ok then ()
-          else if Task_Queue.cancel (! queue) group then ()
-          else do_cancel group;
+          else if cancel_now group then ()
+          else cancel_later group;
         val _ = broadcast work_finished;
         val _ = if maximal then () else signal work_available;
       in () end);
@@ -332,7 +332,7 @@
       else
        (Multithreading.tracing 1 (fn () =>
           string_of_int (length (! canceled)) ^ " canceled groups");
-        Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
+        Unsynchronized.change canceled (filter_out cancel_now);
         broadcast_work ());
 
 
@@ -351,7 +351,8 @@
   in continue end
   handle Exn.Interrupt =>
    (Multithreading.tracing 1 (fn () => "Interrupt");
-    List.app do_cancel (Task_Queue.cancel_all (! queue)); true);
+    List.app cancel_later (Unsynchronized.change_result queue Task_Queue.cancel_all);
+    broadcast_work (); true);
 
 fun scheduler_loop () =
   while
@@ -504,7 +505,7 @@
   else interruptible f x;
 
 (*cancel: present and future group members will be interrupted eventually*)
-fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
+fun cancel_group group = SYNCHRONIZED "cancel" (fn () => cancel_later group);
 fun cancel x = cancel_group (group_of x);
 
 
@@ -514,7 +515,7 @@
   if Multithreading.available then
     SYNCHRONIZED "shutdown" (fn () =>
      while scheduler_active () do
-      (raw_wait scheduler_event; broadcast_work ()))
+      (wait scheduler_event; broadcast_work ()))
   else ();
 
 
--- a/src/Pure/Concurrent/task_queue.ML	Wed Jan 06 13:14:28 2010 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Wed Jan 06 15:07:56 2010 +0100
@@ -22,8 +22,8 @@
   val empty: queue
   val all_passive: queue -> bool
   val status: queue -> {ready: int, pending: int, running: int, passive: int}
-  val cancel: queue -> group -> bool
-  val cancel_all: queue -> group list
+  val cancel: group -> queue -> bool * queue
+  val cancel_all: queue -> group list * queue
   val enqueue_passive: group -> queue -> task * queue
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
@@ -81,6 +81,8 @@
   not (null (Synchronized.value status)) orelse
     (case parent of NONE => false | SOME group => is_canceled group);
 
+fun is_ready deps group = null deps orelse is_canceled group;
+
 fun group_status (Group {parent, status, ...}) =
   Synchronized.value status @
     (case parent of NONE => [] | SOME group => group_status group);
@@ -137,9 +139,9 @@
 fun status (Queue {jobs, ...}) =
   let
     val (x, y, z, w) =
-      Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
+      Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) =>
           (case job of
-            Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
+            Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w)
           | Running _ => (x, y, z + 1, w)
           | Passive => (x, y, z, w + 1)))
         jobs (0, 0, 0, 0);
@@ -148,16 +150,16 @@
 
 (* cancel -- peers and sub-groups *)
 
-fun cancel (Queue {groups, jobs, ...}) group =
+fun cancel group (Queue {groups, jobs, ...}) =
   let
     val _ = cancel_group group Exn.Interrupt;
     val tasks = Inttab.lookup_list groups (group_id group);
     val running =
       fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
     val _ = List.app SimpleThread.interrupt running;
-  in null running end;
+  in (null running, make_queue groups jobs Unknown) end;
 
-fun cancel_all (Queue {jobs, ...}) =
+fun cancel_all (Queue {groups, jobs, ...}) =
   let
     fun cancel_job (group, job) (groups, running) =
       (cancel_group group Exn.Interrupt;
@@ -166,7 +168,7 @@
         | _ => (groups, running)));
     val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
     val _ = List.app SimpleThread.interrupt running;
-  in running_groups end;
+  in (running_groups, make_queue groups jobs Unknown) end;
 
 
 (* enqueue *)
@@ -207,7 +209,8 @@
 
 fun dequeue thread (queue as Queue {groups, jobs, cache}) =
   let
-    fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
+    fun ready (task, ((group, Job list), (deps, _))) =
+          if is_ready deps group then SOME (task, group, rev list) else NONE
       | ready _ = NONE;
     fun deq boundary =
       (case Task_Graph.get_first boundary ready jobs of
@@ -235,7 +238,7 @@
     fun ready task =
       (case Task_Graph.get_node jobs task of
         (group, Job list) =>
-          if null (get_deps jobs task)
+          if is_ready (get_deps jobs task) group
           then SOME (task, group, rev list)
           else NONE
       | _ => NONE);