more direct Future.bulk, which potentially reduces overhead for Par_List;
authorwenzelm
Mon, 31 Jan 2011 21:54:49 +0100
changeset 41672 2f70b1ddd09f
parent 41671 5ffa2cf4cced
child 41673 1c191a39549f
more direct Future.bulk, which potentially reduces overhead for Par_List; tuned signature;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/par_list.ML
src/Pure/PIDE/document.ML
src/Pure/Thy/thy_info.ML
src/Pure/proofterm.ML
--- a/src/Pure/Concurrent/future.ML	Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/Concurrent/future.ML	Mon Jan 31 21:54:49 2011 +0100
@@ -44,9 +44,7 @@
   val group_of: 'a future -> group
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
-  val fork_group: group -> (unit -> 'a) -> 'a future
-  val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
-  val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
+  val bulk: {group: group option, deps: task list, pri: int} -> (unit -> 'a) list -> 'a future list
   val fork_pri: int -> (unit -> 'a) -> 'a future
   val fork: (unit -> 'a) -> 'a future
   val join_results: 'a future list -> 'a Exn.result list
@@ -401,27 +399,32 @@
 
 (* fork *)
 
-fun fork_future opt_group deps pri e =
+fun bulk {group, deps, pri} es =
   let
-    val group =
-      (case opt_group of
+    val grp =
+      (case group of
         NONE => worker_subgroup ()
-      | SOME group => group);
-    val (result, job) = future_job group e;
-    val task = SYNCHRONIZED "enqueue" (fn () =>
+      | SOME grp => grp);
+    fun enqueue e (minimal, queue) =
       let
-        val (task, minimal) =
-          Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
+        val (result, job) = future_job grp e;
+        val ((task, minimal'), queue') = Task_Queue.enqueue grp deps pri job queue;
+        val future = Future {promised = false, task = task, group = grp, result = result};
+      in (future, (minimal orelse minimal', queue')) end;
+  in
+    SYNCHRONIZED "enqueue" (fn () =>
+      let
+        val (futures, minimal) =
+          Unsynchronized.change_result queue (fn q =>
+            let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
+            in ((futures, minimal), q') end);
         val _ = if minimal then signal work_available else ();
         val _ = scheduler_check ();
-      in task end);
-  in Future {promised = false, task = task, group = group, result = result} end;
+      in futures end)
+  end;
 
-fun fork_group group e = fork_future (SOME group) [] 0 e;
-fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
-fun fork_deps deps e = fork_deps_pri deps 0 e;
-fun fork_pri pri e = fork_deps_pri [] pri e;
-fun fork e = fork_deps [] e;
+fun fork_pri pri e = singleton (bulk {group = NONE, deps = [], pri = pri}) e;
+fun fork e = fork_pri 0 e;
 
 
 (* join *)
@@ -495,7 +498,9 @@
       | NONE => false));
   in
     if extended then Future {promised = false, task = task, group = group, result = result}
-    else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
+    else
+      singleton (bulk {group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task})
+        (fn () => f (join x))
   end;
 
 
--- a/src/Pure/Concurrent/par_list.ML	Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/Concurrent/par_list.ML	Mon Jan 31 21:54:49 2011 +0100
@@ -29,11 +29,11 @@
 fun managed_results f xs =
   if Multithreading.enabled () andalso not (Multithreading.self_critical ()) then
     let
-      val shared_group = Task_Queue.new_group (Future.worker_group ());
-      val results =
-        Future.join_results (map (fn x => Future.fork_group shared_group (fn () => f x)) xs)
-          handle exn =>
-            (if Exn.is_interrupt exn then Future.cancel_group shared_group else (); reraise exn);
+      val group = Task_Queue.new_group (Future.worker_group ());
+      val futures =
+        Future.bulk {group = SOME group, deps = [], pri = 0} (map (fn x => fn () => f x) xs);
+      val results = Future.join_results futures
+        handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
     in results end
   else map (Exn.capture f) xs;
 
--- a/src/Pure/PIDE/document.ML	Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/PIDE/document.ML	Mon Jan 31 21:54:49 2011 +0100
@@ -208,7 +208,7 @@
 
 fun async_state tr st =
   ignore
-    (Future.fork_group (Task_Queue.new_group NONE)
+    (singleton (Future.bulk {group = SOME (Task_Queue.new_group NONE), deps = [], pri = 0})
       (fn () =>
         Toplevel.setmp_thread_position tr
           (fn () => Toplevel.print_state false st) ()));
@@ -337,14 +337,14 @@
       val _ = cancel state;
 
       val execution' = (* FIXME proper node deps *)
-        [Future.fork_pri 1 (fn () =>
+        Future.bulk {group = NONE, deps = [], pri = 1} [fn () =>
           let
             val _ = await_cancellation state;
             val _ =
               node_names_of version |> List.app (fn name =>
                 fold_entries NONE (fn (_, exec) => fn () => force_exec exec)
                     (get_node version name) ());
-          in () end)];
+          in () end];
 
       val _ = await_cancellation state;  (* FIXME async!? *)
 
--- a/src/Pure/Thy/thy_info.ML	Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/Thy/thy_info.ML	Mon Jan 31 21:54:49 2011 +0100
@@ -184,11 +184,13 @@
             val deps = map (`get) (Graph.imm_preds task_graph name);
             fun failed (future, parent) = if can Future.join future then NONE else SOME parent;
 
-            val future = Future.fork_deps (map #1 deps) (fn () =>
-              (case map_filter failed deps of
-                [] => body (map (#1 o Future.join o get) parents)
-              | bad => error (loader_msg
-                  ("failed to load " ^ quote name ^ " (unresolved " ^ commas_quote bad ^ ")") [])));
+            val future =
+              singleton (Future.bulk {group = NONE, deps = map (Future.task_of o #1) deps, pri = 0})
+              (fn () =>
+                (case map_filter failed deps of
+                  [] => body (map (#1 o Future.join o get) parents)
+                | bad => error (loader_msg
+                    ("failed to load " ^ quote name ^ " (unresolved " ^ commas_quote bad ^ ")") [])));
           in Symtab.update (name, future) tab end
       | Finished thy => Symtab.update (name, Future.value (thy, I)) tab));
     val _ =
--- a/src/Pure/proofterm.ML	Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/proofterm.ML	Mon Jan 31 21:54:49 2011 +0100
@@ -1389,8 +1389,11 @@
       if not (Multithreading.enabled ()) then Future.value (postproc (Future.join body))
       else Future.map postproc body
   | fulfill_proof_future thy promises postproc body =
-      Future.fork_deps (body :: map snd promises) (fn () =>
-        postproc (fulfill_norm_proof thy (map (apsnd Future.join) promises) (Future.join body)));
+      singleton
+        (Future.bulk {group = NONE,
+            deps = Future.task_of body :: map (Future.task_of o snd) promises, pri = 0})
+        (fn () =>
+          postproc (fulfill_norm_proof thy (map (apsnd Future.join) promises) (Future.join body)));
 
 
 (***** abstraction over sort constraints *****)