--- a/src/Pure/Concurrent/task_queue.ML Tue Jul 20 16:42:48 2010 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Tue Jul 20 17:35:42 2010 +0200
@@ -24,7 +24,7 @@
val status: queue -> {ready: int, pending: int, running: int, passive: int}
val cancel: queue -> group -> bool
val cancel_all: queue -> group list
- val enqueue_passive: group -> queue -> task * queue
+ val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
val extend: task -> (bool -> bool) -> queue -> queue option
val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
@@ -88,8 +88,6 @@
not (null (Synchronized.value status)) orelse
(case parent of NONE => false | SOME group => is_canceled group);
-fun is_ready deps group = null deps orelse is_canceled group;
-
fun group_status (Group {parent, status, ...}) =
Synchronized.value status @
(case parent of NONE => [] | SOME group => group_status group);
@@ -105,7 +103,7 @@
datatype job =
Job of (bool -> bool) list |
Running of Thread.thread |
- Passive;
+ Passive of unit -> bool;
type jobs = (group * job) Task_Graph.T;
@@ -135,9 +133,21 @@
val empty = make_queue Inttab.empty Task_Graph.empty;
+
+(* job status *)
+
+fun ready_job task ((group, Job list), ([], _)) = SOME (task, group, rev list)
+ | ready_job task ((group, Passive abort), ([], _)) =
+ if is_canceled group then SOME (task, group, [fn _ => abort ()])
+ else NONE
+ | ready_job _ _ = NONE;
+
+fun active_job (_, Job _) = SOME ()
+ | active_job (_, Running _) = SOME ()
+ | active_job (group, Passive _) = if is_canceled group then SOME () else NONE;
+
fun all_passive (Queue {jobs, ...}) =
- Task_Graph.get_first
- ((fn Job _ => SOME () | Running _ => SOME () | Passive => NONE) o #2 o #1 o #2) jobs |> is_none;
+ is_none (Task_Graph.get_first (active_job o #1 o #2) jobs);
(* queue status *)
@@ -145,11 +155,11 @@
fun status (Queue {jobs, ...}) =
let
val (x, y, z, w) =
- Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) =>
+ Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
(case job of
- Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w)
+ Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
| Running _ => (x, y, z + 1, w)
- | Passive => (x, y, z, w + 1)))
+ | Passive _ => (x, y, z, w + 1)))
jobs (0, 0, 0, 0);
in {ready = x, pending = y, running = z, passive = w} end;
@@ -165,7 +175,7 @@
val _ = List.app Simple_Thread.interrupt running;
in null running end;
-fun cancel_all (Queue {groups, jobs}) =
+fun cancel_all (Queue {jobs, ...}) =
let
fun cancel_job (group, job) (groups, running) =
(cancel_group group Exn.Interrupt;
@@ -179,12 +189,12 @@
(* enqueue *)
-fun enqueue_passive group (Queue {groups, jobs}) =
+fun enqueue_passive group abort (Queue {groups, jobs}) =
let
val task = new_task NONE;
val groups' = groups
|> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
- val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
+ val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive abort));
in (task, make_queue groups' jobs') end;
fun enqueue group deps pri job (Queue {groups, jobs}) =
@@ -208,17 +218,11 @@
(* dequeue *)
fun dequeue thread (queue as Queue {groups, jobs}) =
- let
- fun ready (task, ((group, Job list), (deps, _))) =
- if is_ready deps group then SOME (task, group, rev list) else NONE
- | ready _ = NONE;
- in
- (case Task_Graph.get_first ready jobs of
- NONE => (NONE, queue)
- | SOME (result as (task, _, _)) =>
- let val jobs' = set_job task (Running thread) jobs
- in (SOME result, make_queue groups jobs') end)
- end;
+ (case Task_Graph.get_first (uncurry ready_job) jobs of
+ NONE => (NONE, queue)
+ | SOME (result as (task, _, _)) =>
+ let val jobs' = set_job task (Running thread) jobs
+ in (SOME result, make_queue groups jobs') end);
(* dequeue_towards -- adhoc dependencies *)
@@ -228,13 +232,7 @@
fun dequeue_towards thread deps (queue as Queue {groups, jobs}) =
let
- fun ready task =
- (case Task_Graph.get_node jobs task of
- (group, Job list) =>
- if is_ready (get_deps jobs task) group
- then SOME (task, group, rev list)
- else NONE
- | _ => NONE);
+ fun ready task = ready_job task (Task_Graph.get_entry jobs task);
val tasks = filter (can (Task_Graph.get_node jobs)) deps;
fun result (res as (task, _, _)) =
let val jobs' = set_job task (Running thread) jobs