added Future.promise/fulfill -- promised futures that are fulfilled by external means;
authorwenzelm
Tue, 05 Jan 2010 23:38:10 +0100
changeset 34277 7325a5e3587f
parent 34276 12436485c244
child 34278 228f27469139
added Future.promise/fulfill -- promised futures that are fulfilled by external means; Future.value: official result assignment -- produces immutable ref; Future.shutdown: raw_wait keeps raw task attributes, e.g. asynchronous interrupts of toplevel; Task_Queue: passive tasks track dependencies, but lack any evaluation process; tuned;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Tue Jan 05 18:20:18 2010 +0100
+++ b/src/Pure/Concurrent/future.ML	Tue Jan 05 23:38:10 2010 +0100
@@ -23,6 +23,10 @@
     of runtime resources is distorted either if workers yield CPU time
     (e.g. via system sleep or wait operations), or if non-worker
     threads contend for significant runtime resources independently.
+
+  * Promised futures are fulfilled by external means.  There is no
+    associated evaluation task, but other futures can depend on them
+    as usual.
 *)
 
 signature FUTURE =
@@ -37,7 +41,6 @@
   val group_of: 'a future -> group
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
-  val value: 'a -> 'a future
   val fork_group: group -> (unit -> 'a) -> 'a future
   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
@@ -46,7 +49,12 @@
   val join_results: 'a future list -> 'a Exn.result list
   val join_result: 'a future -> 'a Exn.result
   val join: 'a future -> 'a
+  val value: 'a -> 'a future
   val map: ('a -> 'b) -> 'a future -> 'b future
+  val promise_group: group -> 'a future
+  val promise: unit -> 'a future
+  val fulfill_result: 'a future -> 'a Exn.result -> unit
+  val fulfill: 'a future -> 'a -> unit
   val interruptible_task: ('a -> 'b) -> 'a -> 'b
   val cancel_group: group -> unit
   val cancel: 'a future -> unit
@@ -75,11 +83,14 @@
 val worker_task = Option.map #1 o thread_data;
 val worker_group = Option.map #2 o thread_data;
 
+fun new_group () = Task_Queue.new_group (worker_group ());
+
 
 (* datatype future *)
 
 datatype 'a future = Future of
- {task: task,
+ {promised: bool,
+  task: task,
   group: group,
   result: 'a Exn.result option Synchronized.var};
 
@@ -90,10 +101,14 @@
 fun peek x = Synchronized.value (result_of x);
 fun is_finished x = is_some (peek x);
 
-fun value x = Future
- {task = Task_Queue.new_task 0,
-  group = Task_Queue.new_group NONE,
-  result = Synchronized.var "future" (SOME (Exn.Result x))};
+fun assign_result group result res =
+  let
+    val _ = Synchronized.assign result (K (SOME res));
+    val ok =
+      (case res of
+        Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
+      | Exn.Result _ => true);
+  in ok end;
 
 
 
@@ -111,6 +126,9 @@
 
 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
 
+fun raw_wait cond = (*requires SYNCHRONIZED*)
+  ConditionVar.wait (cond, lock);
+
 fun wait cond = (*requires SYNCHRONIZED*)
   Multithreading.sync_wait NONE NONE cond lock;
 
@@ -160,12 +178,7 @@
             Exn.capture (fn () =>
               Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
           else Exn.Exn Exn.Interrupt;
-        val _ = Synchronized.assign result (K (SOME res));
-      in
-        (case res of
-          Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
-        | Exn.Result _ => true)
-      end;
+      in assign_result group result res end;
   in (result, job) end;
 
 fun do_cancel group = (*requires SYNCHRONIZED*)
@@ -247,7 +260,7 @@
       if tick andalso ! status_ticks = 0 then
         Multithreading.tracing 1 (fn () =>
           let
-            val {ready, pending, running} = Task_Queue.status (! queue);
+            val {ready, pending, running, passive} = Task_Queue.status (! queue);
             val total = length (! workers);
             val active = count_workers Working;
             val waiting = count_workers Waiting;
@@ -255,7 +268,8 @@
             "SCHEDULE " ^ Time.toString now ^ ": " ^
               string_of_int ready ^ " ready, " ^
               string_of_int pending ^ " pending, " ^
-              string_of_int running ^ " running; " ^
+              string_of_int running ^ " running, " ^
+              string_of_int passive ^ " passive; " ^
               string_of_int total ^ " workers, " ^
               string_of_int active ^ " active, " ^
               string_of_int waiting ^ " waiting "
@@ -329,7 +343,7 @@
 
     (* shutdown *)
 
-    val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
+    val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
     val continue = not (! do_shutdown andalso null (! workers));
     val _ = if continue then () else scheduler := NONE;
 
@@ -364,8 +378,8 @@
   let
     val group =
       (case opt_group of
-        SOME group => group
-      | NONE => Task_Queue.new_group (worker_group ()));
+        NONE => new_group ()
+      | SOME group => group);
     val (result, job) = future_job group e;
     val task = SYNCHRONIZED "enqueue" (fn () =>
       let
@@ -374,7 +388,7 @@
         val _ = if minimal then signal work_available else ();
         val _ = scheduler_check ();
       in task end);
-  in Future {task = task, group = group, result = result} end;
+  in Future {promised = false, task = task, group = group, result = result} end;
 
 fun fork_group group e = fork_future (SOME group) [] 0 e;
 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
@@ -432,7 +446,14 @@
 fun join x = Exn.release (join_result x);
 
 
-(* map *)
+(* fast-path versions -- bypassing full task management *)
+
+fun value (x: 'a) =
+  let
+    val group = Task_Queue.new_group NONE;
+    val result = Synchronized.var "value" NONE : 'a Exn.result option Synchronized.var;
+    val _ = assign_result group result (Exn.Result x);
+  in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
 
 fun map_future f x =
   let
@@ -445,11 +466,32 @@
         SOME queue' => (queue := queue'; true)
       | NONE => false));
   in
-    if extended then Future {task = task, group = group, result = result}
+    if extended then Future {promised = false, task = task, group = group, result = result}
     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   end;
 
 
+(* promised futures -- fulfilled by external means *)
+
+fun promise_group group : 'a future =
+  let
+    val result = Synchronized.var "promise" (NONE: 'a Exn.result option);
+    val task = SYNCHRONIZED "enqueue" (fn () =>
+      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group));
+  in Future {promised = true, task = task, group = group, result = result} end;
+
+fun promise () = promise_group (new_group ());
+
+fun fulfill_result (Future {promised, task, group, result}) res =
+  let
+    val _ = promised orelse raise Fail "Not a promised future";
+    fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
+    val _ = execute (task, group, [job]);
+  in () end;
+
+fun fulfill x res = fulfill_result x (Exn.Result res);
+
+
 (* cancellation *)
 
 fun interruptible_task f x =
@@ -472,7 +514,7 @@
   if Multithreading.available then
     SYNCHRONIZED "shutdown" (fn () =>
      while scheduler_active () do
-      (wait scheduler_event; broadcast_work ()))
+      (raw_wait scheduler_event; broadcast_work ()))
   else ();
 
 
--- a/src/Pure/Concurrent/task_queue.ML	Tue Jan 05 18:20:18 2010 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Tue Jan 05 23:38:10 2010 +0100
@@ -7,7 +7,7 @@
 signature TASK_QUEUE =
 sig
   type task
-  val new_task: int -> task
+  val dummy_task: task
   val pri_of_task: task -> int
   val str_of_task: task -> string
   type group
@@ -20,10 +20,11 @@
   val str_of_group: group -> string
   type queue
   val empty: queue
-  val is_empty: queue -> bool
-  val status: queue -> {ready: int, pending: int, running: int}
+  val all_passive: queue -> bool
+  val status: queue -> {ready: int, pending: int, running: int, passive: int}
   val cancel: queue -> group -> bool
   val cancel_all: queue -> group list
+  val enqueue_passive: group -> queue -> task * queue
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
@@ -38,13 +39,14 @@
 
 (* tasks *)
 
-datatype task = Task of int * serial;
+datatype task = Task of int option * serial;
+val dummy_task = Task (NONE, ~1);
 fun new_task pri = Task (pri, serial ());
 
-fun pri_of_task (Task (pri, _)) = pri;
+fun pri_of_task (Task (pri, _)) = the_default 0 pri;
 fun str_of_task (Task (_, i)) = string_of_int i;
 
-fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
+fun task_ord (Task t1, Task t2) = prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2);
 structure Task_Graph = Graph(type key = task val ord = task_ord);
 
 
