added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
authorwenzelm
Mon, 04 Jan 2010 18:54:22 +0100
changeset 34240 3274571e45c1
parent 34239 e18b0f7b9902
child 34241 8611f1813fc9
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
src/Pure/Concurrent/future.scala
--- a/src/Pure/Concurrent/future.scala	Mon Jan 04 15:35:53 2010 +0100
+++ b/src/Pure/Concurrent/future.scala	Mon Jan 04 18:54:22 2010 +0100
@@ -21,6 +21,7 @@
 {
   def value[A](x: A): Future[A] = new Finished_Future(x)
   def fork[A](body: => A): Future[A] = new Pending_Future(body)
+  def promise[A]: Promise[A] = new Promise_Future
 }
 
 abstract class Future[A]
@@ -38,6 +39,11 @@
     }
 }
 
+abstract class Promise[A] extends Future[A]
+{
+  def fulfill(x: A): Unit
+}
+
 private class Finished_Future[A](x: A) extends Future[A]
 {
   val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
@@ -64,4 +70,37 @@
     }
 }
 
+private class Promise_Future[A] extends Promise[A]
+{
+  @volatile private var result: Option[A] = None
 
+  private case object Read
+  private case class Write(x: A)
+
+  private val receiver = actor {
+    loop {
+      react {
+        case Read if result.isDefined => reply(result.get)
+        case Write(x) =>
+          if (result.isDefined) reply(false)
+          else { result = Some(x); reply(true) }
+      }
+    }
+  }
+
+  def peek: Option[Exn.Result[A]] = result.map(Exn.Res(_))
+
+  def join: A =
+    result match {
+      case Some(res) => res
+      case None => (receiver !? Read).asInstanceOf[A]
+    }
+
+  def fulfill(x: A) {
+    receiver !? Write(x) match {
+      case false => error("Duplicate fulfillment of promise")
+      case _ =>
+    }
+  }
+}
+