modernized Future/Promise implementation, bypassing old actors;
authorwenzelm
Wed, 23 Apr 2014 15:22:48 +0200
changeset 56673 42bf8fffdf6a
parent 56672 172ae876de9f
child 56674 70cc1164fb83
modernized Future/Promise implementation, bypassing old actors;
src/Pure/Concurrent/future.scala
--- a/src/Pure/Concurrent/future.scala	Wed Apr 23 14:16:08 2014 +0200
+++ b/src/Pure/Concurrent/future.scala	Wed Apr 23 15:22:48 2014 +0200
@@ -2,30 +2,27 @@
     Module:     PIDE
     Author:     Makarius
 
-Future values.
-
-Notable differences to scala.actors.Future (as of Scala 2.7.7):
-
-  * We capture/release exceptions as in the ML variant.
-
-  * Future.join works for *any* actor, not just the one of the
-    original fork.
+Value-oriented parallelism via futures and promises in Scala -- with
+signatures as in Isabelle/ML.
 */
 
 package isabelle
 
 
-import scala.actors.Actor._
+import scala.util.{Success, Failure}
+import scala.concurrent.{Future => Scala_Future, Promise => Scala_Promise, Await}
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext.Implicits.global
 
 
 object Future
 {
   def value[A](x: A): Future[A] = new Finished_Future(x)
-  def fork[A](body: => A): Future[A] = new Pending_Future(body)
-  def promise[A]: Promise[A] = new Promise_Future
+  def fork[A](body: => A): Future[A] = new Pending_Future(Scala_Future[A](body))
+  def promise[A]: Promise[A] = new Promise_Future[A](Scala_Promise[A])
 }
 
-abstract class Future[A]
+trait Future[A]
 {
   def peek: Option[Exn.Result[A]]
   def is_finished: Boolean = peek.isDefined
@@ -41,71 +38,45 @@
     }
 }
 
-abstract class Promise[A] extends Future[A]
+trait Promise[A] extends Future[A]
 {
   def fulfill_result(res: Exn.Result[A]): Unit
   def fulfill(x: A) { fulfill_result(Exn.Res(x)) }
 }
 
+
 private class Finished_Future[A](x: A) extends Future[A]
 {
   val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
   val join: A = x
 }
 
-private class Pending_Future[A](body: => A) extends Future[A]
+private class Pending_Future[A](future: Scala_Future[A]) extends Future[A]
 {
-  @volatile private var result: Option[Exn.Result[A]] = None
-
-  private val evaluator = actor {
-    result = Some(Exn.capture(body))
-    loop { react { case _ => reply(result.get) } }
-  }
+  def peek: Option[Exn.Result[A]] =
+    future.value match {
+      case Some(Success(x)) => Some(Exn.Res(x))
+      case Some(Failure(e)) => Some(Exn.Exn(e))
+      case None => None
+    }
+  override def is_finished: Boolean = future.isCompleted
 
-  def peek: Option[Exn.Result[A]] = result
+  def join: A = Await.result(future, Duration.Inf)
 
-  def join: A =
-    Exn.release {
-      peek match {
-        case Some(res) => res
-        case None => (evaluator !? (())).asInstanceOf[Exn.Result[A]]
-      }
-    }
+  override def map[B](f: A => B): Future[B] = new Pending_Future[B](future.map(f))
 }
 
-private class Promise_Future[A] extends Promise[A]
+private class Promise_Future[A](promise: Scala_Promise[A])
+  extends Pending_Future(promise.future) with Promise[A]
 {
-  @volatile private var result: Option[Exn.Result[A]] = None
-
-  private case object Read
-  private case class Write(res: Exn.Result[A])
+  override def is_finished: Boolean = promise.isCompleted
 
-  private val receiver = actor {
-    loop {
-      react {
-        case Read if result.isDefined => reply(result.get)
-        case Write(res) =>
-          if (result.isDefined) reply(false)
-          else { result = Some(res); reply(true) }
-      }
-    }
-  }
-
-  def peek: Option[Exn.Result[A]] = result
-
-  def join: A =
-    Exn.release {
-      result match {
-        case Some(res) => res
-        case None => (receiver !? Read).asInstanceOf[Exn.Result[A]]
-      }
+  override def fulfill_result(res: Exn.Result[A]): Unit =
+    res match {
+      case Exn.Res(x) => promise.success(x)
+      case Exn.Exn(e) => promise.failure(e)
     }
 
-  def fulfill_result(res: Exn.Result[A]) {
-    receiver !? Write(res) match {
-      case false => error("Duplicate fulfillment of promise")
-      case _ =>
-    }
-  }
+  override def fulfill(x: A): Unit = promise.success(x)
 }