# HG changeset patch # User wenzelm # Date 1262786876 -3600 # Node ID 02936e77a07c5c8a68d7384b7264ec791444185a # Parent 228f274691396ae9dc8dd92cbed99ead72262c0d tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity); shutdown: back to synchronous wait, which means no asynchronous interrupts within the loop; diff -r 228f27469139 -r 02936e77a07c src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Jan 06 13:14:28 2010 +0100 +++ b/src/Pure/Concurrent/future.ML Wed Jan 06 15:07:56 2010 +0100 @@ -126,9 +126,6 @@ fun SYNCHRONIZED name = SimpleThread.synchronized name lock; -fun raw_wait cond = (*requires SYNCHRONIZED*) - ConditionVar.wait (cond, lock); - fun wait cond = (*requires SYNCHRONIZED*) Multithreading.sync_wait NONE NONE cond lock; @@ -181,7 +178,10 @@ in assign_result group result res end; in (result, job) end; -fun do_cancel group = (*requires SYNCHRONIZED*) +fun cancel_now group = (*requires SYNCHRONIZED*) + Unsynchronized.change_result queue (Task_Queue.cancel group); + +fun cancel_later group = (*requires SYNCHRONIZED*) (Unsynchronized.change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event); @@ -195,8 +195,8 @@ val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); val _ = if ok then () - else if Task_Queue.cancel (! queue) group then () - else do_cancel group; + else if cancel_now group then () + else cancel_later group; val _ = broadcast work_finished; val _ = if maximal then () else signal work_available; in () end); @@ -332,7 +332,7 @@ else (Multithreading.tracing 1 (fn () => string_of_int (length (! canceled)) ^ " canceled groups"); - Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue))); + Unsynchronized.change canceled (filter_out cancel_now); broadcast_work ()); @@ -351,7 +351,8 @@ in continue end handle Exn.Interrupt => (Multithreading.tracing 1 (fn () => "Interrupt"); - List.app do_cancel (Task_Queue.cancel_all (! queue)); true); + List.app cancel_later (Unsynchronized.change_result queue Task_Queue.cancel_all); + broadcast_work (); true); fun scheduler_loop () = while @@ -504,7 +505,7 @@ else interruptible f x; (*cancel: present and future group members will be interrupted eventually*) -fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group); +fun cancel_group group = SYNCHRONIZED "cancel" (fn () => cancel_later group); fun cancel x = cancel_group (group_of x); @@ -514,7 +515,7 @@ if Multithreading.available then SYNCHRONIZED "shutdown" (fn () => while scheduler_active () do - (raw_wait scheduler_event; broadcast_work ())) + (wait scheduler_event; broadcast_work ())) else (); diff -r 228f27469139 -r 02936e77a07c src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Wed Jan 06 13:14:28 2010 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Wed Jan 06 15:07:56 2010 +0100 @@ -22,8 +22,8 @@ val empty: queue val all_passive: queue -> bool val status: queue -> {ready: int, pending: int, running: int, passive: int} - val cancel: queue -> group -> bool - val cancel_all: queue -> group list + val cancel: group -> queue -> bool * queue + val cancel_all: queue -> group list * queue val enqueue_passive: group -> queue -> task * queue val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue val extend: task -> (bool -> bool) -> queue -> queue option @@ -81,6 +81,8 @@ not (null (Synchronized.value status)) orelse (case parent of NONE => false | SOME group => is_canceled group); +fun is_ready deps group = null deps orelse is_canceled group; + fun group_status (Group {parent, status, ...}) = Synchronized.value status @ (case parent of NONE => [] | SOME group => group_status group); @@ -137,9 +139,9 @@ fun status (Queue {jobs, ...}) = let val (x, y, z, w) = - Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) => + Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) => (case job of - Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w) + Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w) | Running _ => (x, y, z + 1, w) | Passive => (x, y, z, w + 1))) jobs (0, 0, 0, 0); @@ -148,16 +150,16 @@ (* cancel -- peers and sub-groups *) -fun cancel (Queue {groups, jobs, ...}) group = +fun cancel group (Queue {groups, jobs, ...}) = let val _ = cancel_group group Exn.Interrupt; val tasks = Inttab.lookup_list groups (group_id group); val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; val _ = List.app SimpleThread.interrupt running; - in null running end; + in (null running, make_queue groups jobs Unknown) end; -fun cancel_all (Queue {jobs, ...}) = +fun cancel_all (Queue {groups, jobs, ...}) = let fun cancel_job (group, job) (groups, running) = (cancel_group group Exn.Interrupt; @@ -166,7 +168,7 @@ | _ => (groups, running))); val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); val _ = List.app SimpleThread.interrupt running; - in running_groups end; + in (running_groups, make_queue groups jobs Unknown) end; (* enqueue *) @@ -207,7 +209,8 @@ fun dequeue thread (queue as Queue {groups, jobs, cache}) = let - fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list) + fun ready (task, ((group, Job list), (deps, _))) = + if is_ready deps group then SOME (task, group, rev list) else NONE | ready _ = NONE; fun deq boundary = (case Task_Graph.get_first boundary ready jobs of @@ -235,7 +238,7 @@ fun ready task = (case Task_Graph.get_node jobs task of (group, Job list) => - if null (get_deps jobs task) + if is_ready (get_deps jobs task) group then SOME (task, group, rev list) else NONE | _ => NONE);