added basic task timing;
authorwenzelm
Mon, 31 Jan 2011 16:34:10 +0100
changeset 41670 74010c6af0a4
parent 41669 0820c4ebcad6
child 41671 5ffa2cf4cced
added basic task timing;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Mon Jan 31 15:28:48 2011 +0100
+++ b/src/Pure/Concurrent/future.ML	Mon Jan 31 16:34:10 2011 +0100
@@ -38,6 +38,7 @@
   val worker_task: unit -> Task_Queue.task option
   val worker_group: unit -> Task_Queue.group option
   val worker_subgroup: unit -> Task_Queue.group
+  val worker_waiting: (unit -> 'a) -> 'a
   type 'a future
   val task_of: 'a future -> task
   val group_of: 'a future -> group
@@ -87,6 +88,11 @@
 val worker_group = Option.map #2 o thread_data;
 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
 
+fun worker_waiting e =
+  (case worker_task () of
+    NONE => e ()
+  | SOME task => Task_Queue.waiting task e);
+
 
 (* datatype future *)
 
@@ -198,8 +204,16 @@
 fun execute (task, group, jobs) =
   let
     val valid = not (Task_Queue.is_canceled group);
-    val ok = setmp_thread_data (task, group) (fn () =>
-      fold (fn job => fn ok => job valid andalso ok) jobs true) ();
+    val ok =
+      Task_Queue.running task (fn () =>
+        setmp_thread_data (task, group) (fn () =>
+          fold (fn job => fn ok => job valid andalso ok) jobs true) ());
+    val _ = Multithreading.tracing 1 (fn () =>
+      let
+        val s = Task_Queue.str_of_task task;
+        fun micros time = string_of_int (Time.toNanoseconds time div 1000);
+        val (run, wait) = pairself micros (Task_Queue.timing_of_task task);
+      in "TASK " ^ s ^ " " ^ run ^ " " ^ wait end);
     val _ = SYNCHRONIZED "finish" (fn () =>
       let
         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
@@ -448,10 +462,11 @@
   else if Multithreading.self_critical () then
     error "Cannot join future values within critical section"
   else
-    (case worker_task () of
-      SOME task => join_depend task (map task_of xs)
-    | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
-    map get_result xs);
+    worker_waiting (fn () =>
+      (case worker_task () of
+        SOME task => join_depend task (map task_of xs)
+      | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
+      map get_result xs));
 
 end;
 
@@ -513,7 +528,7 @@
                 Unsynchronized.change_result queue
                   (Task_Queue.dequeue_passive (Thread.self ()) task));
           in if still_passive then execute (task, group, [job]) else () end);
-      val _ = Single_Assignment.await result;
+      val _ = worker_waiting (fn () => Single_Assignment.await result);
     in () end;
 
 fun fulfill x res = fulfill_result x (Exn.Result res);
--- a/src/Pure/Concurrent/task_queue.ML	Mon Jan 31 15:28:48 2011 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Mon Jan 31 16:34:10 2011 +0100
@@ -10,6 +10,9 @@
   val dummy_task: task
   val pri_of_task: task -> int
   val str_of_task: task -> string
+  val timing_of_task: task -> Time.time * Time.time
+  val running: task -> (unit -> 'a) -> 'a
+  val waiting: task -> (unit -> 'a) -> 'a
   type group
   val new_group: group option -> group
   val group_id: group -> int
@@ -41,18 +44,40 @@
 val new_id = Synchronized.counter ();
 
 
+(* timing *)
+
+type timing = Time.time * Time.time;
+
+fun new_timing () =
+  Synchronized.var "Task_Queue.timing" (Time.zeroTime, Time.zeroTime);
+
+fun gen_timing which timing e =
+  let
+    val start = Time.now ();
+    val result = Exn.capture e ();
+    val t = Time.- (Time.now (), start);
+    val _ = Synchronized.change timing (which (fn t' => Time.+ (t, t')));
+  in Exn.release result end;
+
+
 (* tasks *)
 
-abstype task = Task of int option * int
+abstype task = Task of (int option * int) * timing Synchronized.var
 with
 
-val dummy_task = Task (NONE, ~1);
-fun new_task pri = Task (pri, new_id ());
+val dummy_task = Task ((NONE, ~1), new_timing ());
+fun new_task pri = Task ((pri, new_id ()), new_timing ());
+
+fun pri_of_task (Task ((pri, _), _)) = the_default 0 pri;
+fun str_of_task (Task ((_, i), _)) = string_of_int i;
 
-fun pri_of_task (Task (pri, _)) = the_default 0 pri;
-fun str_of_task (Task (_, i)) = string_of_int i;
+fun timing_of_task (Task (_, timing)) = Synchronized.value timing;
+fun running (Task (_, timing)) = gen_timing apfst timing;
+fun waiting (Task (_, timing)) = gen_timing apsnd timing;
 
-fun task_ord (Task t1, Task t2) = prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2);
+fun task_ord (Task (t1, _), Task (t2, _)) =
+  prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2);
+
 val eq_task = is_equal o task_ord;
 
 end;