# HG changeset patch # User wenzelm # Date 1296507289 -3600 # Node ID 2f70b1ddd09fe2cb7d34307bc366665e8acfdc09 # Parent 5ffa2cf4cceda1ce8b5d74aa8dfab2280e5ab2dd more direct Future.bulk, which potentially reduces overhead for Par_List; tuned signature; diff -r 5ffa2cf4cced -r 2f70b1ddd09f src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Mon Jan 31 17:19:23 2011 +0100 +++ b/src/Pure/Concurrent/future.ML Mon Jan 31 21:54:49 2011 +0100 @@ -44,9 +44,7 @@ val group_of: 'a future -> group val peek: 'a future -> 'a Exn.result option val is_finished: 'a future -> bool - val fork_group: group -> (unit -> 'a) -> 'a future - val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future - val fork_deps: 'b future list -> (unit -> 'a) -> 'a future + val bulk: {group: group option, deps: task list, pri: int} -> (unit -> 'a) list -> 'a future list val fork_pri: int -> (unit -> 'a) -> 'a future val fork: (unit -> 'a) -> 'a future val join_results: 'a future list -> 'a Exn.result list @@ -401,27 +399,32 @@ (* fork *) -fun fork_future opt_group deps pri e = +fun bulk {group, deps, pri} es = let - val group = - (case opt_group of + val grp = + (case group of NONE => worker_subgroup () - | SOME group => group); - val (result, job) = future_job group e; - val task = SYNCHRONIZED "enqueue" (fn () => + | SOME grp => grp); + fun enqueue e (minimal, queue) = let - val (task, minimal) = - Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job); + val (result, job) = future_job grp e; + val ((task, minimal'), queue') = Task_Queue.enqueue grp deps pri job queue; + val future = Future {promised = false, task = task, group = grp, result = result}; + in (future, (minimal orelse minimal', queue')) end; + in + SYNCHRONIZED "enqueue" (fn () => + let + val (futures, minimal) = + Unsynchronized.change_result queue (fn q => + let val (futures, (minimal, q')) = fold_map enqueue es (false, q) + in ((futures, minimal), q') end); val _ = if minimal then signal work_available else (); val _ = scheduler_check (); - in task end); - in Future {promised = false, task = task, group = group, result = result} end; + in futures end) + end; -fun fork_group group e = fork_future (SOME group) [] 0 e; -fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e; -fun fork_deps deps e = fork_deps_pri deps 0 e; -fun fork_pri pri e = fork_deps_pri [] pri e; -fun fork e = fork_deps [] e; +fun fork_pri pri e = singleton (bulk {group = NONE, deps = [], pri = pri}) e; +fun fork e = fork_pri 0 e; (* join *) @@ -495,7 +498,9 @@ | NONE => false)); in if extended then Future {promised = false, task = task, group = group, result = result} - else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) + else + singleton (bulk {group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task}) + (fn () => f (join x)) end; diff -r 5ffa2cf4cced -r 2f70b1ddd09f src/Pure/Concurrent/par_list.ML --- a/src/Pure/Concurrent/par_list.ML Mon Jan 31 17:19:23 2011 +0100 +++ b/src/Pure/Concurrent/par_list.ML Mon Jan 31 21:54:49 2011 +0100 @@ -29,11 +29,11 @@ fun managed_results f xs = if Multithreading.enabled () andalso not (Multithreading.self_critical ()) then let - val shared_group = Task_Queue.new_group (Future.worker_group ()); - val results = - Future.join_results (map (fn x => Future.fork_group shared_group (fn () => f x)) xs) - handle exn => - (if Exn.is_interrupt exn then Future.cancel_group shared_group else (); reraise exn); + val group = Task_Queue.new_group (Future.worker_group ()); + val futures = + Future.bulk {group = SOME group, deps = [], pri = 0} (map (fn x => fn () => f x) xs); + val results = Future.join_results futures + handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn); in results end else map (Exn.capture f) xs; diff -r 5ffa2cf4cced -r 2f70b1ddd09f src/Pure/PIDE/document.ML --- a/src/Pure/PIDE/document.ML Mon Jan 31 17:19:23 2011 +0100 +++ b/src/Pure/PIDE/document.ML Mon Jan 31 21:54:49 2011 +0100 @@ -208,7 +208,7 @@ fun async_state tr st = ignore - (Future.fork_group (Task_Queue.new_group NONE) + (singleton (Future.bulk {group = SOME (Task_Queue.new_group NONE), deps = [], pri = 0}) (fn () => Toplevel.setmp_thread_position tr (fn () => Toplevel.print_state false st) ())); @@ -337,14 +337,14 @@ val _ = cancel state; val execution' = (* FIXME proper node deps *) - [Future.fork_pri 1 (fn () => + Future.bulk {group = NONE, deps = [], pri = 1} [fn () => let val _ = await_cancellation state; val _ = node_names_of version |> List.app (fn name => fold_entries NONE (fn (_, exec) => fn () => force_exec exec) (get_node version name) ()); - in () end)]; + in () end]; val _ = await_cancellation state; (* FIXME async!? *) diff -r 5ffa2cf4cced -r 2f70b1ddd09f src/Pure/Thy/thy_info.ML --- a/src/Pure/Thy/thy_info.ML Mon Jan 31 17:19:23 2011 +0100 +++ b/src/Pure/Thy/thy_info.ML Mon Jan 31 21:54:49 2011 +0100 @@ -184,11 +184,13 @@ val deps = map (`get) (Graph.imm_preds task_graph name); fun failed (future, parent) = if can Future.join future then NONE else SOME parent; - val future = Future.fork_deps (map #1 deps) (fn () => - (case map_filter failed deps of - [] => body (map (#1 o Future.join o get) parents) - | bad => error (loader_msg - ("failed to load " ^ quote name ^ " (unresolved " ^ commas_quote bad ^ ")") []))); + val future = + singleton (Future.bulk {group = NONE, deps = map (Future.task_of o #1) deps, pri = 0}) + (fn () => + (case map_filter failed deps of + [] => body (map (#1 o Future.join o get) parents) + | bad => error (loader_msg + ("failed to load " ^ quote name ^ " (unresolved " ^ commas_quote bad ^ ")") []))); in Symtab.update (name, future) tab end | Finished thy => Symtab.update (name, Future.value (thy, I)) tab)); val _ = diff -r 5ffa2cf4cced -r 2f70b1ddd09f src/Pure/proofterm.ML --- a/src/Pure/proofterm.ML Mon Jan 31 17:19:23 2011 +0100 +++ b/src/Pure/proofterm.ML Mon Jan 31 21:54:49 2011 +0100 @@ -1389,8 +1389,11 @@ if not (Multithreading.enabled ()) then Future.value (postproc (Future.join body)) else Future.map postproc body | fulfill_proof_future thy promises postproc body = - Future.fork_deps (body :: map snd promises) (fn () => - postproc (fulfill_norm_proof thy (map (apsnd Future.join) promises) (Future.join body))); + singleton + (Future.bulk {group = NONE, + deps = Future.task_of body :: map (Future.task_of o snd) promises, pri = 0}) + (fn () => + postproc (fulfill_norm_proof thy (map (apsnd Future.join) promises) (Future.join body))); (***** abstraction over sort constraints *****)