# HG changeset patch # User wenzelm # Date 1446564938 -3600 # Node ID 313eca3fa84720998c0ba021ae57a0e173439bba # Parent 68b86028e02aade5aeb7385c2d0bc1714defa5eb more direct task future implementation, with proper cancel operation; more uniform Future.thread; diff -r 68b86028e02a -r 313eca3fa847 src/Pure/Concurrent/future.scala --- a/src/Pure/Concurrent/future.scala Tue Nov 03 16:35:00 2015 +0100 +++ b/src/Pure/Concurrent/future.scala Tue Nov 03 16:35:38 2015 +0100 @@ -2,31 +2,21 @@ Module: PIDE Author: Makarius -Value-oriented parallel execution via futures and promises in Scala -- with -signatures as in Isabelle/ML. +Value-oriented parallel execution via futures and promises. */ package isabelle -import scala.util.{Success, Failure} -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, - Future => Scala_Future, Promise => Scala_Promise, Await} -import scala.concurrent.duration.Duration - +/* futures and promises */ object Future { - lazy val execution_context: ExecutionContextExecutor = - ExecutionContext.fromExecutorService(Standard_Thread.default_pool) - - def value[A](x: A): Future[A] = new Finished_Future(x) - - def fork[A](body: => A): Future[A] = - new Pending_Future(Scala_Future[A](body)(execution_context)) - - def promise[A]: Promise[A] = - new Promise_Future[A](Scala_Promise[A]()) + def value[A](x: A): Future[A] = new Value_Future(x) + def fork[A](body: => A): Future[A] = new Task_Future[A](body) + def promise[A]: Promise[A] = new Promise_Future[A] + def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] = + new Thread_Future[A](name, daemon, body) } trait Future[A] @@ -34,8 +24,10 @@ def peek: Option[Exn.Result[A]] def is_finished: Boolean = peek.isDefined def get_finished: A = { require(is_finished); Exn.release(peek.get) } - def join: A + def join_result: Exn.Result[A] + def join: A = Exn.release(join_result) def map[B](f: A => B): Future[B] = Future.fork { f(join) } + def cancel: Unit override def toString: String = peek match { @@ -47,46 +39,103 @@ trait Promise[A] extends Future[A] { - def cancel: Unit def fulfill_result(res: Exn.Result[A]): Unit def fulfill(x: A): Unit } -private class Finished_Future[A](x: A) extends Future[A] +/* value future */ + +private class Value_Future[A](x: A) extends Future[A] { val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) - val join: A = x + def join_result: Exn.Result[A] = peek.get + def cancel {} } -private class Pending_Future[A](future: Scala_Future[A]) extends Future[A] + +/* task future via thread pool */ + +private class Task_Future[A](body: => A) extends Future[A] { + private sealed abstract class Status + private case object Ready extends Status + private case class Running(thread: Thread) extends Status + private case object Terminated extends Status + private case class Finished(result: Exn.Result[A]) extends Status + + private val status = Synchronized[Status](Ready) + def peek: Option[Exn.Result[A]] = - future.value match { - case Some(Success(x)) => Some(Exn.Res(x)) - case Some(Failure(e)) => Some(Exn.Exn(e)) - case None => None + status.value match { + case Finished(result) => Some(result) + case _ => None + } + + private def try_run() + { + val do_run = + status.change_result { + case Ready => (true, Running(Thread.currentThread)) + case st => (false, st) + } + if (do_run) { + val result = Exn.capture(body) + status.change(_ => Terminated) + status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result)) } - override def is_finished: Boolean = future.isCompleted + } + private val task = Standard_Thread.submit_task { try_run() } - def join: A = Await.result(future, Duration.Inf) - override def map[B](f: A => B): Future[B] = - new Pending_Future[B](future.map(f)(Future.execution_context)) + def join_result: Exn.Result[A] = + { + try_run() + status.guarded_access { + case st @ Finished(result) => Some((result, st)) + case _ => None + } + } + + def cancel = + { + status.change { + case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt())) + case st @ Running(thread) => thread.interrupt; st + case st => st + } + } } -private class Promise_Future[A](promise: Scala_Promise[A]) - extends Pending_Future(promise.future) with Promise[A] + +/* promise future */ + +private class Promise_Future[A] extends Promise[A] { - override def is_finished: Boolean = promise.isCompleted + private val state = Synchronized[Option[Exn.Result[A]]](None) + def peek: Option[Exn.Result[A]] = state.value + + def join_result: Exn.Result[A] = + state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st))) + + def fulfill_result(result: Exn.Result[A]): Unit = + state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException) + + def fulfill(x: A): Unit = fulfill_result(Exn.Res(x)) def cancel: Unit = - try { fulfill_result(Exn.Exn(Exn.Interrupt())) } - catch { case _: IllegalStateException => } + state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st) +} + + +/* thread future */ - def fulfill_result(res: Exn.Result[A]): Unit = - res match { - case Exn.Res(x) => promise.success(x) - case Exn.Exn(e) => promise.failure(e) - } - def fulfill(x: A): Unit = promise.success(x) +private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A] +{ + private val result = Future.promise[A] + private val thread = + Standard_Thread.fork(name, daemon) { result.fulfill_result(Exn.capture(body)) } + + def peek: Option[Exn.Result[A]] = result.peek + def join_result: Exn.Result[A] = result.join_result + def cancel: Unit = thread.interrupt } diff -r 68b86028e02a -r 313eca3fa847 src/Pure/Concurrent/standard_thread.scala --- a/src/Pure/Concurrent/standard_thread.scala Tue Nov 03 16:35:00 2015 +0100 +++ b/src/Pure/Concurrent/standard_thread.scala Tue Nov 03 16:35:38 2015 +0100 @@ -28,19 +28,9 @@ } - /* future result via thread */ - - def future[A](name: String = "", daemon: Boolean = false)(body: => A): (Thread, Future[A]) = - { - val result = Future.promise[A] - val thread = fork(name, daemon) { result.fulfill_result(Exn.capture(body)) } - (thread, result) - } - - /* thread pool */ - lazy val default_pool = + lazy val pool: ThreadPoolExecutor = { val m = Properties.Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0 val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8 @@ -48,7 +38,7 @@ } def submit_task[A](body: => A): JFuture[A] = - default_pool.submit(new Callable[A] { def call = body }) + pool.submit(new Callable[A] { def call = body }) /* delayed events */ diff -r 68b86028e02a -r 313eca3fa847 src/Pure/PIDE/prover.scala --- a/src/Pure/PIDE/prover.scala Tue Nov 03 16:35:00 2015 +0100 +++ b/src/Pure/PIDE/prover.scala Tue Nov 03 16:35:38 2015 +0100 @@ -121,8 +121,8 @@ /** process manager **/ - private val (_, process_result) = - Standard_Thread.future("process_result") { system_process.join } + private val process_result = + Future.thread("process_result") { system_process.join } private def terminate_process() { diff -r 68b86028e02a -r 313eca3fa847 src/Pure/System/isabelle_system.scala --- a/src/Pure/System/isabelle_system.scala Tue Nov 03 16:35:00 2015 +0100 +++ b/src/Pure/System/isabelle_system.scala Tue Nov 03 16:35:38 2015 +0100 @@ -328,14 +328,10 @@ proc.stdin.close val limited = new Limited_Progress(proc, progress_limit) - val (_, stdout) = - Standard_Thread.future("bash_stdout") { - File.read_lines(proc.stdout, limited(progress_stdout)) - } - val (_, stderr) = - Standard_Thread.future("bash_stderr") { - File.read_lines(proc.stderr, limited(progress_stderr)) - } + val stdout = + Future.thread("bash_stdout") { File.read_lines(proc.stdout, limited(progress_stdout)) } + val stderr = + Future.thread("bash_stderr") { File.read_lines(proc.stderr, limited(progress_stderr)) } val rc = try { proc.join } diff -r 68b86028e02a -r 313eca3fa847 src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala Tue Nov 03 16:35:00 2015 +0100 +++ b/src/Pure/Tools/build.scala Tue Nov 03 16:35:38 2015 +0100 @@ -598,8 +598,8 @@ """ } - private val (thread, result) = - Standard_Thread.future("build") { + private val result = + Future.thread("build") { Isabelle_System.bash_env(info.dir.file, env, script, progress_stdout = (line: String) => Library.try_unprefix("\floading_theory = ", line) match { @@ -614,7 +614,7 @@ strict = false) } - def terminate: Unit = thread.interrupt + def terminate: Unit = result.cancel def is_finished: Boolean = result.is_finished @volatile private var was_timeout = false