src/Pure/Concurrent/future.scala
changeset 61559 313eca3fa847
parent 61556 0d4ee4168e41
child 61563 91c3aedbfc5e
     1.1 --- a/src/Pure/Concurrent/future.scala	Tue Nov 03 16:35:00 2015 +0100
     1.2 +++ b/src/Pure/Concurrent/future.scala	Tue Nov 03 16:35:38 2015 +0100
     1.3 @@ -2,31 +2,21 @@
     1.4      Module:     PIDE
     1.5      Author:     Makarius
     1.6  
     1.7 -Value-oriented parallel execution via futures and promises in Scala -- with
     1.8 -signatures as in Isabelle/ML.
     1.9 +Value-oriented parallel execution via futures and promises.
    1.10  */
    1.11  
    1.12  package isabelle
    1.13  
    1.14  
    1.15 -import scala.util.{Success, Failure}
    1.16 -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor,
    1.17 -  Future => Scala_Future, Promise => Scala_Promise, Await}
    1.18 -import scala.concurrent.duration.Duration
    1.19 -
    1.20 +/* futures and promises */
    1.21  
    1.22  object Future
    1.23  {
    1.24 -  lazy val execution_context: ExecutionContextExecutor =
    1.25 -    ExecutionContext.fromExecutorService(Standard_Thread.default_pool)
    1.26 -
    1.27 -  def value[A](x: A): Future[A] = new Finished_Future(x)
    1.28 -
    1.29 -  def fork[A](body: => A): Future[A] =
    1.30 -    new Pending_Future(Scala_Future[A](body)(execution_context))
    1.31 -
    1.32 -  def promise[A]: Promise[A] =
    1.33 -    new Promise_Future[A](Scala_Promise[A]())
    1.34 +  def value[A](x: A): Future[A] = new Value_Future(x)
    1.35 +  def fork[A](body: => A): Future[A] = new Task_Future[A](body)
    1.36 +  def promise[A]: Promise[A] = new Promise_Future[A]
    1.37 +  def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] =
    1.38 +    new Thread_Future[A](name, daemon, body)
    1.39  }
    1.40  
    1.41  trait Future[A]
    1.42 @@ -34,8 +24,10 @@
    1.43    def peek: Option[Exn.Result[A]]
    1.44    def is_finished: Boolean = peek.isDefined
    1.45    def get_finished: A = { require(is_finished); Exn.release(peek.get) }
    1.46 -  def join: A
    1.47 +  def join_result: Exn.Result[A]
    1.48 +  def join: A = Exn.release(join_result)
    1.49    def map[B](f: A => B): Future[B] = Future.fork { f(join) }
    1.50 +  def cancel: Unit
    1.51  
    1.52    override def toString: String =
    1.53      peek match {
    1.54 @@ -47,46 +39,103 @@
    1.55  
    1.56  trait Promise[A] extends Future[A]
    1.57  {
    1.58 -  def cancel: Unit
    1.59    def fulfill_result(res: Exn.Result[A]): Unit
    1.60    def fulfill(x: A): Unit
    1.61  }
    1.62  
    1.63  
    1.64 -private class Finished_Future[A](x: A) extends Future[A]
    1.65 +/* value future */
    1.66 +
    1.67 +private class Value_Future[A](x: A) extends Future[A]
    1.68  {
    1.69    val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
    1.70 -  val join: A = x
    1.71 +  def join_result: Exn.Result[A] = peek.get
    1.72 +  def cancel {}
    1.73  }
    1.74  
    1.75 -private class Pending_Future[A](future: Scala_Future[A]) extends Future[A]
    1.76 +
    1.77 +/* task future via thread pool */
    1.78 +
    1.79 +private class Task_Future[A](body: => A) extends Future[A]
    1.80  {
    1.81 +  private sealed abstract class Status
    1.82 +  private case object Ready extends Status
    1.83 +  private case class Running(thread: Thread) extends Status
    1.84 +  private case object Terminated extends Status
    1.85 +  private case class Finished(result: Exn.Result[A]) extends Status
    1.86 +
    1.87 +  private val status = Synchronized[Status](Ready)
    1.88 +
    1.89    def peek: Option[Exn.Result[A]] =
    1.90 -    future.value match {
    1.91 -      case Some(Success(x)) => Some(Exn.Res(x))
    1.92 -      case Some(Failure(e)) => Some(Exn.Exn(e))
    1.93 -      case None => None
    1.94 +    status.value match {
    1.95 +      case Finished(result) => Some(result)
    1.96 +      case _ => None
    1.97 +    }
    1.98 +
    1.99 +  private def try_run()
   1.100 +  {
   1.101 +    val do_run =
   1.102 +      status.change_result {
   1.103 +        case Ready => (true, Running(Thread.currentThread))
   1.104 +        case st => (false, st)
   1.105 +      }
   1.106 +    if (do_run) {
   1.107 +      val result = Exn.capture(body)
   1.108 +      status.change(_ => Terminated)
   1.109 +      status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result))
   1.110      }
   1.111 -  override def is_finished: Boolean = future.isCompleted
   1.112 +  }
   1.113 +  private val task = Standard_Thread.submit_task { try_run() }
   1.114  
   1.115 -  def join: A = Await.result(future, Duration.Inf)
   1.116 -  override def map[B](f: A => B): Future[B] =
   1.117 -    new Pending_Future[B](future.map(f)(Future.execution_context))
   1.118 +  def join_result: Exn.Result[A] =
   1.119 +  {
   1.120 +    try_run()
   1.121 +    status.guarded_access {
   1.122 +      case st @ Finished(result) => Some((result, st))
   1.123 +      case _ => None
   1.124 +    }
   1.125 +  }
   1.126 +
   1.127 +  def cancel =
   1.128 +  {
   1.129 +    status.change {
   1.130 +      case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt()))
   1.131 +      case st @ Running(thread) => thread.interrupt; st
   1.132 +      case st => st
   1.133 +    }
   1.134 +  }
   1.135  }
   1.136  
   1.137 -private class Promise_Future[A](promise: Scala_Promise[A])
   1.138 -  extends Pending_Future(promise.future) with Promise[A]
   1.139 +
   1.140 +/* promise future */
   1.141 +
   1.142 +private class Promise_Future[A] extends Promise[A]
   1.143  {
   1.144 -  override def is_finished: Boolean = promise.isCompleted
   1.145 +  private val state = Synchronized[Option[Exn.Result[A]]](None)
   1.146 +  def peek: Option[Exn.Result[A]] = state.value
   1.147 +
   1.148 +  def join_result: Exn.Result[A] =
   1.149 +    state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st)))
   1.150 +
   1.151 +  def fulfill_result(result: Exn.Result[A]): Unit =
   1.152 +    state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException)
   1.153 +
   1.154 +  def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))
   1.155  
   1.156    def cancel: Unit =
   1.157 -    try { fulfill_result(Exn.Exn(Exn.Interrupt())) }
   1.158 -    catch { case _: IllegalStateException => }
   1.159 +    state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st)
   1.160 +}
   1.161 +
   1.162 +
   1.163 +/* thread future */
   1.164  
   1.165 -  def fulfill_result(res: Exn.Result[A]): Unit =
   1.166 -    res match {
   1.167 -      case Exn.Res(x) => promise.success(x)
   1.168 -      case Exn.Exn(e) => promise.failure(e)
   1.169 -    }
   1.170 -  def fulfill(x: A): Unit = promise.success(x)
   1.171 +private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A]
   1.172 +{
   1.173 +  private val result = Future.promise[A]
   1.174 +  private val thread =
   1.175 +    Standard_Thread.fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
   1.176 +
   1.177 +  def peek: Option[Exn.Result[A]] = result.peek
   1.178 +  def join_result: Exn.Result[A] = result.join_result
   1.179 +  def cancel: Unit = thread.interrupt
   1.180  }