--- a/src/Pure/Concurrent/future.scala Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/Concurrent/future.scala Tue Nov 03 16:35:38 2015 +0100
@@ -2,31 +2,21 @@
Module: PIDE
Author: Makarius
-Value-oriented parallel execution via futures and promises in Scala -- with
-signatures as in Isabelle/ML.
+Value-oriented parallel execution via futures and promises.
*/
package isabelle
-import scala.util.{Success, Failure}
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor,
- Future => Scala_Future, Promise => Scala_Promise, Await}
-import scala.concurrent.duration.Duration
-
+/* futures and promises */
object Future
{
- lazy val execution_context: ExecutionContextExecutor =
- ExecutionContext.fromExecutorService(Standard_Thread.default_pool)
-
- def value[A](x: A): Future[A] = new Finished_Future(x)
-
- def fork[A](body: => A): Future[A] =
- new Pending_Future(Scala_Future[A](body)(execution_context))
-
- def promise[A]: Promise[A] =
- new Promise_Future[A](Scala_Promise[A]())
+ def value[A](x: A): Future[A] = new Value_Future(x)
+ def fork[A](body: => A): Future[A] = new Task_Future[A](body)
+ def promise[A]: Promise[A] = new Promise_Future[A]
+ def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] =
+ new Thread_Future[A](name, daemon, body)
}
trait Future[A]
@@ -34,8 +24,10 @@
def peek: Option[Exn.Result[A]]
def is_finished: Boolean = peek.isDefined
def get_finished: A = { require(is_finished); Exn.release(peek.get) }
- def join: A
+ def join_result: Exn.Result[A]
+ def join: A = Exn.release(join_result)
def map[B](f: A => B): Future[B] = Future.fork { f(join) }
+ def cancel: Unit
override def toString: String =
peek match {
@@ -47,46 +39,103 @@
trait Promise[A] extends Future[A]
{
- def cancel: Unit
def fulfill_result(res: Exn.Result[A]): Unit
def fulfill(x: A): Unit
}
-private class Finished_Future[A](x: A) extends Future[A]
+/* value future */
+
+private class Value_Future[A](x: A) extends Future[A]
{
val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
- val join: A = x
+ def join_result: Exn.Result[A] = peek.get
+ def cancel {}
}
-private class Pending_Future[A](future: Scala_Future[A]) extends Future[A]
+
+/* task future via thread pool */
+
+private class Task_Future[A](body: => A) extends Future[A]
{
+ private sealed abstract class Status
+ private case object Ready extends Status
+ private case class Running(thread: Thread) extends Status
+ private case object Terminated extends Status
+ private case class Finished(result: Exn.Result[A]) extends Status
+
+ private val status = Synchronized[Status](Ready)
+
def peek: Option[Exn.Result[A]] =
- future.value match {
- case Some(Success(x)) => Some(Exn.Res(x))
- case Some(Failure(e)) => Some(Exn.Exn(e))
- case None => None
+ status.value match {
+ case Finished(result) => Some(result)
+ case _ => None
+ }
+
+ private def try_run()
+ {
+ val do_run =
+ status.change_result {
+ case Ready => (true, Running(Thread.currentThread))
+ case st => (false, st)
+ }
+ if (do_run) {
+ val result = Exn.capture(body)
+ status.change(_ => Terminated)
+ status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result))
}
- override def is_finished: Boolean = future.isCompleted
+ }
+ private val task = Standard_Thread.submit_task { try_run() }
- def join: A = Await.result(future, Duration.Inf)
- override def map[B](f: A => B): Future[B] =
- new Pending_Future[B](future.map(f)(Future.execution_context))
+ def join_result: Exn.Result[A] =
+ {
+ try_run()
+ status.guarded_access {
+ case st @ Finished(result) => Some((result, st))
+ case _ => None
+ }
+ }
+
+ def cancel =
+ {
+ status.change {
+ case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt()))
+ case st @ Running(thread) => thread.interrupt; st
+ case st => st
+ }
+ }
}
-private class Promise_Future[A](promise: Scala_Promise[A])
- extends Pending_Future(promise.future) with Promise[A]
+
+/* promise future */
+
+private class Promise_Future[A] extends Promise[A]
{
- override def is_finished: Boolean = promise.isCompleted
+ private val state = Synchronized[Option[Exn.Result[A]]](None)
+ def peek: Option[Exn.Result[A]] = state.value
+
+ def join_result: Exn.Result[A] =
+ state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st)))
+
+ def fulfill_result(result: Exn.Result[A]): Unit =
+ state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException)
+
+ def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))
def cancel: Unit =
- try { fulfill_result(Exn.Exn(Exn.Interrupt())) }
- catch { case _: IllegalStateException => }
+ state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st)
+}
+
+
+/* thread future */
- def fulfill_result(res: Exn.Result[A]): Unit =
- res match {
- case Exn.Res(x) => promise.success(x)
- case Exn.Exn(e) => promise.failure(e)
- }
- def fulfill(x: A): Unit = promise.success(x)
+private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A]
+{
+ private val result = Future.promise[A]
+ private val thread =
+ Standard_Thread.fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
+
+ def peek: Option[Exn.Result[A]] = result.peek
+ def join_result: Exn.Result[A] = result.join_result
+ def cancel: Unit = thread.interrupt
}