eliminated slightly odd abstract type Task_Queue.deps;
authorwenzelm
Wed, 02 Feb 2011 20:32:50 +0100
changeset 41695 afdbec23b92b
parent 41694 a96d43a54650
child 41696 f69bb9077b02
eliminated slightly odd abstract type Task_Queue.deps; tuned signature; tuned;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Wed Feb 02 18:22:13 2011 +0100
+++ b/src/Pure/Concurrent/future.ML	Wed Feb 02 20:32:50 2011 +0100
@@ -440,12 +440,12 @@
       else res);
 
 fun join_next deps = (*requires SYNCHRONIZED*)
-  if Task_Queue.finished_deps deps then NONE
+  if null deps then NONE
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
-      (NONE, deps') =>
-        if Task_Queue.finished_deps deps' then NONE
-        else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
+      (NONE, []) => NONE
+    | (NONE, deps') =>
+        (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
     | (SOME work, deps') => SOME (work, deps'));
 
 fun execute_work NONE = ()
@@ -461,8 +461,7 @@
       if forall is_finished xs then ()
       else if Multithreading.self_critical () then
         error "Cannot join future values within critical section"
-      else if is_some (worker_task ()) then
-        join_work (Task_Queue.init_deps (map task_of xs))
+      else if is_some (worker_task ()) then join_work (map task_of xs)
       else List.app (ignore o Single_Assignment.await o result_of) xs;
   in map get_result xs end;
 
@@ -533,8 +532,8 @@
                   (Task_Queue.dequeue_passive (Thread.self ()) task));
           in if still_passive then execute (task, [job]) else () end);
       val _ =
-        worker_waiting (Task_Queue.init_deps [task])
-          (fn () => Single_Assignment.await result);
+        if is_some (Single_Assignment.peek result) then ()
+        else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
     in () end;
 
 fun fulfill x res = fulfill_result x (Exn.Result res);
--- a/src/Pure/Concurrent/task_queue.ML	Wed Feb 02 18:22:13 2011 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Wed Feb 02 20:32:50 2011 +0100
@@ -21,6 +21,9 @@
   val pri_of_task: task -> int
   val str_of_task: task -> string
   val timing_of_task: task -> Time.time * Time.time * string list
+  val running: task -> (unit -> 'a) -> 'a
+  val joining: task -> (unit -> 'a) -> 'a
+  val waiting: task -> task list -> (unit -> 'a) -> 'a
   type queue
   val empty: queue
   val all_passive: queue -> bool
@@ -34,14 +37,8 @@
   val extend: task -> (bool -> bool) -> queue -> queue option
   val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
   val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue
-  type deps
-  val init_deps: task list -> deps
-  val finished_deps: deps -> bool
-  val dequeue_deps: Thread.thread -> deps -> queue ->
-    (((task * (bool -> bool) list) option * deps) * queue)
-  val running: task -> (unit -> 'a) -> 'a
-  val joining: task -> (unit -> 'a) -> 'a
-  val waiting: task -> deps -> (unit -> 'a) -> 'a
+  val dequeue_deps: Thread.thread -> task list -> queue ->
+    (((task * (bool -> bool) list) option * task list) * queue)
 end;
 
 structure Task_Queue: TASK_QUEUE =
@@ -140,6 +137,19 @@
 structure Task_Graph = Graph(type key = task val ord = task_ord);
 
 
+(* timing *)
+
+fun running task =
+  update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
+
+fun joining task =
+  update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
+
+fun waiting task deps =
+  update_timing (fn t => fn (a, b, ds) =>
+    (Time.- (a, t), Time.+ (b, t), fold (insert (op =) o name_of_task) deps ds)) task;
+
+
 
 (** queue of jobs and groups **)
 
@@ -165,7 +175,7 @@
 (* queue *)
 
 datatype queue = Queue of
- {groups: task list Inttab.table,   (*groups with presently known members*)
+ {groups: task list Inttab.table,   (*presently known group members*)
   jobs: jobs};                      (*job dependency graph*)
 
 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
@@ -291,15 +301,7 @@
 
 (* dequeue wrt. dynamic dependencies *)
 
-abstype deps = Deps of task list
-with
-
-fun init_deps tasks = Deps tasks;
-fun finished_deps (Deps tasks) = null tasks;
-
-fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks;
-
-fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) =
+fun dequeue_deps thread deps (queue as Queue {groups, jobs}) =
   let
     fun ready [] rest = (NONE, rev rest)
       | ready (task :: tasks) rest =
@@ -322,28 +324,14 @@
 
     fun result (res as (task, _)) deps' =
       let val jobs' = set_job task (Running thread) jobs
-      in ((SOME res, Deps deps'), make_queue groups jobs') end;
+      in ((SOME res, deps'), make_queue groups jobs') end;
   in
     (case ready deps [] of
       (SOME res, deps') => result res deps'
     | (NONE, deps') =>
         (case ready_dep [] deps' of
           SOME res => result res deps'
-        | NONE => ((NONE, Deps deps'), queue)))
+        | NONE => ((NONE, deps'), queue)))
   end;
 
 end;
-
-
-(* timing *)
-
-fun running task =
-  update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
-
-fun joining task =
-  update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
-
-fun waiting task deps =
-  update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), Time.+ (b, t), insert_deps deps ds)) task;
-
-end;