more direct task future implementation, with proper cancel operation;
authorwenzelm
Tue, 03 Nov 2015 16:35:38 +0100
changeset 61559 313eca3fa847
parent 61558 68b86028e02a
child 61560 7c985fd653c5
more direct task future implementation, with proper cancel operation; more uniform Future.thread;
src/Pure/Concurrent/future.scala
src/Pure/Concurrent/standard_thread.scala
src/Pure/PIDE/prover.scala
src/Pure/System/isabelle_system.scala
src/Pure/Tools/build.scala
--- a/src/Pure/Concurrent/future.scala	Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/Concurrent/future.scala	Tue Nov 03 16:35:38 2015 +0100
@@ -2,31 +2,21 @@
     Module:     PIDE
     Author:     Makarius
 
-Value-oriented parallel execution via futures and promises in Scala -- with
-signatures as in Isabelle/ML.
+Value-oriented parallel execution via futures and promises.
 */
 
 package isabelle
 
 
-import scala.util.{Success, Failure}
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor,
-  Future => Scala_Future, Promise => Scala_Promise, Await}
-import scala.concurrent.duration.Duration
-
+/* futures and promises */
 
 object Future
 {
-  lazy val execution_context: ExecutionContextExecutor =
-    ExecutionContext.fromExecutorService(Standard_Thread.default_pool)
-
-  def value[A](x: A): Future[A] = new Finished_Future(x)
-
-  def fork[A](body: => A): Future[A] =
-    new Pending_Future(Scala_Future[A](body)(execution_context))
-
-  def promise[A]: Promise[A] =
-    new Promise_Future[A](Scala_Promise[A]())
+  def value[A](x: A): Future[A] = new Value_Future(x)
+  def fork[A](body: => A): Future[A] = new Task_Future[A](body)
+  def promise[A]: Promise[A] = new Promise_Future[A]
+  def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] =
+    new Thread_Future[A](name, daemon, body)
 }
 
 trait Future[A]
@@ -34,8 +24,10 @@
   def peek: Option[Exn.Result[A]]
   def is_finished: Boolean = peek.isDefined
   def get_finished: A = { require(is_finished); Exn.release(peek.get) }
-  def join: A
+  def join_result: Exn.Result[A]
+  def join: A = Exn.release(join_result)
   def map[B](f: A => B): Future[B] = Future.fork { f(join) }
+  def cancel: Unit
 
   override def toString: String =
     peek match {
@@ -47,46 +39,103 @@
 
 trait Promise[A] extends Future[A]
 {
-  def cancel: Unit
   def fulfill_result(res: Exn.Result[A]): Unit
   def fulfill(x: A): Unit
 }
 
 
-private class Finished_Future[A](x: A) extends Future[A]
+/* value future */
+
+private class Value_Future[A](x: A) extends Future[A]
 {
   val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
-  val join: A = x
+  def join_result: Exn.Result[A] = peek.get
+  def cancel {}
 }
 
-private class Pending_Future[A](future: Scala_Future[A]) extends Future[A]
+
+/* task future via thread pool */
+
+private class Task_Future[A](body: => A) extends Future[A]
 {
+  private sealed abstract class Status
+  private case object Ready extends Status
+  private case class Running(thread: Thread) extends Status
+  private case object Terminated extends Status
+  private case class Finished(result: Exn.Result[A]) extends Status
+
+  private val status = Synchronized[Status](Ready)
+
   def peek: Option[Exn.Result[A]] =
-    future.value match {
-      case Some(Success(x)) => Some(Exn.Res(x))
-      case Some(Failure(e)) => Some(Exn.Exn(e))
-      case None => None
+    status.value match {
+      case Finished(result) => Some(result)
+      case _ => None
+    }
+
+  private def try_run()
+  {
+    val do_run =
+      status.change_result {
+        case Ready => (true, Running(Thread.currentThread))
+        case st => (false, st)
+      }
+    if (do_run) {
+      val result = Exn.capture(body)
+      status.change(_ => Terminated)
+      status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result))
     }
-  override def is_finished: Boolean = future.isCompleted
+  }
+  private val task = Standard_Thread.submit_task { try_run() }
 
-  def join: A = Await.result(future, Duration.Inf)
-  override def map[B](f: A => B): Future[B] =
-    new Pending_Future[B](future.map(f)(Future.execution_context))
+  def join_result: Exn.Result[A] =
+  {
+    try_run()
+    status.guarded_access {
+      case st @ Finished(result) => Some((result, st))
+      case _ => None
+    }
+  }
+
+  def cancel =
+  {
+    status.change {
+      case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt()))
+      case st @ Running(thread) => thread.interrupt; st
+      case st => st
+    }
+  }
 }
 
