recovered a version of dequeue_towards (cf. bb7b5a5942c7);
Sun, 19 Jul 2009 14:14:25 +0200
changeset 32055 6a46898aa805
parent 32054 db50e76b0046
child 32056 f4b74cbecdaf
recovered a version of dequeue_towards (cf. bb7b5a5942c7); join_results: work only towards explicit dependencies -- otherwise could produce dynamic cycle (not recorded in queue);
--- a/src/Pure/Concurrent/future.ML	Sat Jul 18 22:53:02 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Sun Jul 19 14:14:25 2009 +0200
@@ -285,21 +285,10 @@
 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
-fun join_wait x =
-  if SYNCHRONIZED "join_wait" (fn () => is_finished x orelse (wait (); false))
-  then () else join_wait x;
-fun join_next x = (*requires SYNCHRONIZED*)
-  if is_finished x then NONE
-  else
-    (case change_result queue Task_Queue.dequeue of
-      NONE => (worker_wait (); join_next x)
-    | some => some);
-fun join_loop x =
-  (case SYNCHRONIZED "join" (fn () => join_next x) of
+fun join_deps deps =
+  (case SYNCHRONIZED "join" (fn () => change_result queue (Task_Queue.dequeue_towards deps)) of
     NONE => ()
-  | SOME work => (execute "join" work; join_loop x));
+  | SOME (work, deps') => (execute "join" work; join_deps deps'));
@@ -310,10 +299,16 @@
       val _ = scheduler_check "join check";
       val _ = Multithreading.self_critical () andalso
         error "Cannot join future values within critical section";
-      val _ =
-        if is_some (thread_data ())
-        then join_loop xs   (*proper task -- continue work*)
-        else join_wait xs;  (*alien thread -- refrain from contending for resources*)
+      val is_worker = is_some (thread_data ());
+      fun join_wait x =
+        if SYNCHRONIZED "join_wait" (fn () =>
+          is_finished x orelse (if is_worker then worker_wait () else wait (); false))
+        then () else join_wait x;
+      val _ = if is_worker then join_deps (map task_of xs) else ();
+      val _ = join_wait xs;
     in map get_result xs end) ();
--- a/src/Pure/Concurrent/task_queue.ML	Sat Jul 18 22:53:02 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Sun Jul 19 14:14:25 2009 +0200
@@ -24,6 +24,8 @@
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
   val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
+  val dequeue_towards: task list -> queue ->
+    (((task * group * (bool -> bool) list) * task list) option * queue)
   val interrupt: queue -> task -> unit
   val interrupt_external: queue -> string -> unit
   val cancel: queue -> group -> bool
@@ -150,6 +152,28 @@
+(* dequeue_towards -- adhoc dependencies *)
+fun dequeue_towards deps (queue as Queue {groups, jobs, ...}) =
+  let
+    fun ready task =
+      (case Task_Graph.get_node jobs task of
+        (group, Job list) =>
+          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
+          else NONE
+      | _ => NONE);
+    val tasks = filter (can (Task_Graph.get_node jobs)) deps;
+  in
+    (case get_first ready (Task_Graph.all_preds jobs tasks) of
+      NONE => (NONE, queue)
+    | SOME (result as (task, _, _)) =>
+        let
+          val jobs' = set_job task (Running (Thread.self ())) jobs;
+          val cache' = Unknown;
+        in (SOME (result, tasks), make_queue groups jobs' cache') end)
+  end;
 (* sporadic interrupts *)
 fun interrupt (Queue {jobs, ...}) task =