src/Pure/Concurrent/future.ML
changeset 34277 7325a5e3587f
parent 33416 13d00799fe49
child 34279 02936e77a07c
--- a/src/Pure/Concurrent/future.ML	Tue Jan 05 18:20:18 2010 +0100
+++ b/src/Pure/Concurrent/future.ML	Tue Jan 05 23:38:10 2010 +0100
@@ -23,6 +23,10 @@
     of runtime resources is distorted either if workers yield CPU time
     (e.g. via system sleep or wait operations), or if non-worker
     threads contend for significant runtime resources independently.
+
+  * Promised futures are fulfilled by external means.  There is no
+    associated evaluation task, but other futures can depend on them
+    as usual.
 *)
 
 signature FUTURE =
@@ -37,7 +41,6 @@
   val group_of: 'a future -> group
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
-  val value: 'a -> 'a future
   val fork_group: group -> (unit -> 'a) -> 'a future
   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
@@ -46,7 +49,12 @@
   val join_results: 'a future list -> 'a Exn.result list
   val join_result: 'a future -> 'a Exn.result
   val join: 'a future -> 'a
+  val value: 'a -> 'a future
   val map: ('a -> 'b) -> 'a future -> 'b future
+  val promise_group: group -> 'a future
+  val promise: unit -> 'a future
+  val fulfill_result: 'a future -> 'a Exn.result -> unit
+  val fulfill: 'a future -> 'a -> unit
   val interruptible_task: ('a -> 'b) -> 'a -> 'b
   val cancel_group: group -> unit
   val cancel: 'a future -> unit
@@ -75,11 +83,14 @@
 val worker_task = Option.map #1 o thread_data;
 val worker_group = Option.map #2 o thread_data;
 
+fun new_group () = Task_Queue.new_group (worker_group ());
+
 
 (* datatype future *)
 
 datatype 'a future = Future of
- {task: task,
+ {promised: bool,
+  task: task,
   group: group,
   result: 'a Exn.result option Synchronized.var};
 
@@ -90,10 +101,14 @@
 fun peek x = Synchronized.value (result_of x);
 fun is_finished x = is_some (peek x);
 
-fun value x = Future
- {task = Task_Queue.new_task 0,
-  group = Task_Queue.new_group NONE,
-  result = Synchronized.var "future" (SOME (Exn.Result x))};
+fun assign_result group result res =
+  let
+    val _ = Synchronized.assign result (K (SOME res));
+    val ok =
+      (case res of
+        Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
+      | Exn.Result _ => true);
+  in ok end;
 
 
 
@@ -111,6 +126,9 @@
 
 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
 
+fun raw_wait cond = (*requires SYNCHRONIZED*)
+  ConditionVar.wait (cond, lock);
+
 fun wait cond = (*requires SYNCHRONIZED*)
   Multithreading.sync_wait NONE NONE cond lock;
 
@@ -160,12 +178,7 @@
             Exn.capture (fn () =>
               Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
           else Exn.Exn Exn.Interrupt;
-        val _ = Synchronized.assign result (K (SOME res));
-      in
-        (case res of
-          Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
-        | Exn.Result _ => true)
-      end;
+      in assign_result group result res end;
   in (result, job) end;
 
 fun do_cancel group = (*requires SYNCHRONIZED*)
@@ -247,7 +260,7 @@
       if tick andalso ! status_ticks = 0 then
         Multithreading.tracing 1 (fn () =>
           let
-            val {ready, pending, running} = Task_Queue.status (! queue);
+            val {ready, pending, running, passive} = Task_Queue.status (! queue);
             val total = length (! workers);
             val active = count_workers Working;
             val waiting = count_workers Waiting;
@@ -255,7 +268,8 @@
             "SCHEDULE " ^ Time.toString now ^ ": " ^
               string_of_int ready ^ " ready, " ^
               string_of_int pending ^ " pending, " ^
-              string_of_int running ^ " running; " ^
+              string_of_int running ^ " running, " ^
+              string_of_int passive ^ " passive; " ^
               string_of_int total ^ " workers, " ^
               string_of_int active ^ " active, " ^
               string_of_int waiting ^ " waiting "
@@ -329,7 +343,7 @@
 
     (* shutdown *)
 
-    val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
+    val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
     val continue = not (! do_shutdown andalso null (! workers));
     val _ = if continue then () else scheduler := NONE;
 
@@ -364,8 +378,8 @@
   let
     val group =
       (case opt_group of
-        SOME group => group
-      | NONE => Task_Queue.new_group (worker_group ()));
+        NONE => new_group ()
+      | SOME group => group);
     val (result, job) = future_job group e;
     val task = SYNCHRONIZED "enqueue" (fn () =>
       let
@@ -374,7 +388,7 @@
         val _ = if minimal then signal work_available else ();
         val _ = scheduler_check ();
       in task end);
-  in Future {task = task, group = group, result = result} end;
+  in Future {promised = false, task = task, group = group, result = result} end;
 
 fun fork_group group e = fork_future (SOME group) [] 0 e;
 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
@@ -432,7 +446,14 @@
 fun join x = Exn.release (join_result x);
 
 
-(* map *)
+(* fast-path versions -- bypassing full task management *)
+
+fun value (x: 'a) =
+  let
+    val group = Task_Queue.new_group NONE;
+    val result = Synchronized.var "value" NONE : 'a Exn.result option Synchronized.var;
+    val _ = assign_result group result (Exn.Result x);
+  in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
 
 fun map_future f x =
   let
@@ -445,11 +466,32 @@
         SOME queue' => (queue := queue'; true)
       | NONE => false));
   in
-    if extended then Future {task = task, group = group, result = result}
+    if extended then Future {promised = false, task = task, group = group, result = result}
     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   end;
 
 
+(* promised futures -- fulfilled by external means *)
+
+fun promise_group group : 'a future =
+  let
+    val result = Synchronized.var "promise" (NONE: 'a Exn.result option);
+    val task = SYNCHRONIZED "enqueue" (fn () =>
+      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group));
+  in Future {promised = true, task = task, group = group, result = result} end;
+
+fun promise () = promise_group (new_group ());
+
+fun fulfill_result (Future {promised, task, group, result}) res =
+  let
+    val _ = promised orelse raise Fail "Not a promised future";
+    fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
+    val _ = execute (task, group, [job]);
+  in () end;
+
+fun fulfill x res = fulfill_result x (Exn.Result res);
+
+
 (* cancellation *)
 
 fun interruptible_task f x =
@@ -472,7 +514,7 @@
   if Multithreading.available then
     SYNCHRONIZED "shutdown" (fn () =>
      while scheduler_active () do
-      (wait scheduler_event; broadcast_work ()))
+      (raw_wait scheduler_event; broadcast_work ()))
   else ();