src/Pure/Concurrent/lazy.ML
author wenzelm
Sun, 28 Dec 2014 22:10:09 +0100
changeset 59195 f8588372d70e
parent 59193 59f1591a11cb
child 59348 8a6788917b32
permissions -rw-r--r--
back to full synchronization (cf. eb3e399f5b9f);

(*  Title:      Pure/Concurrent/lazy.ML
    Author:     Makarius

Lazy evaluation with memoing of results and regular exceptions.
Parallel version based on (passive) futures, to avoid critical or
multiple evaluation (unless interrupted).
*)

signature LAZY =
sig
  (*lazy values; the representation is hidden by the implementing structure*)
  type 'a lazy
  (*suspend a computation; nothing is evaluated at construction time*)
  val lazy: (unit -> 'a) -> 'a lazy
  (*wrap an existing value, stored as a result via Future.value*)
  val value: 'a -> 'a lazy
  (*NONE while only the unevaluated expression is present, otherwise
    whatever Future.peek reports for the result future*)
  val peek: 'a lazy -> 'a Exn.result option
  (*true if a result future exists that is not yet finished*)
  val is_running: 'a lazy -> bool
  (*true if a result future exists, is finished, and its result is
    not an interrupt*)
  val is_finished: 'a lazy -> bool
  (*SOME result under the same conditions as is_finished, otherwise NONE*)
  val finished_result: 'a lazy -> 'a Exn.result option
  (*evaluate if necessary; the outcome (value or exception) is returned
    as Exn.result instead of being raised*)
  val force_result: 'a lazy -> 'a Exn.result
  (*like force_result, followed by Exn.release on the outcome*)
  val force: 'a lazy -> 'a
  (*new lazy value that forces the argument and applies the function
    only when it is forced itself*)
  val map: ('a -> 'b) -> 'a lazy -> 'b lazy
  (*future for the lazy value: a finished lazy value is passed through
    Future.value_result, otherwise a task is forked with the given params*)
  val future: Future.params -> 'a lazy -> 'a future
end;

structure Lazy: LAZY =
struct

(* datatype *)

(*internal state of a lazy value*)
datatype 'a expr =
  (*pending computation: the initial state, also restored by force_result
    after an interrupted evaluation*)
  Expr of unit -> 'a |
  (*future that carries the outcome of the evaluation*)
  Result of 'a future;

abstype 'a lazy = Lazy of 'a expr Synchronized.var
with

(*suspended computation: the state starts out as the unevaluated expression*)
fun lazy e =
  let val state = Synchronized.var "lazy" (Expr e)
  in Lazy state end;
(*existing value: the state starts out as a result built by Future.value*)
fun value a =
  let val fut = Future.value a
  in Lazy (Synchronized.var "lazy" (Result fut)) end;

(*inspect the current state without forcing: an unevaluated expression
  yields NONE, a result future is handed to Future.peek*)
fun peek (Lazy var) =
  let
    fun look (Expr _) = NONE
      | look (Result fut) = Future.peek fut;
  in look (Synchronized.value var) end;


(* status *)

(*apply a predicate to the result future, if there is one; an unevaluated
  expression never satisfies it*)
fun is_future pred (Lazy var) =
  let val state = Synchronized.value var in
    (case state of
      Result fut => pred fut
    | Expr _ => false)
  end;

(*a result future exists, but it is not finished yet*)
fun is_running x = is_future (fn fut => not (Future.is_finished fut)) x;

(*a result future exists, is finished, and did not end in an interrupt*)
fun is_finished x =
  let
    fun settled fut =
      if Future.is_finished fut
      then not (Exn.is_interrupt_exn (Future.join_result fut))
      else false;
  in is_future settled x end;

(*result of a finished evaluation, if any: NONE for an unevaluated
  expression, an unfinished future, or an interrupt result*)
fun finished_result (Lazy var) =
  (case Synchronized.value var of
    Result fut =>
      if not (Future.is_finished fut) then NONE
      else
        let val outcome = Future.join_result fut in
          if Exn.is_interrupt_exn outcome then NONE
          else SOME outcome
        end
  | Expr _ => NONE);


(* force result *)

(*Force evaluation and return the captured outcome (value or exception).

  Fast path: if peek already reports a result, it is returned directly.
  Otherwise the state is updated in one Synchronized.change_result step:
  the thread that finds Expr e installs a fresh promise as Result and
  takes over the evaluation of e; a thread that finds Result x merely
  joins that future.*)
fun force_result (Lazy var) =
  (case peek (Lazy var) of
    SOME res => res
  | NONE =>
      (*NOTE(review): presumably the body runs with interrupts masked and
        restore_attributes re-installs the caller's interrupt state for the
        wrapped function -- confirm against the definition of uninterruptible*)
      uninterruptible (fn restore_attributes => fn () =>
        let
          (*expr = SOME e: this thread has to evaluate e and fulfill x;
            expr = NONE: x is the result future that was already present*)
          val (expr, x) =
            Synchronized.change_result var
              (fn Expr e =>
                    let val x = Future.promise I
                    in ((SOME e, x), Result x) end
                | Result x => ((NONE, x), Result x));
        in
          (case expr of
            SOME e =>
              let
                (*run e under restore_attributes, capturing exceptions*)
                val res0 = Exn.capture (restore_attributes e) ();
                (*hand the outcome to the promise; an exception raised by
                  fulfill_result itself is captured and dropped*)
                val _ = Exn.capture (fn () => Future.fulfill_result x res0) ();
                (*the result that is returned is read back from the future*)
                val res = Future.join_result x;
                (*semantic race: some other threads might see the same
                  interrupt, until there is a fresh start*)
                (*an interrupt result is not kept: the state is reset to
                  Expr e, so the expression is available for evaluation again*)
                val _ =
                  if Exn.is_interrupt_exn res then
                    Synchronized.change var (fn _ => Expr e)
                  else ();
              in res end
          | NONE => Exn.capture (restore_attributes (fn () => Future.join x)) ())
        end) ());


end;

(*force evaluation, then release the outcome via Exn.release*)
fun force r = (Exn.release o force_result) r;
(*lazy application: neither x is forced nor f applied before the
  resulting lazy value is forced*)
fun map f x =
  let fun delayed () = f (force x)
  in lazy delayed end;


(* future evaluation *)

(*turn a lazy value into a future: an unfinished value is forced inside a
  forked task, a finished one is passed through Future.value_result*)
fun future params x =
  if not (is_finished x) then
    singleton (Future.forks params) (fn () => force x)
  else Future.value_result (force_result x);

end;

(*top-level abbreviation: 'a lazy may be written without the structure prefix*)
type 'a lazy = 'a Lazy.lazy;