simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
authorwenzelm
Sat, 13 Jun 2009 19:40:37 +0200
changeset 31617 bb7b5a5942c7
parent 31616 63893e3a50a6
child 31618 2e4430b84303
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures); more efficient Task_Queue.dequeue, with internal cache (reduces wast of cycles with many idle workers); removed complicated/expensive Task_Queue.dequeue_towards;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Sat Jun 13 19:19:14 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Sat Jun 13 19:40:37 2009 +0200
@@ -286,7 +286,7 @@
 fun join_loop name pending =
   (case SYNCHRONIZED name (fn () => join_next pending) of
     NONE => ()
-  | SOME work => (execute name work; join_loop name pending));
+  | SOME work => (execute name work; join_loop name (filter_out is_finished pending)));
 
 in
 
@@ -298,13 +298,6 @@
       val _ = Multithreading.self_critical () andalso
         error "Cannot join future values within critical section";
 
-      fun join_deps _ [] = ()
-        | join_deps name deps =
-            (case SYNCHRONIZED name (fn () =>
-                change_result queue (Task_Queue.dequeue_towards deps)) of
-              NONE => ()
-            | SOME (work, deps') => (execute name work; join_deps name deps'));
-
       val _ =
         (case thread_data () of
           NONE =>
@@ -312,14 +305,13 @@
             while not (forall is_finished xs)
             do SYNCHRONIZED "join_thread" (fn () => wait ())
         | SOME (name, task) =>
-            (*proper task -- actively work towards results*)
+            (*proper task -- continue work*)
             let
               val pending = filter_out is_finished xs;
               val deps = map task_of pending;
               val _ = SYNCHRONIZED "join" (fn () =>
                 (change queue (Task_Queue.depend deps task); notify_all ()));
-              val _ = join_deps ("join_deps: " ^ name) deps;
-              val _ = join_loop ("join_loop: " ^ name) (filter_out is_finished pending);
+              val _ = join_loop ("join_loop: " ^ name) pending;
             in () end);
 
     in map get_result xs end) ();
--- a/src/Pure/Concurrent/task_queue.ML	Sat Jun 13 19:19:14 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Sat Jun 13 19:40:37 2009 +0200
@@ -23,8 +23,6 @@
   val extend: task -> (bool -> bool) -> queue -> queue option
   val depend: task list -> task -> queue -> queue
   val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
-  val dequeue_towards: task list -> queue ->
-    (((task * group * (bool -> bool) list) * task list) option * queue)
   val interrupt: queue -> task -> unit
   val interrupt_external: queue -> string -> unit
   val cancel: queue -> group -> bool
@@ -82,69 +80,59 @@
 
 (* queue of grouped jobs *)
 
+datatype result = Unknown | Result of task | No_Result;
+
 datatype queue = Queue of
  {groups: task list Inttab.table,   (*groups with presently active members*)
-  jobs: jobs};                      (*job dependency graph*)
+  jobs: jobs,                       (*job dependency graph*)
+  cache: result};                   (*last dequeue result*)
 
-fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
+fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
 
-val empty = make_queue Inttab.empty Task_Graph.empty;
+val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
 
 
 (* enqueue *)
 
-fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
+fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, ...}) =
   let
     val task = new_task pri;
     val groups' = Inttab.cons_list (gid, task) groups;
     val jobs' = jobs
       |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
-  in (task, make_queue groups' jobs') end;
+  in (task, make_queue groups' jobs' Unknown) end;
 
-fun extend task job (Queue {groups, jobs}) =
+fun extend task job (Queue {groups, jobs, cache}) =
   (case try (get_job jobs) task of
-    SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
+    SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
   | _ => NONE);
 
-fun depend deps task (Queue {groups, jobs}) =
-  make_queue groups (fold (add_job_acyclic task) deps jobs);
+fun depend deps task (Queue {groups, jobs, ...}) =
+  make_queue groups (fold (add_job_acyclic task) deps jobs) Unknown;
 
 
 (* dequeue *)
 
-local
-
-fun dequeue_result NONE queue = (NONE, queue)
-  | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
-      (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs));
-
-in
-
-fun dequeue (queue as Queue {jobs, ...}) =
+fun dequeue (queue as Queue {groups, jobs, cache}) =
   let
     fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
       | ready _ = NONE;
-  in dequeue_result (Task_Graph.get_first ready jobs) queue end;
-
-fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
-  let
-    val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;
-
-    fun ready task =
-      (case Task_Graph.get_node jobs task of
-        (group, Job list) =>
-          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
-          else NONE
-      | _ => NONE);
+    fun deq boundary =
+      (case Task_Graph.get_first boundary ready jobs of
+        NONE => (NONE, make_queue groups jobs No_Result)
+      | SOME (result as (task, _, _)) =>
+          let
+            val jobs' = set_job task (Running (Thread.self ())) jobs;
+            val cache' = Result task;
+          in (SOME result, make_queue groups jobs' cache') end);
   in
-    (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
-      (NONE, queue') => (NONE, queue')
-    | (SOME work, queue') => (SOME (work, tasks'), queue'))
+    (case cache of
+      Unknown => deq NONE
+    | Result last => deq (SOME last)
+    | No_Result => (NONE, queue))
   end;
 
-end;
-
 
 (* sporadic interrupts *)
 
@@ -154,7 +142,7 @@
 fun interrupt_external (queue as Queue {jobs, ...}) str =
   (case Int.fromString str of
     SOME i =>
-      (case Task_Graph.get_first
+      (case Task_Graph.get_first NONE
           (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
         of SOME task => interrupt queue task | NONE => ())
   | NONE => ());
@@ -180,11 +168,11 @@
     val _ = List.app SimpleThread.interrupt running;
   in groups end;
 
-fun finish task (Queue {groups, jobs}) =
+fun finish task (Queue {groups, jobs, ...}) =
   let
     val Group (gid, _) = get_group jobs task;
     val groups' = Inttab.remove_list (op =) (gid, task) groups;
     val jobs' = Task_Graph.del_node task jobs;
-  in make_queue groups' jobs' end;
+  in make_queue groups' jobs' Unknown end;
 
 end;