diff -r 172ae876de9f -r 42bf8fffdf6a src/Pure/Concurrent/future.scala --- a/src/Pure/Concurrent/future.scala Wed Apr 23 14:16:08 2014 +0200 +++ b/src/Pure/Concurrent/future.scala Wed Apr 23 15:22:48 2014 +0200 @@ -2,30 +2,27 @@ Module: PIDE Author: Makarius -Future values. - -Notable differences to scala.actors.Future (as of Scala 2.7.7): - - * We capture/release exceptions as in the ML variant. - - * Future.join works for *any* actor, not just the one of the - original fork. +Value-oriented parallelism via futures and promises in Scala -- with +signatures as in Isabelle/ML. */ package isabelle -import scala.actors.Actor._ +import scala.util.{Success, Failure} +import scala.concurrent.{Future => Scala_Future, Promise => Scala_Promise, Await} +import scala.concurrent.duration.Duration +import scala.concurrent.ExecutionContext.Implicits.global object Future { def value[A](x: A): Future[A] = new Finished_Future(x) - def fork[A](body: => A): Future[A] = new Pending_Future(body) - def promise[A]: Promise[A] = new Promise_Future + def fork[A](body: => A): Future[A] = new Pending_Future(Scala_Future[A](body)) + def promise[A]: Promise[A] = new Promise_Future[A](Scala_Promise[A]) } -abstract class Future[A] +trait Future[A] { def peek: Option[Exn.Result[A]] def is_finished: Boolean = peek.isDefined @@ -41,71 +38,45 @@ } } -abstract class Promise[A] extends Future[A] +trait Promise[A] extends Future[A] { def fulfill_result(res: Exn.Result[A]): Unit def fulfill(x: A) { fulfill_result(Exn.Res(x)) } } + private class Finished_Future[A](x: A) extends Future[A] { val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) val join: A = x } -private class Pending_Future[A](body: => A) extends Future[A] +private class Pending_Future[A](future: Scala_Future[A]) extends Future[A] { - @volatile private var result: Option[Exn.Result[A]] = None - - private val evaluator = actor { - result = Some(Exn.capture(body)) - loop { react { case _ => reply(result.get) } } - } + def peek: Option[Exn.Result[A]] = + future.value match { + case Some(Success(x)) => Some(Exn.Res(x)) + case Some(Failure(e)) => Some(Exn.Exn(e)) + case None => None + } + override def is_finished: Boolean = future.isCompleted - def peek: Option[Exn.Result[A]] = result + def join: A = Await.result(future, Duration.Inf) - def join: A = - Exn.release { - peek match { - case Some(res) => res - case None => (evaluator !? (())).asInstanceOf[Exn.Result[A]] - } - } + override def map[B](f: A => B): Future[B] = new Pending_Future[B](future.map(f)) } -private class Promise_Future[A] extends Promise[A] +private class Promise_Future[A](promise: Scala_Promise[A]) + extends Pending_Future(promise.future) with Promise[A] { - @volatile private var result: Option[Exn.Result[A]] = None - - private case object Read - private case class Write(res: Exn.Result[A]) + override def is_finished: Boolean = promise.isCompleted - private val receiver = actor { - loop { - react { - case Read if result.isDefined => reply(result.get) - case Write(res) => - if (result.isDefined) reply(false) - else { result = Some(res); reply(true) } - } - } - } - - def peek: Option[Exn.Result[A]] = result - - def join: A = - Exn.release { - result match { - case Some(res) => res - case None => (receiver !? Read).asInstanceOf[Exn.Result[A]] - } + override def fulfill_result(res: Exn.Result[A]): Unit = + res match { + case Exn.Res(x) => promise.success(x) + case Exn.Exn(e) => promise.failure(e) } - def fulfill_result(res: Exn.Result[A]) { - receiver !? Write(res) match { - case false => error("Duplicate fulfillment of promise") - case _ => - } - } + override def fulfill(x: A): Unit = promise.success(x) }