merged
authorwenzelm
Mon, 27 Jul 2009 13:32:29 +0200
changeset 32223 f5f46d6eb95b
parent 32217 420108dd7dfe (current diff)
parent 32222 572b92362496 (diff)
child 32224 a46f5e9b1718
merged
--- a/src/Pure/Concurrent/future.ML	Mon Jul 27 09:01:13 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Mon Jul 27 13:32:29 2009 +0200
@@ -11,7 +11,7 @@
     processes.
 
   * Futures are grouped; failure of one group member causes the whole
-    group to be interrupted eventually.
+    group to be interrupted eventually.  Groups are block-structured.
 
   * Forked futures are evaluated spontaneously by a farm of worker
     threads in the background; join resynchronizes the computation and
@@ -46,7 +46,6 @@
   val join: 'a future -> 'a
   val map: ('a -> 'b) -> 'a future -> 'b future
   val interruptible_task: ('a -> 'b) -> 'a -> 'b
-  val interrupt_task: string -> unit
   val cancel_group: group -> unit
   val cancel: 'a future -> unit
   val shutdown: unit -> unit
@@ -114,20 +113,26 @@
 
 (* synchronization *)
 
+val scheduler_event = ConditionVar.conditionVar ();
+val work_available = ConditionVar.conditionVar ();
+val work_finished = ConditionVar.conditionVar ();
+
 local
   val lock = Mutex.mutex ();
-  val cond = ConditionVar.conditionVar ();
 in
 
 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
 
-fun wait () = (*requires SYNCHRONIZED*)
+fun wait cond = (*requires SYNCHRONIZED*)
   ConditionVar.wait (cond, lock);
 
-fun wait_timeout timeout = (*requires SYNCHRONIZED*)
+fun wait_timeout cond timeout = (*requires SYNCHRONIZED*)
   ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
 
-fun notify_all () = (*requires SYNCHRONIZED*)
+fun signal cond = (*requires SYNCHRONIZED*)
+  ConditionVar.signal cond;
+
+fun broadcast cond = (*requires SYNCHRONIZED*)
   ConditionVar.broadcast cond;
 
 end;
@@ -183,29 +188,35 @@
     val ok = setmp_thread_data (name, task, group) (fn () =>
       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
     val _ = SYNCHRONIZED "execute" (fn () =>
-     (change queue (Task_Queue.finish task);
-      if ok then ()
-      else if Task_Queue.cancel (! queue) group then ()
-      else do_cancel group;
-      notify_all ()));
+      let
+        val maximal = change_result queue (Task_Queue.finish task);
+        val _ =
+          if ok then ()
+          else if Task_Queue.cancel (! queue) group then ()
+          else do_cancel group;
+        val _ = broadcast work_finished;
+        val _ = if maximal then () else broadcast work_available;
+      in () end);
   in () end;
 
 
 (* worker threads *)
 
-fun worker_wait () = (*requires SYNCHRONIZED*)
-  (change_active false; wait (); change_active true);
+fun worker_wait cond = (*requires SYNCHRONIZED*)
+ (change_active false; broadcast scheduler_event;
+  wait cond;
+  change_active true; broadcast scheduler_event);
 
 fun worker_next () = (*requires SYNCHRONIZED*)
   if ! excessive > 0 then
     (dec excessive;
      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
-     notify_all ();
+     broadcast scheduler_event;
      NONE)
-  else if overloaded () then (worker_wait (); worker_next ())
+  else if overloaded () then (worker_wait scheduler_event; worker_next ())
   else
     (case change_result queue Task_Queue.dequeue of
-      NONE => (worker_wait (); worker_next ())
+      NONE => (worker_wait work_available; worker_next ())
     | some => some);
 
 fun worker_loop name =
@@ -231,37 +242,38 @@
       end);
 
     (*worker threads*)
-    val ws = ! workers;
     val _ =
