Future.join_results: discontinued post-hoc recording of dynamic dependencies;
authorwenzelm
Wed, 02 Feb 2011 13:38:09 +0100
changeset 41681 b5d7b15166bf
parent 41680 a4c822915eaa
child 41682 44a2e0db281f
Future.join_results: discontinued post-hoc recording of dynamic dependencies; abstract Task_Queue.deps; tuned signature; tuned;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Tue Feb 01 22:24:28 2011 +0100
+++ b/src/Pure/Concurrent/future.ML	Wed Feb 02 13:38:09 2011 +0100
@@ -449,12 +449,12 @@
       else res);
 
 fun join_next deps = (*requires SYNCHRONIZED*)
-  if null deps then NONE
+  if Task_Queue.finished_deps deps then NONE
   else
-    (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
-      (NONE, []) => NONE
-    | (NONE, deps') =>
-        (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
+    (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
+      (NONE, deps') =>
+        if Task_Queue.finished_deps deps' then NONE
+        else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
     | (SOME work, deps') => SOME (work, deps'));
 
 fun execute_work NONE = ()
@@ -462,10 +462,6 @@
 and join_work deps =
   execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
 
-fun join_depend task deps =
-  execute_work (SYNCHRONIZED "join" (fn () =>
-    (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
-
 in
 
 fun join_results xs =
@@ -474,10 +470,9 @@
       if forall is_finished xs then ()
       else if Multithreading.self_critical () then
         error "Cannot join future values within critical section"
-      else
-        (case worker_task () of
-          SOME task => join_depend task (map task_of xs)
-        | NONE => List.app (ignore o Single_Assignment.await o result_of) xs);
+      else if is_some (thread_data ()) then
+        join_work (Task_Queue.init_deps (map task_of xs))
+      else List.app (ignore o Single_Assignment.await o result_of) xs;
   in map get_result xs end;
 
 end;
@@ -544,7 +539,9 @@
                 Unsynchronized.change_result queue
                   (Task_Queue.dequeue_passive (Thread.self ()) task));
           in if still_passive then execute (task, group, [job]) else () end);
-      val _ = worker_waiting [task] (fn () => Single_Assignment.await result);
+      val _ =
+        worker_waiting (Task_Queue.init_deps [task])
+          (fn () => Single_Assignment.await result);
     in () end;
 
 fun fulfill x res = fulfill_result x (Exn.Result res);
--- a/src/Pure/Concurrent/task_queue.ML	Tue Feb 01 22:24:28 2011 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Wed Feb 02 13:38:09 2011 +0100
@@ -8,12 +8,10 @@
 sig
   type task
   val dummy_task: task
+  val name_of_task: task -> string
   val pri_of_task: task -> int
   val str_of_task: task -> string
   val timing_of_task: task -> Time.time * Time.time * string list
-  val running: task -> (unit -> 'a) -> 'a
-  val joining: task -> (unit -> 'a) -> 'a
-  val waiting: task -> task list -> (unit -> 'a) -> 'a
   type group
   val new_group: group option -> group
   val group_id: group -> int
@@ -28,16 +26,21 @@
   val status: queue -> {ready: int, pending: int, running: int, passive: int}
   val cancel: queue -> group -> bool
   val cancel_all: queue -> group list
+  val finish: task -> queue -> bool * queue
   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
   val enqueue: string -> group -> task list -> int -> (bool -> bool) ->
     queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
   val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
-  val depend: task -> task list -> queue -> queue
-  val dequeue_towards: Thread.thread -> task list -> queue ->
-    (((task * group * (bool -> bool) list) option * task list) * queue)
-  val finish: task -> queue -> bool * queue
+  type deps
+  val init_deps: task list -> deps
+  val finished_deps: deps -> bool
+  val dequeue_deps: Thread.thread -> deps -> queue ->
+    (((task * group * (bool -> bool) list) option * deps) * queue)
+  val running: task -> (unit -> 'a) -> 'a
+  val joining: task -> (unit -> 'a) -> 'a
+  val waiting: task -> deps -> (unit -> 'a) -> 'a
 end;
 
 structure Task_Queue: TASK_QUEUE =
@@ -46,24 +49,15 @@
 val new_id = Synchronized.counter ();
 
 
-(* timing *)
+(** grouped tasks **)
 
-type timing = Time.time * Time.time * string list;
+(* tasks *)
+
+type timing = Time.time * Time.time * string list;  (*run, wait, wait dependencies*)
 
 fun new_timing () =
   Synchronized.var "timing" ((Time.zeroTime, Time.zeroTime, []): timing);
 
-fun gen_timing account timing e =
-  let
-    val start = Time.now ();
-    val result = Exn.capture e ();
-    val t = Time.- (Time.now (), start);
-    val _ = Synchronized.change timing (account t);
-  in Exn.release result end;
-
-
-(* tasks *)
-
 abstype task = Task of
  {name: string,
   id: int,
@@ -74,21 +68,20 @@
 val dummy_task = Task {name = "", id = ~1, pri = NONE, timing = new_timing ()};
 fun new_task name pri = Task {name = name, id = new_id (), pri = pri, timing = new_timing ()};
 
+fun name_of_task (Task {name, ...}) = name;
 fun pri_of_task (Task {pri, ...}) = the_default 0 pri;
 fun str_of_task (Task {name, id, ...}) =
   if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")";
 
 fun timing_of_task (Task {timing, ...}) = Synchronized.value timing;
 
-fun running (Task {timing, ...}) =
-  gen_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) timing;
-
-fun joining (Task {timing, ...}) =
-  gen_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) timing;
-
-fun waiting (Task {timing, ...}) deps =
-  timing |> gen_timing (fn t => fn (a, b, ds) =>
-    (Time.- (a, t), Time.+ (b, t), fold (fn Task {name, ...} => insert (op =) name) deps ds));
+fun update_timing update (Task {timing, ...}) e =
+  let
+    val start = Time.now ();
+    val result = Exn.capture e ();
+    val t = Time.- (Time.now (), start);
+    val _ = Synchronized.change timing (update t);
+  in Exn.release result end;
 
 fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) =
   prod_ord (rev_order o option_ord int_ord) int_ord ((pri1, id1), (pri2, id2));
@@ -141,6 +134,8 @@
 end;
 
 
+(** queue of jobs and groups **)
+
 (* jobs *)
 
 datatype job =
@@ -157,16 +152,11 @@
 fun add_job task dep (jobs: jobs) =
   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
 
-fun add_dep task dep (jobs: jobs) =
-  if Task_Graph.is_edge jobs (task, dep) then
-    raise Fail "Cyclic dependency of future tasks"
-  else add_job task dep jobs;
-
 fun get_deps (jobs: jobs) task =
   Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
 
 
-(* queue of grouped jobs *)
+(* queue *)
 
 datatype queue = Queue of
  {groups: task list Inttab.table,   (*groups with presently active members*)
@@ -207,6 +197,9 @@
   in {ready = x, pending = y, running = z, passive = w} end;
 
 
+
+(** task queue operations **)
+
 (* cancel -- peers and sub-groups *)
 
 fun cancel (Queue {groups, jobs}) group =
@@ -230,6 +223,18 @@
   in running_groups end;
 
 
+(* finish *)
+
+fun finish task (Queue {groups, jobs}) =
+  let
+    val group = get_group jobs task;
+    val groups' = groups
+      |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group);
+    val jobs' = Task_Graph.del_node task jobs;
+    val maximal = null (Task_Graph.imm_succs jobs task);
+  in (maximal, make_queue groups' jobs') end;
+
+
 (* enqueue *)
 
 fun enqueue_passive group abort (Queue {groups, jobs}) =
@@ -275,37 +280,44 @@
   | NONE => (NONE, queue));
 
 
-(* dequeue_towards -- adhoc dependencies *)
+(* dequeue wrt. dynamic dependencies *)
+
+abstype deps = Deps of task list
+with
 
-fun depend task deps (Queue {groups, jobs}) =
-  make_queue groups (fold (add_dep task) deps jobs);
+fun init_deps tasks = Deps tasks;
+fun finished_deps (Deps tasks) = null tasks;
 
-fun dequeue_towards thread deps (queue as Queue {groups, jobs}) =
+fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks;
+
+fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) =
   let
     fun ready task = ready_job task (Task_Graph.get_entry jobs task);
     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
     fun result (res as (task, _, _)) =
       let val jobs' = set_job task (Running thread) jobs
-      in ((SOME res, tasks), make_queue groups jobs') end;
+      in ((SOME res, Deps tasks), make_queue groups jobs') end;
   in
     (case get_first ready tasks of
       SOME res => result res
     | NONE =>
         (case get_first (get_first ready o get_deps jobs) tasks of
           SOME res => result res
-        | NONE => ((NONE, tasks), queue)))
+        | NONE => ((NONE, Deps tasks), queue)))
   end;
 
+end;
+
 
-(* finish *)
+(* timing *)
+
+fun running task =
+  update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
 
-fun finish task (Queue {groups, jobs}) =
-  let
-    val group = get_group jobs task;
-    val groups' = groups
-      |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group);
-    val jobs' = Task_Graph.del_node task jobs;
-    val maximal = null (Task_Graph.imm_succs jobs task);
-  in (maximal, make_queue groups' jobs') end;
+fun joining task =
+  update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
+
+fun waiting task deps =
+  update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), Time.+ (b, t), insert_deps deps ds)) task;
 
 end;