# HG changeset patch # User wenzelm # Date 1231245797 -3600 # Node ID 5c5bc17d9135dd2e908b63315b1443188f48eef0 # Parent 642cac18e155add9a94bd6a1bf75632f865dc79d added is_valid; added extend: support bulk jobs; tuned; diff -r 642cac18e155 -r 5c5bc17d9135 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Mon Jan 05 18:13:26 2009 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Tue Jan 06 13:43:17 2009 +0100 @@ -13,16 +13,18 @@ type group val eq_group: group * group -> bool val new_group: unit -> group + val is_valid: group -> bool val invalidate_group: group -> unit val str_of_group: group -> string type queue val empty: queue val is_empty: queue -> bool val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue + val extend: task -> (bool -> bool) -> queue -> queue option val depend: task list -> task -> queue -> queue - val dequeue: queue -> (task * group * (unit -> bool)) option * queue + val dequeue: queue -> (task * group * (bool -> bool) list) option * queue val dequeue_towards: task list -> queue -> - (((task * group * (unit -> bool)) * task list) option * queue) + (((task * group * (bool -> bool) list) * task list) option * queue) val interrupt: queue -> task -> unit val interrupt_external: queue -> string -> unit val cancel: queue -> group -> bool @@ -52,6 +54,7 @@ fun new_group () = Group (serial (), ref true); +fun is_valid (Group (_, ref ok)) = ok; fun invalidate_group (Group (_, ok)) = ok := false; fun str_of_group (Group (i, ref ok)) = @@ -61,14 +64,14 @@ (* jobs *) datatype job = - Job of bool -> bool | + Job of (bool -> bool) list | Running of Thread.thread; type jobs = (group * job) Task_Graph.T; fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task); fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task); -fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs; +fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs; fun add_job task dep (jobs: jobs) = Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; @@ -96,9 +99,14 @@ val task = new_task pri; val groups' = Inttab.cons_list (gid, task) groups; val jobs' = jobs - |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps; + |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps; in (task, make_queue groups' jobs') end; +fun extend task job (Queue {groups, jobs}) = + (case try (get_job jobs) task of + SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs)) + | _ => NONE); + fun depend deps task (Queue {groups, jobs}) = make_queue groups (fold (add_job_acyclic task) deps jobs); @@ -109,14 +117,13 @@ fun dequeue_result NONE queue = (NONE, queue) | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) = - (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs)); + (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs)); in fun dequeue (queue as Queue {jobs, ...}) = let - fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) = - SOME (task, group, (fn () => job ok)) + fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list) | ready _ = NONE; in dequeue_result (Task_Graph.get_first ready jobs) queue end; @@ -126,8 +133,8 @@ fun ready task = (case Task_Graph.get_node jobs task of - (group as Group (_, ref ok), Job job) => - if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok)) + (group, Job list) => + if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list) else NONE | _ => NONE); in