-      if forall (Thread.isActive o #1) ws then ()
+      if forall (Thread.isActive o #1) (! workers) then ()
       else
-        (case List.partition (Thread.isActive o #1) ws of
+        (case List.partition (Thread.isActive o #1) (! workers) of
           (_, []) => ()
-        | (active, inactive) =>
-            (workers := active; Multithreading.tracing 0 (fn () =>
-              "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
+        | (alive, dead) =>
+            (workers := alive; Multithreading.tracing 0 (fn () =>
+              "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
     val _ = trace_active ();
 
     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
     val mm = (m * 3) div 2;
-    val l = length ws;
+    val l = length (! workers);
     val _ = excessive := l - mm;
     val _ =
       if mm > l then
-        funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
+       (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
+        broadcast scheduler_event)
       else ();
 
     (*canceled groups*)
     val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
 
-    (*shutdown*)
-    val continue = not (! do_shutdown andalso null ws);
-    val _ = if continue then () else scheduler := NONE;
+    val timeout =
+      Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
+    val _ = interruptible (fn () => wait_timeout scheduler_event timeout) ()
+      handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
 
-    val _ = notify_all ();
-    val _ = interruptible (fn () =>
-        wait_timeout (Time.fromMilliseconds (if null (! canceled) then 1000 else 50))) ()
-      handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
+    (*shutdown*)
+    val continue = not (! do_shutdown andalso null (! workers));
+    val _ = if continue then () else scheduler := NONE;
+    val _ = broadcast scheduler_event;
   in continue end;
 
 fun scheduler_loop () =
@@ -272,7 +284,8 @@
 
 fun scheduler_check name = SYNCHRONIZED name (fn () =>
   if not (scheduler_active ()) then
-    (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
+   (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop);
+    broadcast scheduler_event)
   else if ! do_shutdown then error "Scheduler shutdown in progress"
   else ());
 
@@ -292,7 +305,10 @@
       | NONE => Task_Queue.new_group (worker_group ()));
     val (result, job) = future_job group e;
     val task = SYNCHRONIZED "future" (fn () =>
-      change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
+      let
+        val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
+        val _ = if minimal then signal work_available else ();
+      in task end);
   in Future {task = task, group = group, result = result} end;
 
 fun fork e = fork_future NONE [] 0 e;
@@ -313,7 +329,7 @@
   | SOME res => res);
 
 fun join_next deps = (*requires SYNCHRONIZED*)
-  if overloaded () then (worker_wait (); join_next deps)
+  if overloaded () then (worker_wait scheduler_event; join_next deps)
   else change_result queue (Task_Queue.dequeue_towards deps);
 
 fun join_deps deps =
@@ -336,7 +352,7 @@
 
       fun join_wait x =
         if SYNCHRONIZED "join_wait" (fn () =>
-          is_finished x orelse (if worker then worker_wait () else wait (); false))
+          is_finished x orelse ((if worker then worker_wait else wait) work_finished; false))
         then () else join_wait x;
 
       val _ = xs |> List.app (fn x =>
@@ -383,14 +399,10 @@
       (fn _ => f) x
   else interruptible f x;
 
-(*interrupt: permissive signal, may get ignored*)
-fun interrupt_task id = SYNCHRONIZED "interrupt"
-  (fn () => Task_Queue.interrupt_external (! queue) id);
-
-(*cancel: present and future group members will be interrupted eventually*)
+(*cancel_group: present and future group members will be interrupted eventually*)
 fun cancel_group group =
  (scheduler_check "cancel check";
-  SYNCHRONIZED "cancel" (fn () => (do_cancel group; notify_all ())));
+  SYNCHRONIZED "cancel" (fn () => (do_cancel group; broadcast scheduler_event)));
 
 fun cancel x = cancel_group (group_of x);
 
@@ -401,13 +413,13 @@
   if Multithreading.available then
    (scheduler_check "shutdown check";
     SYNCHRONIZED "shutdown" (fn () =>
-     (while not (scheduler_active ()) do wait ();
-      while not (Task_Queue.is_empty (! queue)) do wait ();
+     (while not (scheduler_active ()) do wait scheduler_event;
+      while not (Task_Queue.is_empty (! queue)) do wait scheduler_event;
       do_shutdown := true;
-      notify_all ();
-      while not (null (! workers)) do wait ();
-      while scheduler_active () do wait ();
-      OS.Process.sleep (Time.fromMilliseconds 300))))
+      while scheduler_active () do
+       (broadcast work_available;
+        broadcast scheduler_event;
+        wait scheduler_event))))
   else ();
 
 
--- a/src/Pure/Concurrent/task_queue.ML	Mon Jul 27 09:01:13 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Mon Jul 27 13:32:29 2009 +0200
@@ -11,27 +11,25 @@
   val pri_of_task: task -> int
   val str_of_task: task -> string
   type group
+  val new_group: group option -> group
   val group_id: group -> int
   val eq_group: group * group -> bool
-  val new_group: group option -> group
+  val cancel_group: group -> exn -> unit
+  val is_canceled: group -> bool
   val group_status: group -> exn list
   val str_of_group: group -> string
   type queue
   val empty: queue
   val is_empty: queue -> bool
   val status: queue -> {ready: int, pending: int, running: int}
-  val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
+  val cancel: queue -> group -> bool
+  val cancel_all: queue -> group list
+  val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
   val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
   val dequeue_towards: task list -> queue ->
     (((task * group * (bool -> bool) list) * task list) option * queue)
-  val interrupt: queue -> task -> unit
-  val interrupt_external: queue -> string -> unit
-  val is_canceled: group -> bool
-  val cancel_group: group -> exn -> unit
-  val cancel: queue -> group -> bool
-  val cancel_all: queue -> group list
-  val finish: task -> queue -> queue
+  val finish: task -> queue -> bool * queue
 end;
 
 structure Task_Queue:> TASK_QUEUE =
@@ -67,17 +65,19 @@
   id :: (case parent of NONE => [] | SOME group => group_ancestry group);
 
 
+(* group status *)
+
 fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () =>
   (case exn of
     Exn.Interrupt => if null (! status) then status := [exn] else ()
   | _ => change status (cons exn)));
 
+fun is_canceled (Group {parent, status, ...}) = (*non-critical*)
+  not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group);
+
 fun group_status (Group {parent, status, ...}) = (*non-critical*)
   ! status @ (case parent of NONE => [] | SOME group => group_status group);
 
-fun is_canceled (Group {parent, status, ...}) = (*non-critical*)
-  not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group);
-
 fun str_of_group group =
   (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
 
@@ -162,13 +162,14 @@
       |> Task_Graph.new_node (task, (group, Job [job]))
       |> fold (add_job task) deps
       |> fold (fold (add_job task) o get_deps jobs) deps;
+    val minimal = null (get_deps jobs' task);
     val cache' =
       (case cache of
         Result last =>
           if task_ord (last, task) = LESS
           then cache else Unknown
       | _ => Unknown);
-  in (task, make_queue groups' jobs' cache') end;
+  in ((task, minimal), make_queue groups' jobs' cache') end;
 
 fun extend task job (Queue {groups, jobs, cache}) =
   (case try (get_job jobs) task of
@@ -225,23 +226,7 @@
   end;
 
 
-(* sporadic interrupts *)
-
-fun interrupt (Queue {jobs, ...}) task =
-  (case try (get_job jobs) task of
-    SOME (Running thread) => SimpleThread.interrupt thread
-  | _ => ());
-
-fun interrupt_external (queue as Queue {jobs, ...}) str =
-  (case Int.fromString str of
-    SOME i =>
-      (case Task_Graph.get_first NONE
-          (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
-        of SOME task => interrupt queue task | NONE => ())
-  | NONE => ());
-
-
-(* termination *)
+(* finish *)
 
 fun finish task (Queue {groups, jobs, cache}) =
   let
@@ -249,9 +234,8 @@
     val groups' = groups
       |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
     val jobs' = Task_Graph.del_node task jobs;
-    val cache' =
-      if null (Task_Graph.imm_succs jobs task) then cache
-      else Unknown;
-  in make_queue groups' jobs' cache' end;
+    val maximal = null (Task_Graph.imm_succs jobs task);
+    val cache' = if maximal then cache else Unknown;
+  in (maximal, make_queue groups' jobs' cache') end;
 
 end;