# HG changeset patch # User wenzelm # Date 1296488050 -3600 # Node ID 74010c6af0a48cb782bd3b165df1db5b5632017f # Parent 0820c4ebcad6969c0ef76fa85a36c0c3c06652f5 added basic task timing; diff -r 0820c4ebcad6 -r 74010c6af0a4 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Mon Jan 31 15:28:48 2011 +0100 +++ b/src/Pure/Concurrent/future.ML Mon Jan 31 16:34:10 2011 +0100 @@ -38,6 +38,7 @@ val worker_task: unit -> Task_Queue.task option val worker_group: unit -> Task_Queue.group option val worker_subgroup: unit -> Task_Queue.group + val worker_waiting: (unit -> 'a) -> 'a type 'a future val task_of: 'a future -> task val group_of: 'a future -> group @@ -87,6 +88,11 @@ val worker_group = Option.map #2 o thread_data; fun worker_subgroup () = Task_Queue.new_group (worker_group ()); +fun worker_waiting e = + (case worker_task () of + NONE => e () + | SOME task => Task_Queue.waiting task e); + (* datatype future *) @@ -198,8 +204,16 @@ fun execute (task, group, jobs) = let val valid = not (Task_Queue.is_canceled group); - val ok = setmp_thread_data (task, group) (fn () => - fold (fn job => fn ok => job valid andalso ok) jobs true) (); + val ok = + Task_Queue.running task (fn () => + setmp_thread_data (task, group) (fn () => + fold (fn job => fn ok => job valid andalso ok) jobs true) ()); + val _ = Multithreading.tracing 1 (fn () => + let + val s = Task_Queue.str_of_task task; + fun micros time = string_of_int (Time.toNanoseconds time div 1000); + val (run, wait) = pairself micros (Task_Queue.timing_of_task task); + in "TASK " ^ s ^ " " ^ run ^ " " ^ wait end); val _ = SYNCHRONIZED "finish" (fn () => let val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); @@ -448,10 +462,11 @@ else if Multithreading.self_critical () then error "Cannot join future values within critical section" else - (case worker_task () of - SOME task => join_depend task (map task_of xs) - | NONE => List.app (ignore o Single_Assignment.await o result_of) xs; - map get_result xs); + worker_waiting (fn () => + (case worker_task () of + SOME task => join_depend task (map task_of xs) + | NONE => List.app (ignore o Single_Assignment.await o result_of) xs; + map get_result xs)); end; @@ -513,7 +528,7 @@ Unsynchronized.change_result queue (Task_Queue.dequeue_passive (Thread.self ()) task)); in if still_passive then execute (task, group, [job]) else () end); - val _ = Single_Assignment.await result; + val _ = worker_waiting (fn () => Single_Assignment.await result); in () end; fun fulfill x res = fulfill_result x (Exn.Result res); diff -r 0820c4ebcad6 -r 74010c6af0a4 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Mon Jan 31 15:28:48 2011 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Mon Jan 31 16:34:10 2011 +0100 @@ -10,6 +10,9 @@ val dummy_task: task val pri_of_task: task -> int val str_of_task: task -> string + val timing_of_task: task -> Time.time * Time.time + val running: task -> (unit -> 'a) -> 'a + val waiting: task -> (unit -> 'a) -> 'a type group val new_group: group option -> group val group_id: group -> int @@ -41,18 +44,40 @@ val new_id = Synchronized.counter (); +(* timing *) + +type timing = Time.time * Time.time; + +fun new_timing () = + Synchronized.var "Task_Queue.timing" (Time.zeroTime, Time.zeroTime); + +fun gen_timing which timing e = + let + val start = Time.now (); + val result = Exn.capture e (); + val t = Time.- (Time.now (), start); + val _ = Synchronized.change timing (which (fn t' => Time.+ (t, t'))); + in Exn.release result end; + + (* tasks *) -abstype task = Task of int option * int +abstype task = Task of (int option * int) * timing Synchronized.var with -val dummy_task = Task (NONE, ~1); -fun new_task pri = Task (pri, new_id ()); +val dummy_task = Task ((NONE, ~1), new_timing ()); +fun new_task pri = Task ((pri, new_id ()), new_timing ()); + +fun pri_of_task (Task ((pri, _), _)) = the_default 0 pri; +fun str_of_task (Task ((_, i), _)) = string_of_int i; -fun pri_of_task (Task (pri, _)) = the_default 0 pri; -fun str_of_task (Task (_, i)) = string_of_int i; +fun timing_of_task (Task (_, timing)) = Synchronized.value timing; +fun running (Task (_, timing)) = gen_timing apfst timing; +fun waiting (Task (_, timing)) = gen_timing apsnd timing; -fun task_ord (Task t1, Task t2) = prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2); +fun task_ord (Task (t1, _), Task (t2, _)) = + prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2); + val eq_task = is_equal o task_ord; end;