--- a/src/Pure/Concurrent/future.ML Wed Jan 06 13:14:28 2010 +0100
+++ b/src/Pure/Concurrent/future.ML Wed Jan 06 15:07:56 2010 +0100
@@ -126,9 +126,6 @@
fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
-fun raw_wait cond = (*requires SYNCHRONIZED*)
- ConditionVar.wait (cond, lock);
-
fun wait cond = (*requires SYNCHRONIZED*)
Multithreading.sync_wait NONE NONE cond lock;
@@ -181,7 +178,10 @@
in assign_result group result res end;
in (result, job) end;
-fun do_cancel group = (*requires SYNCHRONIZED*)
+fun cancel_now group = (*requires SYNCHRONIZED*)
+ Unsynchronized.change_result queue (Task_Queue.cancel group);
+
+fun cancel_later group = (*requires SYNCHRONIZED*)
(Unsynchronized.change canceled (insert Task_Queue.eq_group group);
broadcast scheduler_event);
@@ -195,8 +195,8 @@
val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
val _ =
if ok then ()
- else if Task_Queue.cancel (! queue) group then ()
- else do_cancel group;
+ else if cancel_now group then ()
+ else cancel_later group;
val _ = broadcast work_finished;
val _ = if maximal then () else signal work_available;
in () end);
@@ -332,7 +332,7 @@
else
(Multithreading.tracing 1 (fn () =>
string_of_int (length (! canceled)) ^ " canceled groups");
- Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
+ Unsynchronized.change canceled (filter_out cancel_now);
broadcast_work ());
@@ -351,7 +351,8 @@
in continue end
handle Exn.Interrupt =>
(Multithreading.tracing 1 (fn () => "Interrupt");
- List.app do_cancel (Task_Queue.cancel_all (! queue)); true);
+ List.app cancel_later (Unsynchronized.change_result queue Task_Queue.cancel_all);
+ broadcast_work (); true);
fun scheduler_loop () =
while
@@ -504,7 +505,7 @@
else interruptible f x;
(*cancel: present and future group members will be interrupted eventually*)
-fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
+fun cancel_group group = SYNCHRONIZED "cancel" (fn () => cancel_later group);
fun cancel x = cancel_group (group_of x);
@@ -514,7 +515,7 @@
if Multithreading.available then
SYNCHRONIZED "shutdown" (fn () =>
while scheduler_active () do
- (raw_wait scheduler_event; broadcast_work ()))
+ (wait scheduler_event; broadcast_work ()))
else ();
--- a/src/Pure/Concurrent/task_queue.ML Wed Jan 06 13:14:28 2010 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Wed Jan 06 15:07:56 2010 +0100
@@ -22,8 +22,8 @@
val empty: queue
val all_passive: queue -> bool
val status: queue -> {ready: int, pending: int, running: int, passive: int}
- val cancel: queue -> group -> bool
- val cancel_all: queue -> group list
+ val cancel: group -> queue -> bool * queue
+ val cancel_all: queue -> group list * queue
val enqueue_passive: group -> queue -> task * queue
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
val extend: task -> (bool -> bool) -> queue -> queue option
@@ -81,6 +81,8 @@
not (null (Synchronized.value status)) orelse
(case parent of NONE => false | SOME group => is_canceled group);
+fun is_ready deps group = null deps orelse is_canceled group;
+
fun group_status (Group {parent, status, ...}) =
Synchronized.value status @
(case parent of NONE => [] | SOME group => group_status group);
@@ -137,9 +139,9 @@
fun status (Queue {jobs, ...}) =
let
val (x, y, z, w) =
- Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
+ Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) =>
(case job of
- Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
+ Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w)
| Running _ => (x, y, z + 1, w)
| Passive => (x, y, z, w + 1)))
jobs (0, 0, 0, 0);
@@ -148,16 +150,16 @@
(* cancel -- peers and sub-groups *)
-fun cancel (Queue {groups, jobs, ...}) group =
+fun cancel group (Queue {groups, jobs, ...}) =
let
val _ = cancel_group group Exn.Interrupt;
val tasks = Inttab.lookup_list groups (group_id group);
val running =
fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
val _ = List.app SimpleThread.interrupt running;
- in null running end;
+ in (null running, make_queue groups jobs Unknown) end;
-fun cancel_all (Queue {jobs, ...}) =
+fun cancel_all (Queue {groups, jobs, ...}) =
let
fun cancel_job (group, job) (groups, running) =
(cancel_group group Exn.Interrupt;
@@ -166,7 +168,7 @@
| _ => (groups, running)));
val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
val _ = List.app SimpleThread.interrupt running;
- in running_groups end;
+ in (running_groups, make_queue groups jobs Unknown) end;
(* enqueue *)
@@ -207,7 +209,8 @@
fun dequeue thread (queue as Queue {groups, jobs, cache}) =
let
- fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
+ fun ready (task, ((group, Job list), (deps, _))) =
+ if is_ready deps group then SOME (task, group, rev list) else NONE
| ready _ = NONE;
fun deq boundary =
(case Task_Graph.get_first boundary ready jobs of
@@ -235,7 +238,7 @@
fun ready task =
(case Task_Graph.get_node jobs task of
(group, Job list) =>
- if null (get_deps jobs task)
+ if is_ready (get_deps jobs task) group
then SOME (task, group, rev list)
else NONE
| _ => NONE);