# HG changeset patch # User wenzelm # Date 1348161785 -7200 # Node ID ee564db2649b2a66bfd9052d1f9949fa4213d98d # Parent 00c301c8d569955b7120176ecc849111637a508c more management of Invoke_Scala tasks; diff -r 00c301c8d569 -r ee564db2649b src/Pure/PIDE/protocol.ML --- a/src/Pure/PIDE/protocol.ML Thu Sep 20 16:02:10 2012 +0200 +++ b/src/Pure/PIDE/protocol.ML Thu Sep 20 19:23:05 2012 +0200 @@ -76,7 +76,9 @@ val _ = Isabelle_Process.protocol_command "Document.invoke_scala" - (fn [id, tag, res] => Invoke_Scala.fulfill_method id tag res); + (fn [id, tag, res] => + Invoke_Scala.fulfill_method id tag res + handle exn => if Exn.is_interrupt exn then () else reraise exn); end; diff -r 00c301c8d569 -r ee564db2649b src/Pure/System/invoke_scala.ML --- a/src/Pure/System/invoke_scala.ML Thu Sep 20 16:02:10 2012 +0200 +++ b/src/Pure/System/invoke_scala.ML Thu Sep 20 19:23:05 2012 +0200 @@ -52,6 +52,7 @@ | "1" => Exn.Res res | "2" => Exn.Exn (ERROR res) | "3" => Exn.Exn (Fail res) + | "4" => Exn.Exn Exn.Interrupt | _ => raise Fail "Bad tag"); val promise = Synchronized.change_result promises diff -r 00c301c8d569 -r ee564db2649b src/Pure/System/invoke_scala.scala --- a/src/Pure/System/invoke_scala.scala Thu Sep 20 16:02:10 2012 +0200 +++ b/src/Pure/System/invoke_scala.scala Thu Sep 20 19:23:05 2012 +0200 @@ -49,6 +49,7 @@ val OK = Value("1") val ERROR = Value("2") val FAIL = Value("3") + val INTERRUPT = Value("4") } def method(name: String, arg: String): (Tag.Value, String) = @@ -57,6 +58,7 @@ Exn.capture { f(arg) } match { case Exn.Res(null) => (Tag.NULL, "") case Exn.Res(res) => (Tag.OK, res) + case Exn.Exn(_: InterruptedException) => (Tag.INTERRUPT, "") case Exn.Exn(e) => (Tag.ERROR, Exn.message(e)) } case Exn.Exn(e) => (Tag.FAIL, Exn.message(e)) diff -r 00c301c8d569 -r ee564db2649b src/Pure/System/session.scala --- a/src/Pure/System/session.scala Thu Sep 20 16:02:10 2012 +0200 +++ b/src/Pure/System/session.scala Thu Sep 20 19:23:05 2012 +0200 @@ -172,6 +172,7 @@ previous: Document.Version, version: Document.Version) private case class Messages(msgs: List[Isabelle_Process.Message]) + private case class Finished_Scala(id: String, tag: Invoke_Scala.Tag.Value, result: String) private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true) { @@ -179,6 +180,8 @@ var prune_next = System.currentTimeMillis() + prune_delay.ms + var futures = Map.empty[String, java.util.concurrent.Future[Unit]] + /* buffered prover messages */ @@ -338,14 +341,21 @@ } case Isabelle_Markup.Invoke_Scala(name, id) if output.is_protocol => - Future.fork { - val arg = XML.content(output.body) - val (tag, res) = Invoke_Scala.method(name, arg) - prover.get.invoke_scala(id, tag, res) - } + futures += (id -> + default_thread_pool.submit(() => + { + val arg = XML.content(output.body) + val (tag, result) = Invoke_Scala.method(name, arg) + this_actor ! Finished_Scala(id, tag, result) + })) case Isabelle_Markup.Cancel_Scala(id) if output.is_protocol => - System.err.println("cancel_scala " + id) // FIXME actually cancel JVM task + futures.get(id) match { + case Some(future) => + future.cancel(true) + this_actor ! Finished_Scala(id, Invoke_Scala.Tag.INTERRUPT, "") + case None => + } case _ if output.is_init => phase = Session.Ready @@ -416,6 +426,12 @@ if prover.isDefined && global_state().is_assigned(change.previous) => handle_change(change) + case Finished_Scala(id, tag, result) if prover.isDefined => + if (futures.isDefinedAt(id)) { + prover.get.invoke_scala(id, tag, result) + futures -= id + } + case bad if !bad.isInstanceOf[Change] => System.err.println("session_actor: ignoring bad message " + bad) } diff -r 00c301c8d569 -r ee564db2649b src/Pure/library.scala --- a/src/Pure/library.scala Thu Sep 20 16:02:10 2012 +0200 +++ b/src/Pure/library.scala Thu Sep 20 19:23:05 2012 +0200 @@ -199,4 +199,13 @@ val quote = Library.quote _ val commas = Library.commas _ val commas_quote = Library.commas_quote _ + + + /* parallel tasks */ + + implicit def function_as_callable[A](f: () => A) = + new java.util.concurrent.Callable[A] { def call = f() } + + val default_thread_pool = + scala.collection.parallel.ThreadPoolTasks.defaultThreadPool }