src/Pure/Concurrent/future.scala
changeset 56673 42bf8fffdf6a
parent 56663 2d09b437c168
child 56674 70cc1164fb83
equal deleted inserted replaced
56672:172ae876de9f 56673:42bf8fffdf6a
     1 /*  Title:      Pure/Concurrent/future.scala
     1 /*  Title:      Pure/Concurrent/future.scala
     2     Module:     PIDE
     2     Module:     PIDE
     3     Author:     Makarius
     3     Author:     Makarius
     4 
     4 
     5 Future values.
     5 Value-oriented parallelism via futures and promises in Scala -- with
     6 
     6 signatures as in Isabelle/ML.
     7 Notable differences to scala.actors.Future (as of Scala 2.7.7):
       
     8 
       
     9   * We capture/release exceptions as in the ML variant.
       
    10 
       
    11   * Future.join works for *any* actor, not just the one of the
       
    12     original fork.
       
    13 */
     7 */
    14 
     8 
    15 package isabelle
     9 package isabelle
    16 
    10 
    17 
    11 
    18 import scala.actors.Actor._
    12 import scala.util.{Success, Failure}
       
    13 import scala.concurrent.{Future => Scala_Future, Promise => Scala_Promise, Await}
       
    14 import scala.concurrent.duration.Duration
       
    15 import scala.concurrent.ExecutionContext.Implicits.global
    19 
    16 
    20 
    17 
    21 object Future
    18 object Future
    22 {
    19 {
    23   def value[A](x: A): Future[A] = new Finished_Future(x)
    20   def value[A](x: A): Future[A] = new Finished_Future(x)
    24   def fork[A](body: => A): Future[A] = new Pending_Future(body)
    21   def fork[A](body: => A): Future[A] = new Pending_Future(Scala_Future[A](body))
    25   def promise[A]: Promise[A] = new Promise_Future
    22   def promise[A]: Promise[A] = new Promise_Future[A](Scala_Promise[A])
    26 }
    23 }
    27 
    24 
    28 abstract class Future[A]
    25 trait Future[A]
    29 {
    26 {
    30   def peek: Option[Exn.Result[A]]
    27   def peek: Option[Exn.Result[A]]
    31   def is_finished: Boolean = peek.isDefined
    28   def is_finished: Boolean = peek.isDefined
    32   def get_finished: A = { require(is_finished); Exn.release(peek.get) }
    29   def get_finished: A = { require(is_finished); Exn.release(peek.get) }
    33   def join: A
    30   def join: A
    39       case Some(Exn.Exn(_)) => "<failed>"
    36       case Some(Exn.Exn(_)) => "<failed>"
    40       case Some(Exn.Res(x)) => x.toString
    37       case Some(Exn.Res(x)) => x.toString
    41     }
    38     }
    42 }
    39 }
    43 
    40 
    44 abstract class Promise[A] extends Future[A]
    41 trait Promise[A] extends Future[A]
    45 {
    42 {
    46   def fulfill_result(res: Exn.Result[A]): Unit
    43   def fulfill_result(res: Exn.Result[A]): Unit
    47   def fulfill(x: A) { fulfill_result(Exn.Res(x)) }
    44   def fulfill(x: A) { fulfill_result(Exn.Res(x)) }
    48 }
    45 }
       
    46 
    49 
    47 
    50 private class Finished_Future[A](x: A) extends Future[A]
    48 private class Finished_Future[A](x: A) extends Future[A]
    51 {
    49 {
    52   val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
    50   val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
    53   val join: A = x
    51   val join: A = x
    54 }
    52 }
    55 
    53 
    56 private class Pending_Future[A](body: => A) extends Future[A]
    54 private class Pending_Future[A](future: Scala_Future[A]) extends Future[A]
    57 {
    55 {
    58   @volatile private var result: Option[Exn.Result[A]] = None
    56   def peek: Option[Exn.Result[A]] =
       
    57     future.value match {
       
    58       case Some(Success(x)) => Some(Exn.Res(x))
       
    59       case Some(Failure(e)) => Some(Exn.Exn(e))
       
    60       case None => None
       
    61     }
       
    62   override def is_finished: Boolean = future.isCompleted
    59 
    63 
    60   private val evaluator = actor {
    64   def join: A = Await.result(future, Duration.Inf)
    61     result = Some(Exn.capture(body))
       
    62     loop { react { case _ => reply(result.get) } }
       
    63   }
       
    64 
    65 
    65   def peek: Option[Exn.Result[A]] = result
    66   override def map[B](f: A => B): Future[B] = new Pending_Future[B](future.map(f))
    66 
       
    67   def join: A =
       
    68     Exn.release {
       
    69       peek match {
       
    70         case Some(res) => res
       
    71         case None => (evaluator !? (())).asInstanceOf[Exn.Result[A]]
       
    72       }
       
    73     }
       
    74 }
    67 }
    75 
    68 
    76 private class Promise_Future[A] extends Promise[A]
    69 private class Promise_Future[A](promise: Scala_Promise[A])
       
    70   extends Pending_Future(promise.future) with Promise[A]
    77 {
    71 {
    78   @volatile private var result: Option[Exn.Result[A]] = None
    72   override def is_finished: Boolean = promise.isCompleted
    79 
    73 
    80   private case object Read
    74   override def fulfill_result(res: Exn.Result[A]): Unit =
    81   private case class Write(res: Exn.Result[A])
    75     res match {
    82 
    76       case Exn.Res(x) => promise.success(x)
    83   private val receiver = actor {
    77       case Exn.Exn(e) => promise.failure(e)
    84     loop {
       
    85       react {
       
    86         case Read if result.isDefined => reply(result.get)
       
    87         case Write(res) =>
       
    88           if (result.isDefined) reply(false)
       
    89           else { result = Some(res); reply(true) }
       
    90       }
       
    91     }
       
    92   }
       
    93 
       
    94   def peek: Option[Exn.Result[A]] = result
       
    95 
       
    96   def join: A =
       
    97     Exn.release {
       
    98       result match {
       
    99         case Some(res) => res
       
   100         case None => (receiver !? Read).asInstanceOf[Exn.Result[A]]
       
   101       }
       
   102     }
    78     }
   103 
    79 
   104   def fulfill_result(res: Exn.Result[A]) {
    80   override def fulfill(x: A): Unit = promise.success(x)
   105     receiver !? Write(res) match {
       
   106       case false => error("Duplicate fulfillment of promise")
       
   107       case _ =>
       
   108     }
       
   109   }
       
   110 }
    81 }
   111 
    82