--- a/src/Pure/Concurrent/task_queue.ML Wed Jan 06 15:07:56 2010 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Wed Jan 06 18:14:16 2010 +0100
@@ -22,8 +22,8 @@
val empty: queue
val all_passive: queue -> bool
val status: queue -> {ready: int, pending: int, running: int, passive: int}
- val cancel: group -> queue -> bool * queue
- val cancel_all: queue -> group list * queue
+ val cancel: queue -> group -> bool
+ val cancel_all: queue -> group list
val enqueue_passive: group -> queue -> task * queue
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
val extend: task -> (bool -> bool) -> queue -> queue option
@@ -118,16 +118,13 @@
(* queue of grouped jobs *)
-datatype result = Unknown | Result of task | No_Result;
-
datatype queue = Queue of
{groups: task list Inttab.table, (*groups with presently active members*)
- jobs: jobs, (*job dependency graph*)
- cache: result}; (*last dequeue result*)
+ jobs: jobs}; (*job dependency graph*)
-fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
+fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
-val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
+val empty = make_queue Inttab.empty Task_Graph.empty;
fun all_passive (Queue {jobs, ...}) =
Task_Graph.get_first NONE
@@ -150,16 +147,16 @@
(* cancel -- peers and sub-groups *)
-fun cancel group (Queue {groups, jobs, ...}) =
+fun cancel (Queue {groups, jobs}) group =
let
val _ = cancel_group group Exn.Interrupt;
val tasks = Inttab.lookup_list groups (group_id group);
val running =
fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
val _ = List.app SimpleThread.interrupt running;
- in (null running, make_queue groups jobs Unknown) end;
+ in null running end;
-fun cancel_all (Queue {groups, jobs, ...}) =
+fun cancel_all (Queue {groups, jobs}) =
let
fun cancel_job (group, job) (groups, running) =
(cancel_group group Exn.Interrupt;
@@ -168,20 +165,20 @@
| _ => (groups, running)));
val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
val _ = List.app SimpleThread.interrupt running;
- in (running_groups, make_queue groups jobs Unknown) end;
+ in running_groups end;
(* enqueue *)
-fun enqueue_passive group (Queue {groups, jobs, cache}) =
+fun enqueue_passive group (Queue {groups, jobs}) =
let
val task = new_task NONE;
val groups' = groups
|> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
- in (task, make_queue groups' jobs' cache) end;
+ in (task, make_queue groups' jobs') end;
-fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
+fun enqueue group deps pri job (Queue {groups, jobs}) =
let
val task = new_task (SOME pri);
val groups' = groups
@@ -191,49 +188,36 @@
|> fold (add_job task) deps
|> fold (fold (add_job task) o get_deps jobs) deps;
val minimal = null (get_deps jobs' task);
- val cache' =
- (case cache of
- Result last =>
- if task_ord (last, task) = LESS
- then cache else Unknown
- | _ => Unknown);
- in ((task, minimal), make_queue groups' jobs' cache') end;
+ in ((task, minimal), make_queue groups' jobs') end;
-fun extend task job (Queue {groups, jobs, cache}) =
+fun extend task job (Queue {groups, jobs}) =
(case try (get_job jobs) task of
- SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
+ SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
| _ => NONE);
(* dequeue *)
-fun dequeue thread (queue as Queue {groups, jobs, cache}) =
+fun dequeue thread (queue as Queue {groups, jobs}) =
let
fun ready (task, ((group, Job list), (deps, _))) =
if is_ready deps group then SOME (task, group, rev list) else NONE
| ready _ = NONE;
- fun deq boundary =
- (case Task_Graph.get_first boundary ready jobs of
- NONE => (NONE, make_queue groups jobs No_Result)
- | SOME (result as (task, _, _)) =>
- let
- val jobs' = set_job task (Running thread) jobs;
- val cache' = Result task;
- in (SOME result, make_queue groups jobs' cache') end);
in
- (case cache of
- Unknown => deq NONE
- | Result last => deq (SOME last)
- | No_Result => (NONE, queue))
+ (case Task_Graph.get_first NONE ready jobs of
+ NONE => (NONE, queue)
+ | SOME (result as (task, _, _)) =>
+ let val jobs' = set_job task (Running thread) jobs
+ in (SOME result, make_queue groups jobs') end)
end;
(* dequeue_towards -- adhoc dependencies *)
-fun depend task deps (Queue {groups, jobs, ...}) =
- make_queue groups (fold (add_dep task) deps jobs) Unknown;
+fun depend task deps (Queue {groups, jobs}) =
+ make_queue groups (fold (add_dep task) deps jobs);
-fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
+fun dequeue_towards thread deps (queue as Queue {groups, jobs}) =
let
fun ready task =
(case Task_Graph.get_node jobs task of
@@ -244,10 +228,8 @@
| _ => NONE);
val tasks = filter (can (Task_Graph.get_node jobs)) deps;
fun result (res as (task, _, _)) =
- let
- val jobs' = set_job task (Running thread) jobs;
- val cache' = Unknown;
- in ((SOME res, tasks), make_queue groups jobs' cache') end;
+ let val jobs' = set_job task (Running thread) jobs
+ in ((SOME res, tasks), make_queue groups jobs') end;
in
(case get_first ready tasks of
SOME res => result res
@@ -260,14 +242,13 @@
(* finish *)
-fun finish task (Queue {groups, jobs, cache}) =
+fun finish task (Queue {groups, jobs}) =
let
val group = get_group jobs task;
val groups' = groups
|> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
val jobs' = Task_Graph.del_node task jobs;
val maximal = null (Task_Graph.imm_succs jobs task);
- val cache' = if maximal then cache else Unknown;
- in (maximal, make_queue groups' jobs' cache') end;
+ in (maximal, make_queue groups' jobs') end;
end;