more direct task future implementation, with proper cancel operation;
more uniform Future.thread;
--- a/src/Pure/Concurrent/future.scala Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/Concurrent/future.scala Tue Nov 03 16:35:38 2015 +0100
@@ -2,31 +2,21 @@
Module: PIDE
Author: Makarius
-Value-oriented parallel execution via futures and promises in Scala -- with
-signatures as in Isabelle/ML.
+Value-oriented parallel execution via futures and promises.
*/
package isabelle
-import scala.util.{Success, Failure}
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor,
- Future => Scala_Future, Promise => Scala_Promise, Await}
-import scala.concurrent.duration.Duration
-
+/* futures and promises */
object Future
{
- lazy val execution_context: ExecutionContextExecutor =
- ExecutionContext.fromExecutorService(Standard_Thread.default_pool)
-
- def value[A](x: A): Future[A] = new Finished_Future(x)
-
- def fork[A](body: => A): Future[A] =
- new Pending_Future(Scala_Future[A](body)(execution_context))
-
- def promise[A]: Promise[A] =
- new Promise_Future[A](Scala_Promise[A]())
+ def value[A](x: A): Future[A] = new Value_Future(x)
+ def fork[A](body: => A): Future[A] = new Task_Future[A](body)
+ def promise[A]: Promise[A] = new Promise_Future[A]
+ def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] =
+ new Thread_Future[A](name, daemon, body)
}
trait Future[A]
@@ -34,8 +24,10 @@
def peek: Option[Exn.Result[A]]
def is_finished: Boolean = peek.isDefined
def get_finished: A = { require(is_finished); Exn.release(peek.get) }
- def join: A
+ def join_result: Exn.Result[A]
+ def join: A = Exn.release(join_result)
def map[B](f: A => B): Future[B] = Future.fork { f(join) }
+ def cancel: Unit
override def toString: String =
peek match {
@@ -47,46 +39,103 @@
trait Promise[A] extends Future[A]
{
- def cancel: Unit
def fulfill_result(res: Exn.Result[A]): Unit
def fulfill(x: A): Unit
}
-private class Finished_Future[A](x: A) extends Future[A]
+/* value future */
+
+private class Value_Future[A](x: A) extends Future[A]
{
val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
- val join: A = x
+ def join_result: Exn.Result[A] = peek.get
+ def cancel {}
}
-private class Pending_Future[A](future: Scala_Future[A]) extends Future[A]
+
+/* task future via thread pool */
+
+private class Task_Future[A](body: => A) extends Future[A]
{
+ private sealed abstract class Status
+ private case object Ready extends Status
+ private case class Running(thread: Thread) extends Status
+ private case object Terminated extends Status
+ private case class Finished(result: Exn.Result[A]) extends Status
+
+ private val status = Synchronized[Status](Ready)
+
def peek: Option[Exn.Result[A]] =
- future.value match {
- case Some(Success(x)) => Some(Exn.Res(x))
- case Some(Failure(e)) => Some(Exn.Exn(e))
- case None => None
+ status.value match {
+ case Finished(result) => Some(result)
+ case _ => None
+ }
+
+ private def try_run()
+ {
+ val do_run =
+ status.change_result {
+ case Ready => (true, Running(Thread.currentThread))
+ case st => (false, st)
+ }
+ if (do_run) {
+ val result = Exn.capture(body)
+ status.change(_ => Terminated)
+ status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result))
}
- override def is_finished: Boolean = future.isCompleted
+ }
+ private val task = Standard_Thread.submit_task { try_run() }
- def join: A = Await.result(future, Duration.Inf)
- override def map[B](f: A => B): Future[B] =
- new Pending_Future[B](future.map(f)(Future.execution_context))
+ def join_result: Exn.Result[A] =
+ {
+ try_run()
+ status.guarded_access {
+ case st @ Finished(result) => Some((result, st))
+ case _ => None
+ }
+ }
+
+ def cancel =
+ {
+ status.change {
+ case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt()))
+ case st @ Running(thread) => thread.interrupt; st
+ case st => st
+ }
+ }
}
-private class Promise_Future[A](promise: Scala_Promise[A])
- extends Pending_Future(promise.future) with Promise[A]
+
+/* promise future */
+
+private class Promise_Future[A] extends Promise[A]
{
- override def is_finished: Boolean = promise.isCompleted
+ private val state = Synchronized[Option[Exn.Result[A]]](None)
+ def peek: Option[Exn.Result[A]] = state.value
+
+ def join_result: Exn.Result[A] =
+ state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st)))
+
+ def fulfill_result(result: Exn.Result[A]): Unit =
+ state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException)
+
+ def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))
def cancel: Unit =
- try { fulfill_result(Exn.Exn(Exn.Interrupt())) }
- catch { case _: IllegalStateException => }
+ state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st)
+}
+
+
+/* thread future */
- def fulfill_result(res: Exn.Result[A]): Unit =
- res match {
- case Exn.Res(x) => promise.success(x)
- case Exn.Exn(e) => promise.failure(e)
- }
- def fulfill(x: A): Unit = promise.success(x)
+private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A]
+{
+ private val result = Future.promise[A]
+ private val thread =
+ Standard_Thread.fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
+
+ def peek: Option[Exn.Result[A]] = result.peek
+ def join_result: Exn.Result[A] = result.join_result
+ def cancel: Unit = thread.interrupt
}
--- a/src/Pure/Concurrent/standard_thread.scala Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/Concurrent/standard_thread.scala Tue Nov 03 16:35:38 2015 +0100
@@ -28,19 +28,9 @@
}
- /* future result via thread */
-
- def future[A](name: String = "", daemon: Boolean = false)(body: => A): (Thread, Future[A]) =
- {
- val result = Future.promise[A]
- val thread = fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
- (thread, result)
- }
-
-
/* thread pool */
- lazy val default_pool =
+ lazy val pool: ThreadPoolExecutor =
{
val m = Properties.Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0
val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8
@@ -48,7 +38,7 @@
}
def submit_task[A](body: => A): JFuture[A] =
- default_pool.submit(new Callable[A] { def call = body })
+ pool.submit(new Callable[A] { def call = body })
/* delayed events */
--- a/src/Pure/PIDE/prover.scala Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/PIDE/prover.scala Tue Nov 03 16:35:38 2015 +0100
@@ -121,8 +121,8 @@
/** process manager **/
- private val (_, process_result) =
- Standard_Thread.future("process_result") { system_process.join }
+ private val process_result =
+ Future.thread("process_result") { system_process.join }
private def terminate_process()
{
--- a/src/Pure/System/isabelle_system.scala Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/System/isabelle_system.scala Tue Nov 03 16:35:38 2015 +0100
@@ -328,14 +328,10 @@
proc.stdin.close
val limited = new Limited_Progress(proc, progress_limit)
- val (_, stdout) =
- Standard_Thread.future("bash_stdout") {
- File.read_lines(proc.stdout, limited(progress_stdout))
- }
- val (_, stderr) =
- Standard_Thread.future("bash_stderr") {
- File.read_lines(proc.stderr, limited(progress_stderr))
- }
+ val stdout =
+ Future.thread("bash_stdout") { File.read_lines(proc.stdout, limited(progress_stdout)) }
+ val stderr =
+ Future.thread("bash_stderr") { File.read_lines(proc.stderr, limited(progress_stderr)) }
val rc =
try { proc.join }
--- a/src/Pure/Tools/build.scala Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/Tools/build.scala Tue Nov 03 16:35:38 2015 +0100
@@ -598,8 +598,8 @@
"""
}
- private val (thread, result) =
- Standard_Thread.future("build") {
+ private val result =
+ Future.thread("build") {
Isabelle_System.bash_env(info.dir.file, env, script,
progress_stdout = (line: String) =>
Library.try_unprefix("\floading_theory = ", line) match {
@@ -614,7 +614,7 @@
strict = false)
}
- def terminate: Unit = thread.interrupt
+ def terminate: Unit = result.cancel
def is_finished: Boolean = result.is_finished
@volatile private var was_timeout = false