--- a/src/Pure/Concurrent/future.scala Wed Apr 23 14:16:08 2014 +0200
+++ b/src/Pure/Concurrent/future.scala Wed Apr 23 15:22:48 2014 +0200
@@ -2,30 +2,27 @@
Module: PIDE
Author: Makarius
-Future values.
-
-Notable differences to scala.actors.Future (as of Scala 2.7.7):
-
- * We capture/release exceptions as in the ML variant.
-
- * Future.join works for *any* actor, not just the one of the
- original fork.
+Value-oriented parallelism via futures and promises in Scala -- with
+signatures as in Isabelle/ML.
*/
package isabelle
-import scala.actors.Actor._
+import scala.util.{Success, Failure}
+import scala.concurrent.{Future => Scala_Future, Promise => Scala_Promise, Await}
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext.Implicits.global
object Future
{
def value[A](x: A): Future[A] = new Finished_Future(x)
- def fork[A](body: => A): Future[A] = new Pending_Future(body)
- def promise[A]: Promise[A] = new Promise_Future
+ def fork[A](body: => A): Future[A] = new Pending_Future(Scala_Future[A](body))
+ def promise[A]: Promise[A] = new Promise_Future[A](Scala_Promise[A])
}
-abstract class Future[A]
+trait Future[A]
{
def peek: Option[Exn.Result[A]]
def is_finished: Boolean = peek.isDefined
@@ -41,71 +38,45 @@
}
}
-abstract class Promise[A] extends Future[A]
+trait Promise[A] extends Future[A]
{
def fulfill_result(res: Exn.Result[A]): Unit
def fulfill(x: A) { fulfill_result(Exn.Res(x)) }
}
+
private class Finished_Future[A](x: A) extends Future[A]
{
val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
val join: A = x
}
-private class Pending_Future[A](body: => A) extends Future[A]
+private class Pending_Future[A](future: Scala_Future[A]) extends Future[A]
{
- @volatile private var result: Option[Exn.Result[A]] = None
-
- private val evaluator = actor {
- result = Some(Exn.capture(body))
- loop { react { case _ => reply(result.get) } }
- }
+ def peek: Option[Exn.Result[A]] =
+ future.value match {
+ case Some(Success(x)) => Some(Exn.Res(x))
+ case Some(Failure(e)) => Some(Exn.Exn(e))
+ case None => None
+ }
+ override def is_finished: Boolean = future.isCompleted
- def peek: Option[Exn.Result[A]] = result
+ def join: A = Await.result(future, Duration.Inf)
- def join: A =
- Exn.release {
- peek match {
- case Some(res) => res
- case None => (evaluator !? (())).asInstanceOf[Exn.Result[A]]
- }
- }
+ override def map[B](f: A => B): Future[B] = new Pending_Future[B](future.map(f))
}
-private class Promise_Future[A] extends Promise[A]
+private class Promise_Future[A](promise: Scala_Promise[A])
+ extends Pending_Future(promise.future) with Promise[A]
{
- @volatile private var result: Option[Exn.Result[A]] = None
-
- private case object Read
- private case class Write(res: Exn.Result[A])
+ override def is_finished: Boolean = promise.isCompleted
- private val receiver = actor {
- loop {
- react {
- case Read if result.isDefined => reply(result.get)
- case Write(res) =>
- if (result.isDefined) reply(false)
- else { result = Some(res); reply(true) }
- }
- }
- }
-
- def peek: Option[Exn.Result[A]] = result
-
- def join: A =
- Exn.release {
- result match {
- case Some(res) => res
- case None => (receiver !? Read).asInstanceOf[Exn.Result[A]]
- }
+ override def fulfill_result(res: Exn.Result[A]): Unit =
+ res match {
+ case Exn.Res(x) => promise.success(x)
+ case Exn.Exn(e) => promise.failure(e)
}
- def fulfill_result(res: Exn.Result[A]) {
- receiver !? Write(res) match {
- case false => error("Duplicate fulfillment of promise")
- case _ =>
- }
- }
+ override def fulfill(x: A): Unit = promise.success(x)
}