-private class Promise_Future[A](promise: Scala_Promise[A])
-  extends Pending_Future(promise.future) with Promise[A]
+
+/* promise future */
+
+private class Promise_Future[A] extends Promise[A]
 {
-  override def is_finished: Boolean = promise.isCompleted
+  private val state = Synchronized[Option[Exn.Result[A]]](None)
+  def peek: Option[Exn.Result[A]] = state.value
+
+  def join_result: Exn.Result[A] =
+    state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st)))
+
+  def fulfill_result(result: Exn.Result[A]): Unit =
+    state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException)
+
+  def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))
 
   def cancel: Unit =
-    try { fulfill_result(Exn.Exn(Exn.Interrupt())) }
-    catch { case _: IllegalStateException => }
+    state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st)
+}
+
+
+/* thread future */
 
-  def fulfill_result(res: Exn.Result[A]): Unit =
-    res match {
-      case Exn.Res(x) => promise.success(x)
-      case Exn.Exn(e) => promise.failure(e)
-    }
-  def fulfill(x: A): Unit = promise.success(x)
+private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A]
+{
+  private val result = Future.promise[A]
+  private val thread =
+    Standard_Thread.fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
+
+  def peek: Option[Exn.Result[A]] = result.peek
+  def join_result: Exn.Result[A] = result.join_result
+  def cancel: Unit = thread.interrupt
 }
--- a/src/Pure/Concurrent/standard_thread.scala	Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/Concurrent/standard_thread.scala	Tue Nov 03 16:35:38 2015 +0100
@@ -28,19 +28,9 @@
   }
 
 
-  /* future result via thread */
-
-  def future[A](name: String = "", daemon: Boolean = false)(body: => A): (Thread, Future[A]) =
-  {
-    val result = Future.promise[A]
-    val thread = fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
-    (thread, result)
-  }
-
-
   /* thread pool */
 
-  lazy val default_pool =
+  lazy val pool: ThreadPoolExecutor =
     {
       val m = Properties.Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0
       val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8
@@ -48,7 +38,7 @@
     }
 
   def submit_task[A](body: => A): JFuture[A] =
-    default_pool.submit(new Callable[A] { def call = body })
+    pool.submit(new Callable[A] { def call = body })
 
 
   /* delayed events */
--- a/src/Pure/PIDE/prover.scala	Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/PIDE/prover.scala	Tue Nov 03 16:35:38 2015 +0100
@@ -121,8 +121,8 @@
 
   /** process manager **/
 
-  private val (_, process_result) =
-    Standard_Thread.future("process_result") { system_process.join }
+  private val process_result =
+    Future.thread("process_result") { system_process.join }
 
   private def terminate_process()
   {
--- a/src/Pure/System/isabelle_system.scala	Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/System/isabelle_system.scala	Tue Nov 03 16:35:38 2015 +0100
@@ -328,14 +328,10 @@
       proc.stdin.close
 
       val limited = new Limited_Progress(proc, progress_limit)
-      val (_, stdout) =
-        Standard_Thread.future("bash_stdout") {
-          File.read_lines(proc.stdout, limited(progress_stdout))
-        }
-      val (_, stderr) =
-        Standard_Thread.future("bash_stderr") {
-          File.read_lines(proc.stderr, limited(progress_stderr))
-        }
+      val stdout =
+        Future.thread("bash_stdout") { File.read_lines(proc.stdout, limited(progress_stdout)) }
+      val stderr =
+        Future.thread("bash_stderr") { File.read_lines(proc.stderr, limited(progress_stderr)) }
 
       val rc =
         try { proc.join }
--- a/src/Pure/Tools/build.scala	Tue Nov 03 16:35:00 2015 +0100
+++ b/src/Pure/Tools/build.scala	Tue Nov 03 16:35:38 2015 +0100
@@ -598,8 +598,8 @@
         """
       }
 
-    private val (thread, result) =
-      Standard_Thread.future("build") {
+    private val result =
+      Future.thread("build") {
         Isabelle_System.bash_env(info.dir.file, env, script,
           progress_stdout = (line: String) =>
             Library.try_unprefix("\floading_theory = ", line) match {
@@ -614,7 +614,7 @@
           strict = false)
       }
 
-    def terminate: Unit = thread.interrupt
+    def terminate: Unit = result.cancel
     def is_finished: Boolean = result.is_finished
 
     @volatile private var was_timeout = false