added Task_Queue.depend (again) -- light-weight version for transitive graph;
authorwenzelm
Thu, 01 Oct 2009 16:27:13 +0200
changeset 32814 81897d30b97f
parent 32813 dac196e23093
child 32815 1a5e364584ae
added Task_Queue.depend (again) -- light-weight version for transitive graph; Future.join_results: record explicit dependency, detect direct task-task join cycles; Future.join_results: no change of interruptibility, allows to interrupt wait; added Future.worker_task; ThyInfo.schedule_futures: uninterruptible outer join;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
src/Pure/Thy/thy_info.ML
--- a/src/Pure/Concurrent/future.ML	Thu Oct 01 16:09:47 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Thu Oct 01 16:27:13 2009 +0200
@@ -30,6 +30,7 @@
   type task = Task_Queue.task
   type group = Task_Queue.group
   val is_worker: unit -> bool
+  val worker_task: unit -> Task_Queue.task option
   val worker_group: unit -> Task_Queue.group option
   type 'a future
   val task_of: 'a future -> task
@@ -71,6 +72,7 @@
 end;
 
 val is_worker = is_some o thread_data;
+val worker_task = Option.map #2 o thread_data;
 val worker_group = Option.map #3 o thread_data;
 
 
@@ -347,7 +349,8 @@
   | SOME res => res);
 
 fun join_wait x =
-  Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some));
+  Synchronized.guarded_access (result_of x)
+    (fn NONE => NONE | some => SOME ((), some));
 
 fun join_next deps = (*requires SYNCHRONIZED*)
   if null deps then NONE
@@ -357,10 +360,14 @@
     | (NONE, deps') => (worker_wait work_finished; join_next deps')
     | (SOME work, deps') => SOME (work, deps'));
 
-fun join_work deps =
-  (case SYNCHRONIZED "join" (fn () => join_next deps) of
-    NONE => ()
-  | SOME (work, deps') => (execute "join" work; join_work deps'));
+fun execute_work NONE = ()
+  | execute_work (SOME (work, deps')) = (execute "join" work; join_work deps')
+and join_work deps =
+  execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
+
+fun join_depend task deps =
+  execute_work (SYNCHRONIZED "join" (fn () =>
+    (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
 
 in
 
@@ -368,11 +375,11 @@
   if forall is_finished xs then map get_result xs
   else if Multithreading.self_critical () then
     error "Cannot join future values within critical section"
-  else uninterruptible (fn _ => fn () =>
-     (if is_worker ()
-      then join_work (map task_of xs)
-      else List.app join_wait xs;
-      map get_result xs)) ();
+  else
+    (case worker_task () of
+      SOME task => join_depend task (map task_of xs)
+    | NONE => List.app join_wait xs;
+    map get_result xs);
 
 end;
 
--- a/src/Pure/Concurrent/task_queue.ML	Thu Oct 01 16:09:47 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Thu Oct 01 16:27:13 2009 +0200
@@ -27,6 +27,7 @@
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
+  val depend: task -> task list -> queue -> queue
   val dequeue_towards: Thread.thread -> task list -> queue ->
     (((task * group * (bool -> bool) list) option * task list) * queue)
   val finish: task -> queue -> bool * queue
@@ -101,6 +102,11 @@
 fun add_job task dep (jobs: jobs) =
   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
 
+fun add_dep task dep (jobs: jobs) =
+  if Task_Graph.is_edge jobs (task, dep) then
+    raise Fail "Cyclic dependency of future tasks"
+  else add_job task dep jobs;
+
 fun get_deps (jobs: jobs) task =
   Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
 
@@ -205,6 +211,9 @@
 
 (* dequeue_towards -- adhoc dependencies *)
 
+fun depend task deps (Queue {groups, jobs, ...}) =
+  make_queue groups (fold (add_dep task) deps jobs) Unknown;
+
 fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
   let
     fun ready task =
--- a/src/Pure/Thy/thy_info.ML	Thu Oct 01 16:09:47 2009 +0200
+++ b/src/Pure/Thy/thy_info.ML	Thu Oct 01 16:27:13 2009 +0200
@@ -364,7 +364,7 @@
 
 local
 
-fun schedule_futures task_graph =
+fun schedule_futures task_graph = uninterruptible (fn _ => fn () =>
   let
     val tasks = Graph.topological_order task_graph |> map_filter (fn name =>
       (case Graph.get_node task_graph name of Task body => SOME (name, body) | _ => NONE));
@@ -397,7 +397,7 @@
         val _ = after_load ();
       in [] end handle exn => (kill_thy name; [exn]));
 
-  in ignore (Exn.release_all (map Exn.Exn (rev exns))) end;
+  in ignore (Exn.release_all (map Exn.Exn (rev exns))) end) ();
 
 fun schedule_seq tasks =
   Graph.topological_order tasks