support named tasks, for improved tracing;
authorwenzelm
Mon, 31 Jan 2011 22:57:01 +0100
changeset 41673 1c191a39549f
parent 41672 2f70b1ddd09f
child 41674 7da257539a8d
support named tasks, for improved tracing;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/par_list.ML
src/Pure/Concurrent/task_queue.ML
src/Pure/Isar/toplevel.ML
src/Pure/PIDE/document.ML
src/Pure/Thy/thy_info.ML
src/Pure/goal.ML
src/Pure/proofterm.ML
--- a/src/Pure/Concurrent/future.ML	Mon Jan 31 21:54:49 2011 +0100
+++ b/src/Pure/Concurrent/future.ML	Mon Jan 31 22:57:01 2011 +0100
@@ -44,7 +44,8 @@
   val group_of: 'a future -> group
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
-  val bulk: {group: group option, deps: task list, pri: int} -> (unit -> 'a) list -> 'a future list
+  val bulk: {name: string, group: group option, deps: task list, pri: int} ->
+    (unit -> 'a) list -> 'a future list
   val fork_pri: int -> (unit -> 'a) -> 'a future
   val fork: (unit -> 'a) -> 'a future
   val join_results: 'a future list -> 'a Exn.result list
@@ -399,7 +400,7 @@
 
 (* fork *)
 
-fun bulk {group, deps, pri} es =
+fun bulk {name, group, deps, pri} es =
   let
     val grp =
       (case group of
@@ -408,7 +409,7 @@
     fun enqueue e (minimal, queue) =
       let
         val (result, job) = future_job grp e;
-        val ((task, minimal'), queue') = Task_Queue.enqueue grp deps pri job queue;
+        val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
         val future = Future {promised = false, task = task, group = grp, result = result};
       in (future, (minimal orelse minimal', queue')) end;
   in
@@ -423,7 +424,7 @@
       in futures end)
   end;
 
-fun fork_pri pri e = singleton (bulk {group = NONE, deps = [], pri = pri}) e;
+fun fork_pri pri e = singleton (bulk {name = "", group = NONE, deps = [], pri = pri}) e;
 fun fork e = fork_pri 0 e;
 
 
@@ -499,7 +500,9 @@
   in
     if extended then Future {promised = false, task = task, group = group, result = result}
     else
-      singleton (bulk {group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task})
+      singleton
+        (bulk {name = "Future.map", group = SOME group,
+          deps = [task], pri = Task_Queue.pri_of_task task})
         (fn () => f (join x))
   end;
 
--- a/src/Pure/Concurrent/par_list.ML	Mon Jan 31 21:54:49 2011 +0100
+++ b/src/Pure/Concurrent/par_list.ML	Mon Jan 31 22:57:01 2011 +0100
@@ -26,18 +26,19 @@
 structure Par_List: PAR_LIST =
 struct
 
-fun managed_results f xs =
+fun managed_results name f xs =
   if Multithreading.enabled () andalso not (Multithreading.self_critical ()) then
     let
       val group = Task_Queue.new_group (Future.worker_group ());
       val futures =
-        Future.bulk {group = SOME group, deps = [], pri = 0} (map (fn x => fn () => f x) xs);
+        Future.bulk {name = name, group = SOME group, deps = [], pri = 0}
+          (map (fn x => fn () => f x) xs);
       val results = Future.join_results futures
         handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
     in results end
   else map (Exn.capture f) xs;
 
-fun map f xs = Exn.release_first (managed_results f xs);
+fun map f xs = Exn.release_first (managed_results "Par_List.map" f xs);
 
 fun get_some f xs =
   let
@@ -45,7 +46,8 @@
     fun found (Exn.Exn (FOUND some)) = some
       | found _ = NONE;
     val results =
-      managed_results (fn x => (case f x of NONE => () | some => raise FOUND some)) xs;
+      managed_results "Par_List.get_some"
+        (fn x => (case f x of NONE => () | some => raise FOUND some)) xs;
   in
     (case get_first found results of
       SOME y => SOME y
--- a/src/Pure/Concurrent/task_queue.ML	Mon Jan 31 21:54:49 2011 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Mon Jan 31 22:57:01 2011 +0100
@@ -28,7 +28,8 @@
   val cancel: queue -> group -> bool
   val cancel_all: queue -> group list
   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
-  val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
+  val enqueue: string -> group -> task list -> int -> (bool -> bool) ->
+    queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
   val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
@@ -62,21 +63,26 @@
 
 (* tasks *)
 
-abstype task = Task of (int option * int) * timing Synchronized.var
+abstype task = Task of
+ {name: string,
+  id: int,
+  pri: int option,
+  timing: timing Synchronized.var}
 with
 
-val dummy_task = Task ((NONE, ~1), new_timing ());
-fun new_task pri = Task ((pri, new_id ()), new_timing ());
+val dummy_task = Task {name = "", id = ~1, pri = NONE, timing = new_timing ()};
+fun new_task name pri = Task {name = name, id = new_id (), pri = pri, timing = new_timing ()};
 
-fun pri_of_task (Task ((pri, _), _)) = the_default 0 pri;
-fun str_of_task (Task ((_, i), _)) = string_of_int i;
+fun pri_of_task (Task {pri, ...}) = the_default 0 pri;
+fun str_of_task (Task {name, id, ...}) =
+  if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")";
 
-fun timing_of_task (Task (_, timing)) = Synchronized.value timing;
-fun running (Task (_, timing)) = gen_timing apfst timing;
-fun waiting (Task (_, timing)) = gen_timing apsnd timing;
+fun timing_of_task (Task {timing, ...}) = Synchronized.value timing;
+fun running (Task {timing, ...}) = gen_timing apfst timing;
+fun waiting (Task {timing, ...}) = gen_timing apsnd timing;
 
-fun task_ord (Task (t1, _), Task (t2, _)) =
-  prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2);
+fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) =
+  prod_ord (rev_order o option_ord int_ord) int_ord ((pri1, id1), (pri2, id2));
 
 val eq_task = is_equal o task_ord;
 
@@ -219,15 +225,15 @@
 
 fun enqueue_passive group abort (Queue {groups, jobs}) =
   let
-    val task = new_task NONE;
+    val task = new_task "" NONE;
     val groups' = groups
       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
     val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive abort));
   in (task, make_queue groups' jobs') end;
 
-fun enqueue group deps pri job (Queue {groups, jobs}) =
+fun enqueue name group deps pri job (Queue {groups, jobs}) =
   let
-    val task = new_task (SOME pri);
+    val task = new_task name (SOME pri);
     val groups' = groups
       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
     val jobs' = jobs
--- a/src/Pure/Isar/toplevel.ML	Mon Jan 31 21:54:49 2011 +0100
+++ b/src/Pure/Isar/toplevel.ML	Mon Jan 31 22:57:01 2011 +0100
@@ -663,13 +663,15 @@
 
         val future_proof = Proof.global_future_proof
           (fn prf =>
-            Future.fork_pri ~1 (fn () =>
-              let val (states, result_state) =
-                (case st' of State (SOME (Proof (_, (_, orig_gthy))), prev)
-                  => State (SOME (Proof (Proof_Node.init prf, (finish, orig_gthy))), prev))
-                |> fold_map command_result body_trs
-                ||> command (end_tr |> set_print false);
-              in (states, presentation_context_of result_state) end))
+            singleton
+              (Future.bulk {name = "Toplevel.proof_result", group = NONE, deps = [], pri = ~1})
+              (fn () =>
+                let val (states, result_state) =
+                  (case st' of State (SOME (Proof (_, (_, orig_gthy))), prev)
+                    => State (SOME (Proof (Proof_Node.init prf, (finish, orig_gthy))), prev))
+                  |> fold_map command_result body_trs
+                  ||> command (end_tr |> set_print false);
+                in (states, presentation_context_of result_state) end))
           #> (fn (states, ctxt) => States.put (SOME states) ctxt);
 
         val st'' = st' |> command (end_tr |> reset_trans |> end_proof (K future_proof));
--- a/src/Pure/PIDE/document.ML	Mon Jan 31 21:54:49 2011 +0100
+++ b/src/Pure/PIDE/document.ML	Mon Jan 31 22:57:01 2011 +0100
@@ -208,7 +208,9 @@
 
 fun async_state tr st =
   ignore
-    (singleton (Future.bulk {group = SOME (Task_Queue.new_group NONE), deps = [], pri = 0})
+    (singleton
+      (Future.bulk {name = "Document.async_state",
+        group = SOME (Task_Queue.new_group NONE), deps = [], pri = 0})
       (fn () =>
         Toplevel.setmp_thread_position tr
           (fn () => Toplevel.print_state false st) ()));
@@ -327,7 +329,7 @@
 (* execute *)
 
 fun execute version_id state =
-  state |> map_state (fn (versions, commands, execs, execution) =>
+  state |> map_state (fn (versions, commands, execs, _) =>
     let
       val version = the_version state version_id;
 
@@ -337,14 +339,15 @@
       val _ = cancel state;
 
       val execution' = (* FIXME proper node deps *)
-        Future.bulk {group = NONE, deps = [], pri = 1} [fn () =>
-          let
-            val _ = await_cancellation state;
-            val _ =
-              node_names_of version |> List.app (fn name =>
-                fold_entries NONE (fn (_, exec) => fn () => force_exec exec)
-                    (get_node version name) ());
-          in () end];
+        Future.bulk {name = "Document.execute", group = NONE, deps = [], pri = 1}
+          [fn () =>
+            let
+              val _ = await_cancellation state;
+              val _ =
+                node_names_of version |> List.app (fn name =>
+                  fold_entries NONE (fn (_, exec) => fn () => force_exec exec)
+                      (get_node version name) ());
+            in () end];
 
       val _ = await_cancellation state;  (* FIXME async!? *)
 
--- a/src/Pure/Thy/thy_info.ML	Mon Jan 31 21:54:49 2011 +0100
+++ b/src/Pure/Thy/thy_info.ML	Mon Jan 31 22:57:01 2011 +0100
@@ -182,15 +182,17 @@
           let
             val get = the o Symtab.lookup tab;
             val deps = map (`get) (Graph.imm_preds task_graph name);
+            val task_deps = map (Future.task_of o #1) deps;
             fun failed (future, parent) = if can Future.join future then NONE else SOME parent;
 
             val future =
-              singleton (Future.bulk {group = NONE, deps = map (Future.task_of o #1) deps, pri = 0})
-              (fn () =>
-                (case map_filter failed deps of
-                  [] => body (map (#1 o Future.join o get) parents)
-                | bad => error (loader_msg
-                    ("failed to load " ^ quote name ^ " (unresolved " ^ commas_quote bad ^ ")") [])));
+              singleton
+                (Future.bulk {name = "theory " ^ name, group = NONE, deps = task_deps, pri = 0})
+                (fn () =>
+                  (case map_filter failed deps of
+                    [] => body (map (#1 o Future.join o get) parents)
+                  | bad => error (loader_msg ("failed to load " ^ quote name ^
+                      " (unresolved " ^ commas_quote bad ^ ")") [])));
           in Symtab.update (name, future) tab end
       | Finished thy => Symtab.update (name, Future.value (thy, I)) tab));
     val _ =
--- a/src/Pure/goal.ML	Mon Jan 31 21:54:49 2011 +0100
+++ b/src/Pure/goal.ML	Mon Jan 31 22:57:01 2011 +0100
@@ -106,7 +106,9 @@
 
 (* parallel proofs *)
 
-fun fork e = Future.fork_pri ~1 (fn () => Future.status e);
+fun fork e =
+  singleton (Future.bulk {name = "Goal.fork", group = NONE, deps = [], pri = ~1})
+    (fn () => Future.status e);
 
 val parallel_proofs = Unsynchronized.ref 1;
 
--- a/src/Pure/proofterm.ML	Mon Jan 31 21:54:49 2011 +0100
+++ b/src/Pure/proofterm.ML	Mon Jan 31 22:57:01 2011 +0100
@@ -1390,7 +1390,7 @@
       else Future.map postproc body
   | fulfill_proof_future thy promises postproc body =
       singleton
-        (Future.bulk {group = NONE,
+        (Future.bulk {name = "Proofterm.fulfill_proof_future", group = NONE,
             deps = Future.task_of body :: map (Future.task_of o snd) promises, pri = 0})
         (fn () =>
           postproc (fulfill_norm_proof thy (map (apsnd Future.join) promises) (Future.join body)));
@@ -1446,7 +1446,10 @@
     val body0 =
       if ! proofs <> 2 then Future.value (make_body0 MinProof)
       else if not (Multithreading.enabled ()) then Future.value (make_body0 (full_proof0 ()))
-      else Future.fork_pri ~1 (make_body0 o full_proof0);
+      else
+        singleton
+          (Future.bulk {name = "Proofterm.prepare_thm_proof", group = NONE, deps = [], pri = ~1})
+          (make_body0 o full_proof0);
 
     fun new_prf () = (serial (), fulfill_proof_future thy promises postproc body0);
     val (i, body') =