force_result within the current execution context -- avoids overhead of potential thread context switch and robustifies Interrupt handling;
recovered some similarity to sequential version;
--- a/src/Pure/Concurrent/lazy.ML Sat May 29 15:31:15 2010 +0200
+++ b/src/Pure/Concurrent/lazy.ML Sat May 29 15:52:47 2010 +0200
@@ -23,46 +23,49 @@
datatype 'a expr =
Expr of unit -> 'a |
- Result of 'a;
+ Result of 'a future;
-abstype 'a lazy = Lazy of 'a expr future Synchronized.var
+abstype 'a lazy = Lazy of 'a expr Synchronized.var
with
fun peek (Lazy var) =
- (case Future.peek (Synchronized.value var) of
- NONE => NONE
- | SOME (Exn.Result (Expr _)) => NONE
- | SOME (Exn.Result (Result x)) => SOME (Exn.Result x)
- | SOME (Exn.Exn exn) => SOME (Exn.Exn exn));
+ (case Synchronized.value var of
+ Expr _ => NONE
+ | Result res => Future.peek res);
-fun lazy e = Lazy (Synchronized.var "lazy" (Future.value (Expr e)));
-fun value a = Lazy (Synchronized.var "lazy" (Future.value (Result a)));
+fun lazy e = Lazy (Synchronized.var "lazy" (Expr e));
+fun value a = Lazy (Synchronized.var "lazy" (Result (Future.value a)));
(* force result *)
-fun fork e =
- let val x = Future.fork (fn () => Result (e ()) handle Exn.Interrupt => Expr e)
- in (x, x) end;
-
fun force_result (Lazy var) =
(case peek (Lazy var) of
SOME res => res
| NONE =>
- let
- val result =
- Synchronized.change_result var
- (fn x =>
- (case Future.peek x of
- SOME (Exn.Result (Expr e)) => fork e
- | _ => (x, x)))
- |> Future.join_result;
- in
- case result of
- Exn.Result (Expr _) => Exn.Exn Exn.Interrupt
- | Exn.Result (Result x) => Exn.Result x
- | Exn.Exn exn => Exn.Exn exn
- end);
+ uninterruptible (fn restore_interrupts => fn () =>
+ let
+ val (expr, x) =
+ Synchronized.change_result var
+ (fn Expr e =>
+ let val x = Future.promise ()
+ in ((SOME e, x), Result x) end
+ | Result x => ((NONE, x), Result x));
+ in
+ (case expr of
+ SOME e =>
+ let
+ val res = restore_interrupts (fn () => Exn.capture e ()) ();
+ val _ = Future.fulfill_result x res;
+ (*semantic race: some other threads might see the same
+ Interrupt, until there is a fresh start*)
+ val _ =
+ (case res of
+ Exn.Exn Exn.Interrupt => Synchronized.change var (fn _ => Expr e)
+ | _ => ());
+ in res end
+ | NONE => restore_interrupts (fn () => Future.join_result x) ())
+ end) ());
fun force r = Exn.release (force_result r);