back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
authorwenzelm
Tue, 20 Jul 2010 17:35:42 +0200
changeset 37854 047c96f41455
parent 37853 26906cacbaae
child 37855 1ad8205078d4
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c); cancel passive tasks more actively via Exn.Interrupt, by treating them like ragular jobs here; attempts to re-assign canceled futures/promises raise Exn.Interrupt; tuned;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Tue Jul 20 16:42:48 2010 +0200
+++ b/src/Pure/Concurrent/future.ML	Tue Jul 20 17:35:42 2010 +0200
@@ -106,9 +106,13 @@
 
 fun assign_result group result res =
   let
-    val _ = Single_Assignment.assign result res;
+    val _ = Single_Assignment.assign result res
+      handle exn as Fail _ =>
+        (case Single_Assignment.peek result of
+          SOME (Exn.Exn Exn.Interrupt) => raise Exn.Interrupt
+        | _ => reraise exn);
     val ok =
-      (case res of
+      (case the (Single_Assignment.peek result) of
         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
       | Exn.Result _ => true);
   in ok end;
@@ -481,8 +485,9 @@
 fun promise_group group : 'a future =
   let
     val result = Single_Assignment.var "promise" : 'a result;
-    val task = SYNCHRONIZED "enqueue" (fn () =>
-      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group));
+    fun abort () = assign_result group result (Exn.Exn Exn.Interrupt) handle Fail _ => true;
+    val task = SYNCHRONIZED "enqueue_passive" (fn () =>
+      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   in Future {promised = true, task = task, group = group, result = result} end;
 
 fun promise () = promise_group (new_group ());
@@ -509,8 +514,10 @@
   else interruptible f x;
 
 (*cancel: present and future group members will be interrupted eventually*)
-fun cancel_group group =
-  SYNCHRONIZED "cancel" (fn () => if cancel_now group then () else cancel_later group);
+fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
+ (if cancel_now group then () else cancel_later group;
+  signal work_available; scheduler_check ()));
+
 fun cancel x = cancel_group (group_of x);
 
 
--- a/src/Pure/Concurrent/task_queue.ML	Tue Jul 20 16:42:48 2010 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Tue Jul 20 17:35:42 2010 +0200
@@ -24,7 +24,7 @@
   val status: queue -> {ready: int, pending: int, running: int, passive: int}
   val cancel: queue -> group -> bool
   val cancel_all: queue -> group list
-  val enqueue_passive: group -> queue -> task * queue
+  val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
@@ -88,8 +88,6 @@
   not (null (Synchronized.value status)) orelse
     (case parent of NONE => false | SOME group => is_canceled group);
 
-fun is_ready deps group = null deps orelse is_canceled group;
-
 fun group_status (Group {parent, status, ...}) =
   Synchronized.value status @
     (case parent of NONE => [] | SOME group => group_status group);
@@ -105,7 +103,7 @@
 datatype job =
   Job of (bool -> bool) list |
   Running of Thread.thread |
-  Passive;
+  Passive of unit -> bool;
 
 type jobs = (group * job) Task_Graph.T;
 
@@ -135,9 +133,21 @@
 
 val empty = make_queue Inttab.empty Task_Graph.empty;
 
+
+(* job status *)
+
+fun ready_job task ((group, Job list), ([], _)) = SOME (task, group, rev list)
+  | ready_job task ((group, Passive abort), ([], _)) =
+      if is_canceled group then SOME (task, group, [fn _ => abort ()])
+      else NONE
+  | ready_job _ _ = NONE;
+
+fun active_job (_, Job _) = SOME ()
+  | active_job (_, Running _) = SOME ()
+  | active_job (group, Passive _) = if is_canceled group then SOME () else NONE;
+
 fun all_passive (Queue {jobs, ...}) =
-  Task_Graph.get_first
-    ((fn Job _ => SOME () | Running _ => SOME () | Passive => NONE) o #2 o #1 o #2) jobs |> is_none;
+  is_none (Task_Graph.get_first (active_job o #1 o #2) jobs);
 
 
 (* queue status *)
@@ -145,11 +155,11 @@
 fun status (Queue {jobs, ...}) =
   let
     val (x, y, z, w) =
-      Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) =>
+      Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
           (case job of
-            Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w)
+            Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
           | Running _ => (x, y, z + 1, w)
-          | Passive => (x, y, z, w + 1)))
+          | Passive _ => (x, y, z, w + 1)))
         jobs (0, 0, 0, 0);
   in {ready = x, pending = y, running = z, passive = w} end;
 
@@ -165,7 +175,7 @@
     val _ = List.app Simple_Thread.interrupt running;
   in null running end;
 
-fun cancel_all (Queue {groups, jobs}) =
+fun cancel_all (Queue {jobs, ...}) =
   let
     fun cancel_job (group, job) (groups, running) =
       (cancel_group group Exn.Interrupt;
@@ -179,12 +189,12 @@
 
 (* enqueue *)
 
-fun enqueue_passive group (Queue {groups, jobs}) =
+fun enqueue_passive group abort (Queue {groups, jobs}) =
   let
     val task = new_task NONE;
     val groups' = groups
       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
-    val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
+    val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive abort));
   in (task, make_queue groups' jobs') end;
 
 fun enqueue group deps pri job (Queue {groups, jobs}) =
@@ -208,17 +218,11 @@
 (* dequeue *)
 
 fun dequeue thread (queue as Queue {groups, jobs}) =
-  let
-    fun ready (task, ((group, Job list), (deps, _))) =
-          if is_ready deps group then SOME (task, group, rev list) else NONE
-      | ready _ = NONE;
-  in
-    (case Task_Graph.get_first ready jobs of
-      NONE => (NONE, queue)
-    | SOME (result as (task, _, _)) =>
-        let val jobs' = set_job task (Running thread) jobs
-        in (SOME result, make_queue groups jobs') end)
-  end;
+  (case Task_Graph.get_first (uncurry ready_job) jobs of
+    NONE => (NONE, queue)
+  | SOME (result as (task, _, _)) =>
+      let val jobs' = set_job task (Running thread) jobs
+      in (SOME result, make_queue groups jobs') end);
 
 
 (* dequeue_towards -- adhoc dependencies *)
@@ -228,13 +232,7 @@
 
 fun dequeue_towards thread deps (queue as Queue {groups, jobs}) =
   let
-    fun ready task =
-      (case Task_Graph.get_node jobs task of
-        (group, Job list) =>
-          if is_ready (get_deps jobs task) group
-          then SOME (task, group, rev list)
-          else NONE
-      | _ => NONE);
+    fun ready task = ready_job task (Task_Graph.get_entry jobs task);
     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
     fun result (res as (task, _, _)) =
       let val jobs' = set_job task (Running thread) jobs