@@ -91,7 +93,8 @@
 
 datatype job =
   Job of (bool -> bool) list |
-  Running of Thread.thread;
+  Running of Thread.thread |
+  Passive;
 
 type jobs = (group * job) Task_Graph.T;
 
@@ -123,20 +126,24 @@
 fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
 
 val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
-fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
+
+fun all_passive (Queue {jobs, ...}) =
+  Task_Graph.get_first NONE
+    ((fn Job _ => SOME () | Running _ => SOME () | Passive => NONE) o #2 o #1 o #2) jobs |> is_none;
 
 
 (* queue status *)
 
 fun status (Queue {jobs, ...}) =
   let
-    val (x, y, z) =
-      Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z) =>
+    val (x, y, z, w) =
+      Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
           (case job of
-            Job _ => if null deps then (x + 1, y, z) else (x, y + 1, z)
-          | Running _ => (x, y, z + 1)))
-        jobs (0, 0, 0);
-  in {ready = x, pending = y, running = z} end;
+            Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
+          | Running _ => (x, y, z + 1, w)
+          | Passive => (x, y, z, w + 1)))
+        jobs (0, 0, 0, 0);
+  in {ready = x, pending = y, running = z, passive = w} end;
 
 
 (* cancel -- peers and sub-groups *)
@@ -154,18 +161,27 @@
   let
     fun cancel_job (group, job) (groups, running) =
       (cancel_group group Exn.Interrupt;
-        (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
+        (case job of
+          Running t => (insert eq_group group groups, insert Thread.equal t running)
         | _ => (groups, running)));
-    val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
+    val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
     val _ = List.app SimpleThread.interrupt running;
-  in groups end;
+  in running_groups end;
 
 
 (* enqueue *)
 
+fun enqueue_passive group (Queue {groups, jobs, cache}) =
+  let
+    val task = new_task NONE;
+    val groups' = groups
+      |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
+    val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
+  in (task, make_queue groups' jobs' cache) end;
+
 fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
   let
-    val task = new_task pri;
+    val task = new_task (SOME pri);
     val groups' = groups
       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
     val jobs' = jobs