src/Pure/Concurrent/future.scala
changeset 61559 313eca3fa847
parent 61556 0d4ee4168e41
child 61563 91c3aedbfc5e
equal deleted inserted replaced
61558:68b86028e02a 61559:313eca3fa847
     1 /*  Title:      Pure/Concurrent/future.scala
     1 /*  Title:      Pure/Concurrent/future.scala
     2     Module:     PIDE
     2     Module:     PIDE
     3     Author:     Makarius
     3     Author:     Makarius
     4 
     4 
     5 Value-oriented parallel execution via futures and promises in Scala -- with
     5 Value-oriented parallel execution via futures and promises.
     6 signatures as in Isabelle/ML.
       
     7 */
     6 */
     8 
     7 
     9 package isabelle
     8 package isabelle
    10 
     9 
    11 
    10 
    12 import scala.util.{Success, Failure}
    11 /* futures and promises */
    13 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor,
       
    14   Future => Scala_Future, Promise => Scala_Promise, Await}
       
    15 import scala.concurrent.duration.Duration
       
    16 
       
    17 
    12 
    18 object Future
    13 object Future
    19 {
    14 {
    20   lazy val execution_context: ExecutionContextExecutor =
    15   def value[A](x: A): Future[A] = new Value_Future(x)
    21     ExecutionContext.fromExecutorService(Standard_Thread.default_pool)
    16   def fork[A](body: => A): Future[A] = new Task_Future[A](body)
    22 
    17   def promise[A]: Promise[A] = new Promise_Future[A]
    23   def value[A](x: A): Future[A] = new Finished_Future(x)
    18   def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] =
    24 
    19     new Thread_Future[A](name, daemon, body)
    25   def fork[A](body: => A): Future[A] =
       
    26     new Pending_Future(Scala_Future[A](body)(execution_context))
       
    27 
       
    28   def promise[A]: Promise[A] =
       
    29     new Promise_Future[A](Scala_Promise[A]())
       
    30 }
    20 }
    31 
    21 
    32 trait Future[A]
    22 trait Future[A]
    33 {
    23 {
    34   def peek: Option[Exn.Result[A]]
    24   def peek: Option[Exn.Result[A]]
    35   def is_finished: Boolean = peek.isDefined
    25   def is_finished: Boolean = peek.isDefined
    36   def get_finished: A = { require(is_finished); Exn.release(peek.get) }
    26   def get_finished: A = { require(is_finished); Exn.release(peek.get) }
    37   def join: A
    27   def join_result: Exn.Result[A]
       
    28   def join: A = Exn.release(join_result)
    38   def map[B](f: A => B): Future[B] = Future.fork { f(join) }
    29   def map[B](f: A => B): Future[B] = Future.fork { f(join) }
       
    30   def cancel: Unit
    39 
    31 
    40   override def toString: String =
    32   override def toString: String =
    41     peek match {
    33     peek match {
    42       case None => "<future>"
    34       case None => "<future>"
    43       case Some(Exn.Exn(_)) => "<failed>"
    35       case Some(Exn.Exn(_)) => "<failed>"
    45     }
    37     }
    46 }
    38 }
    47 
    39 
    48 trait Promise[A] extends Future[A]
    40 trait Promise[A] extends Future[A]
    49 {
    41 {
    50   def cancel: Unit
       
    51   def fulfill_result(res: Exn.Result[A]): Unit
    42   def fulfill_result(res: Exn.Result[A]): Unit
    52   def fulfill(x: A): Unit
    43   def fulfill(x: A): Unit
    53 }
    44 }
    54 
    45 
    55 
    46 
    56 private class Finished_Future[A](x: A) extends Future[A]
    47 /* value future */
       
    48 
       
    49 private class Value_Future[A](x: A) extends Future[A]
    57 {
    50 {
    58   val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
    51   val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
    59   val join: A = x
    52   def join_result: Exn.Result[A] = peek.get
       
    53   def cancel {}
    60 }
    54 }
    61 
    55 
    62 private class Pending_Future[A](future: Scala_Future[A]) extends Future[A]
    56 
       
    57 /* task future via thread pool */
       
    58 
       
    59 private class Task_Future[A](body: => A) extends Future[A]
    63 {
    60 {
       
    61   private sealed abstract class Status
       
    62   private case object Ready extends Status
       
    63   private case class Running(thread: Thread) extends Status
       
    64   private case object Terminated extends Status
       
    65   private case class Finished(result: Exn.Result[A]) extends Status
       
    66 
       
    67   private val status = Synchronized[Status](Ready)
       
    68 
    64   def peek: Option[Exn.Result[A]] =
    69   def peek: Option[Exn.Result[A]] =
    65     future.value match {
    70     status.value match {
    66       case Some(Success(x)) => Some(Exn.Res(x))
    71       case Finished(result) => Some(result)
    67       case Some(Failure(e)) => Some(Exn.Exn(e))
    72       case _ => None
    68       case None => None
       
    69     }
    73     }
    70   override def is_finished: Boolean = future.isCompleted
       
    71 
    74 
    72   def join: A = Await.result(future, Duration.Inf)
    75   private def try_run()
    73   override def map[B](f: A => B): Future[B] =
    76   {
    74     new Pending_Future[B](future.map(f)(Future.execution_context))
    77     val do_run =
       
    78       status.change_result {
       
    79         case Ready => (true, Running(Thread.currentThread))
       
    80         case st => (false, st)
       
    81       }
       
    82     if (do_run) {
       
    83       val result = Exn.capture(body)
       
    84       status.change(_ => Terminated)
       
    85       status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result))
       
    86     }
       
    87   }
       
    88   private val task = Standard_Thread.submit_task { try_run() }
       
    89 
       
    90   def join_result: Exn.Result[A] =
       
    91   {
       
    92     try_run()
       
    93     status.guarded_access {
       
    94       case st @ Finished(result) => Some((result, st))
       
    95       case _ => None
       
    96     }
       
    97   }
       
    98 
       
    99   def cancel =
       
   100   {
       
   101     status.change {
       
   102       case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt()))
       
   103       case st @ Running(thread) => thread.interrupt; st
       
   104       case st => st
       
   105     }
       
   106   }
    75 }
   107 }
    76 
   108 
    77 private class Promise_Future[A](promise: Scala_Promise[A])
   109 
    78   extends Pending_Future(promise.future) with Promise[A]
   110 /* promise future */
       
   111 
       
   112 private class Promise_Future[A] extends Promise[A]
    79 {
   113 {
    80   override def is_finished: Boolean = promise.isCompleted
   114   private val state = Synchronized[Option[Exn.Result[A]]](None)
       
   115   def peek: Option[Exn.Result[A]] = state.value
       
   116 
       
   117   def join_result: Exn.Result[A] =
       
   118     state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st)))
       
   119 
       
   120   def fulfill_result(result: Exn.Result[A]): Unit =
       
   121     state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException)
       
   122 
       
   123   def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))
    81 
   124 
    82   def cancel: Unit =
   125   def cancel: Unit =
    83     try { fulfill_result(Exn.Exn(Exn.Interrupt())) }
   126     state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st)
    84     catch { case _: IllegalStateException => }
   127 }
    85 
   128 
    86   def fulfill_result(res: Exn.Result[A]): Unit =
   129 
    87     res match {
   130 /* thread future */
    88       case Exn.Res(x) => promise.success(x)
   131 
    89       case Exn.Exn(e) => promise.failure(e)
   132 private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A]
    90     }
   133 {
    91   def fulfill(x: A): Unit = promise.success(x)
   134   private val result = Future.promise[A]
       
   135   private val thread =
       
   136     Standard_Thread.fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
       
   137 
       
   138   def peek: Option[Exn.Result[A]] = result.peek
       
   139   def join_result: Exn.Result[A] = result.join_result
       
   140   def cancel: Unit = thread.interrupt
    92 }
   141 }