# HG changeset patch # User wenzelm # Date 1262731090 -3600 # Node ID 7325a5e3587fe4e8b27e6151926f7b114e081609 # Parent 12436485c2446c2ffe17a629d935381c1ad200b4 added Future.promise/fulfill -- promised futures that are fulfilled by external means; Future.value: official result assignment -- produces immutable ref; Future.shutdown: raw_wait keeps raw task attributes, e.g. asynchronous interrupts of toplevel; Task_Queue: passive tasks track dependencies, but lack any evaluation process; tuned; diff -r 12436485c244 -r 7325a5e3587f src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Jan 05 18:20:18 2010 +0100 +++ b/src/Pure/Concurrent/future.ML Tue Jan 05 23:38:10 2010 +0100 @@ -23,6 +23,10 @@ of runtime resources is distorted either if workers yield CPU time (e.g. via system sleep or wait operations), or if non-worker threads contend for significant runtime resources independently. + + * Promised futures are fulfilled by external means. There is no + associated evaluation task, but other futures can depend on them + as usual. *) signature FUTURE = @@ -37,7 +41,6 @@ val group_of: 'a future -> group val peek: 'a future -> 'a Exn.result option val is_finished: 'a future -> bool - val value: 'a -> 'a future val fork_group: group -> (unit -> 'a) -> 'a future val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future val fork_deps: 'b future list -> (unit -> 'a) -> 'a future @@ -46,7 +49,12 @@ val join_results: 'a future list -> 'a Exn.result list val join_result: 'a future -> 'a Exn.result val join: 'a future -> 'a + val value: 'a -> 'a future val map: ('a -> 'b) -> 'a future -> 'b future + val promise_group: group -> 'a future + val promise: unit -> 'a future + val fulfill_result: 'a future -> 'a Exn.result -> unit + val fulfill: 'a future -> 'a -> unit val interruptible_task: ('a -> 'b) -> 'a -> 'b val cancel_group: group -> unit val cancel: 'a future -> unit @@ -75,11 +83,14 @@ val worker_task = Option.map #1 o thread_data; val worker_group = Option.map #2 o thread_data; +fun new_group () = Task_Queue.new_group (worker_group ()); + (* datatype future *) datatype 'a future = Future of - {task: task, + {promised: bool, + task: task, group: group, result: 'a Exn.result option Synchronized.var}; @@ -90,10 +101,14 @@ fun peek x = Synchronized.value (result_of x); fun is_finished x = is_some (peek x); -fun value x = Future - {task = Task_Queue.new_task 0, - group = Task_Queue.new_group NONE, - result = Synchronized.var "future" (SOME (Exn.Result x))}; +fun assign_result group result res = + let + val _ = Synchronized.assign result (K (SOME res)); + val ok = + (case res of + Exn.Exn exn => (Task_Queue.cancel_group group exn; false) + | Exn.Result _ => true); + in ok end; @@ -111,6 +126,9 @@ fun SYNCHRONIZED name = SimpleThread.synchronized name lock; +fun raw_wait cond = (*requires SYNCHRONIZED*) + ConditionVar.wait (cond, lock); + fun wait cond = (*requires SYNCHRONIZED*) Multithreading.sync_wait NONE NONE cond lock; @@ -160,12 +178,7 @@ Exn.capture (fn () => Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) () else Exn.Exn Exn.Interrupt; - val _ = Synchronized.assign result (K (SOME res)); - in - (case res of - Exn.Exn exn => (Task_Queue.cancel_group group exn; false) - | Exn.Result _ => true) - end; + in assign_result group result res end; in (result, job) end; fun do_cancel group = (*requires SYNCHRONIZED*) @@ -247,7 +260,7 @@ if tick andalso ! status_ticks = 0 then Multithreading.tracing 1 (fn () => let - val {ready, pending, running} = Task_Queue.status (! queue); + val {ready, pending, running, passive} = Task_Queue.status (! queue); val total = length (! workers); val active = count_workers Working; val waiting = count_workers Waiting; @@ -255,7 +268,8 @@ "SCHEDULE " ^ Time.toString now ^ ": " ^ string_of_int ready ^ " ready, " ^ string_of_int pending ^ " pending, " ^ - string_of_int running ^ " running; " ^ + string_of_int running ^ " running, " ^ + string_of_int passive ^ " passive; " ^ string_of_int total ^ " workers, " ^ string_of_int active ^ " active, " ^ string_of_int waiting ^ " waiting " @@ -329,7 +343,7 @@ (* shutdown *) - val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else (); + val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else (); val continue = not (! do_shutdown andalso null (! workers)); val _ = if continue then () else scheduler := NONE; @@ -364,8 +378,8 @@ let val group = (case opt_group of - SOME group => group - | NONE => Task_Queue.new_group (worker_group ())); + NONE => new_group () + | SOME group => group); val (result, job) = future_job group e; val task = SYNCHRONIZED "enqueue" (fn () => let @@ -374,7 +388,7 @@ val _ = if minimal then signal work_available else (); val _ = scheduler_check (); in task end); - in Future {task = task, group = group, result = result} end; + in Future {promised = false, task = task, group = group, result = result} end; fun fork_group group e = fork_future (SOME group) [] 0 e; fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e; @@ -432,7 +446,14 @@ fun join x = Exn.release (join_result x); -(* map *) +(* fast-path versions -- bypassing full task management *) + +fun value (x: 'a) = + let + val group = Task_Queue.new_group NONE; + val result = Synchronized.var "value" NONE : 'a Exn.result option Synchronized.var; + val _ = assign_result group result (Exn.Result x); + in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end; fun map_future f x = let @@ -445,11 +466,32 @@ SOME queue' => (queue := queue'; true) | NONE => false)); in - if extended then Future {task = task, group = group, result = result} + if extended then Future {promised = false, task = task, group = group, result = result} else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end; +(* promised futures -- fulfilled by external means *) + +fun promise_group group : 'a future = + let + val result = Synchronized.var "promise" (NONE: 'a Exn.result option); + val task = SYNCHRONIZED "enqueue" (fn () => + Unsynchronized.change_result queue (Task_Queue.enqueue_passive group)); + in Future {promised = true, task = task, group = group, result = result} end; + +fun promise () = promise_group (new_group ()); + +fun fulfill_result (Future {promised, task, group, result}) res = + let + val _ = promised orelse raise Fail "Not a promised future"; + fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt); + val _ = execute (task, group, [job]); + in () end; + +fun fulfill x res = fulfill_result x (Exn.Result res); + + (* cancellation *) fun interruptible_task f x = @@ -472,7 +514,7 @@ if Multithreading.available then SYNCHRONIZED "shutdown" (fn () => while scheduler_active () do - (wait scheduler_event; broadcast_work ())) + (raw_wait scheduler_event; broadcast_work ())) else (); diff -r 12436485c244 -r 7325a5e3587f src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Tue Jan 05 18:20:18 2010 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Tue Jan 05 23:38:10 2010 +0100 @@ -7,7 +7,7 @@ signature TASK_QUEUE = sig type task - val new_task: int -> task + val dummy_task: task val pri_of_task: task -> int val str_of_task: task -> string type group @@ -20,10 +20,11 @@ val str_of_group: group -> string type queue val empty: queue - val is_empty: queue -> bool - val status: queue -> {ready: int, pending: int, running: int} + val all_passive: queue -> bool + val status: queue -> {ready: int, pending: int, running: int, passive: int} val cancel: queue -> group -> bool val cancel_all: queue -> group list + val enqueue_passive: group -> queue -> task * queue val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue val extend: task -> (bool -> bool) -> queue -> queue option val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue @@ -38,13 +39,14 @@ (* tasks *) -datatype task = Task of int * serial; +datatype task = Task of int option * serial; +val dummy_task = Task (NONE, ~1); fun new_task pri = Task (pri, serial ()); -fun pri_of_task (Task (pri, _)) = pri; +fun pri_of_task (Task (pri, _)) = the_default 0 pri; fun str_of_task (Task (_, i)) = string_of_int i; -fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2); +fun task_ord (Task t1, Task t2) = prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2); structure Task_Graph = Graph(type key = task val ord = task_ord); @@ -91,7 +93,8 @@ datatype job = Job of (bool -> bool) list | - Running of Thread.thread; + Running of Thread.thread | + Passive; type jobs = (group * job) Task_Graph.T; @@ -123,20 +126,24 @@ fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache}; val empty = make_queue Inttab.empty Task_Graph.empty No_Result; -fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs; + +fun all_passive (Queue {jobs, ...}) = + Task_Graph.get_first NONE + ((fn Job _ => SOME () | Running _ => SOME () | Passive => NONE) o #2 o #1 o #2) jobs |> is_none; (* queue status *) fun status (Queue {jobs, ...}) = let - val (x, y, z) = - Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z) => + val (x, y, z, w) = + Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) => (case job of - Job _ => if null deps then (x + 1, y, z) else (x, y + 1, z) - | Running _ => (x, y, z + 1))) - jobs (0, 0, 0); - in {ready = x, pending = y, running = z} end; + Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w) + | Running _ => (x, y, z + 1, w) + | Passive => (x, y, z, w + 1))) + jobs (0, 0, 0, 0); + in {ready = x, pending = y, running = z, passive = w} end; (* cancel -- peers and sub-groups *) @@ -154,18 +161,27 @@ let fun cancel_job (group, job) (groups, running) = (cancel_group group Exn.Interrupt; - (case job of Running t => (insert eq_group group groups, insert Thread.equal t running) + (case job of + Running t => (insert eq_group group groups, insert Thread.equal t running) | _ => (groups, running))); - val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); + val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); val _ = List.app SimpleThread.interrupt running; - in groups end; + in running_groups end; (* enqueue *) +fun enqueue_passive group (Queue {groups, jobs, cache}) = + let + val task = new_task NONE; + val groups' = groups + |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); + val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive)); + in (task, make_queue groups' jobs' cache) end; + fun enqueue group deps pri job (Queue {groups, jobs, cache}) = let - val task = new_task pri; + val task = new_task (SOME pri); val groups' = groups |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); val jobs' = jobs