eliminated cache, which complicates the code without making a real difference (NB: deque_towards often disrupts caching, and daisy-chaining of workers already reduces queue overhead);
authorwenzelm
Wed, 06 Jan 2010 18:14:16 +0100
changeset 34280 16bf3e9786a3
parent 34279 02936e77a07c
child 34281 eedea6f0b37e
eliminated cache, which complicates the code without making a real difference (NB: deque_towards often disrupts caching, and daisy-chaining of workers already reduces queue overhead);
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Wed Jan 06 15:07:56 2010 +0100
+++ b/src/Pure/Concurrent/future.ML	Wed Jan 06 18:14:16 2010 +0100
@@ -179,7 +179,7 @@
   in (result, job) end;
 
 fun cancel_now group = (*requires SYNCHRONIZED*)
-  Unsynchronized.change_result queue (Task_Queue.cancel group);
+  Task_Queue.cancel (! queue) group;
 
 fun cancel_later group = (*requires SYNCHRONIZED*)
  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
@@ -351,7 +351,7 @@
   in continue end
   handle Exn.Interrupt =>
    (Multithreading.tracing 1 (fn () => "Interrupt");
-    List.app cancel_later (Unsynchronized.change_result queue Task_Queue.cancel_all);
+    List.app cancel_later (Task_Queue.cancel_all (! queue));
     broadcast_work (); true);
 
 fun scheduler_loop () =
--- a/src/Pure/Concurrent/task_queue.ML	Wed Jan 06 15:07:56 2010 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Wed Jan 06 18:14:16 2010 +0100
@@ -22,8 +22,8 @@
   val empty: queue
   val all_passive: queue -> bool
   val status: queue -> {ready: int, pending: int, running: int, passive: int}
-  val cancel: group -> queue -> bool * queue
-  val cancel_all: queue -> group list * queue
+  val cancel: queue -> group -> bool
+  val cancel_all: queue -> group list
   val enqueue_passive: group -> queue -> task * queue
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
@@ -118,16 +118,13 @@
 
 (* queue of grouped jobs *)
 
-datatype result = Unknown | Result of task | No_Result;
-
 datatype queue = Queue of
  {groups: task list Inttab.table,   (*groups with presently active members*)
-  jobs: jobs,                       (*job dependency graph*)
-  cache: result};                   (*last dequeue result*)
+  jobs: jobs};                      (*job dependency graph*)
 
-fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
+fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
 
-val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
+val empty = make_queue Inttab.empty Task_Graph.empty;
 
 fun all_passive (Queue {jobs, ...}) =
   Task_Graph.get_first NONE
@@ -150,16 +147,16 @@
 
 (* cancel -- peers and sub-groups *)
 
-fun cancel group (Queue {groups, jobs, ...}) =
+fun cancel (Queue {groups, jobs}) group =
   let
     val _ = cancel_group group Exn.Interrupt;
     val tasks = Inttab.lookup_list groups (group_id group);
     val running =
       fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
     val _ = List.app SimpleThread.interrupt running;
-  in (null running, make_queue groups jobs Unknown) end;
+  in null running end;
 
-fun cancel_all (Queue {groups, jobs, ...}) =
+fun cancel_all (Queue {groups, jobs}) =
   let
     fun cancel_job (group, job) (groups, running) =
       (cancel_group group Exn.Interrupt;
@@ -168,20 +165,20 @@
         | _ => (groups, running)));
     val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
     val _ = List.app SimpleThread.interrupt running;
-  in (running_groups, make_queue groups jobs Unknown) end;
+  in running_groups end;
 
 
 (* enqueue *)
 
-fun enqueue_passive group (Queue {groups, jobs, cache}) =
+fun enqueue_passive group (Queue {groups, jobs}) =
   let
     val task = new_task NONE;
     val groups' = groups
       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
     val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
-  in (task, make_queue groups' jobs' cache) end;
+  in (task, make_queue groups' jobs') end;
 
-fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
+fun enqueue group deps pri job (Queue {groups, jobs}) =
   let
     val task = new_task (SOME pri);
     val groups' = groups
@@ -191,49 +188,36 @@
       |> fold (add_job task) deps
       |> fold (fold (add_job task) o get_deps jobs) deps;
     val minimal = null (get_deps jobs' task);
-    val cache' =
-      (case cache of
-        Result last =>
-          if task_ord (last, task) = LESS
-          then cache else Unknown
-      | _ => Unknown);
-  in ((task, minimal), make_queue groups' jobs' cache') end;
+  in ((task, minimal), make_queue groups' jobs') end;
 
-fun extend task job (Queue {groups, jobs, cache}) =
+fun extend task job (Queue {groups, jobs}) =
   (case try (get_job jobs) task of
-    SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
+    SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
   | _ => NONE);
 
 
 (* dequeue *)
 
-fun dequeue thread (queue as Queue {groups, jobs, cache}) =
+fun dequeue thread (queue as Queue {groups, jobs}) =
   let
     fun ready (task, ((group, Job list), (deps, _))) =
           if is_ready deps group then SOME (task, group, rev list) else NONE
       | ready _ = NONE;
-    fun deq boundary =
-      (case Task_Graph.get_first boundary ready jobs of
-        NONE => (NONE, make_queue groups jobs No_Result)
-      | SOME (result as (task, _, _)) =>
-          let
-            val jobs' = set_job task (Running thread) jobs;
-            val cache' = Result task;
-          in (SOME result, make_queue groups jobs' cache') end);
   in
-    (case cache of
-      Unknown => deq NONE
-    | Result last => deq (SOME last)
-    | No_Result => (NONE, queue))
+    (case Task_Graph.get_first NONE ready jobs of
+      NONE => (NONE, queue)
+    | SOME (result as (task, _, _)) =>
+        let val jobs' = set_job task (Running thread) jobs
+        in (SOME result, make_queue groups jobs') end)
   end;
 
 
 (* dequeue_towards -- adhoc dependencies *)
 
-fun depend task deps (Queue {groups, jobs, ...}) =
-  make_queue groups (fold (add_dep task) deps jobs) Unknown;
+fun depend task deps (Queue {groups, jobs}) =
+  make_queue groups (fold (add_dep task) deps jobs);
 
-fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
+fun dequeue_towards thread deps (queue as Queue {groups, jobs}) =
   let
     fun ready task =
       (case Task_Graph.get_node jobs task of
@@ -244,10 +228,8 @@
       | _ => NONE);
     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
     fun result (res as (task, _, _)) =
-      let
-        val jobs' = set_job task (Running thread) jobs;
-        val cache' = Unknown;
-      in ((SOME res, tasks), make_queue groups jobs' cache') end;
+      let val jobs' = set_job task (Running thread) jobs
+      in ((SOME res, tasks), make_queue groups jobs') end;
   in
     (case get_first ready tasks of
       SOME res => result res
@@ -260,14 +242,13 @@
 
 (* finish *)
 
-fun finish task (Queue {groups, jobs, cache}) =
+fun finish task (Queue {groups, jobs}) =
   let
     val group = get_group jobs task;
     val groups' = groups
       |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
     val jobs' = Task_Graph.del_node task jobs;
     val maximal = null (Task_Graph.imm_succs jobs task);
-    val cache' = if maximal then cache else Unknown;
-  in (maximal, make_queue groups' jobs' cache') end;
+  in (maximal, make_queue groups' jobs') end;
 
 end;