--- a/src/Pure/PIDE/protocol.ML Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/PIDE/protocol.ML Thu Sep 20 19:23:05 2012 +0200
@@ -76,7 +76,9 @@
val _ =
Isabelle_Process.protocol_command "Document.invoke_scala"
- (fn [id, tag, res] => Invoke_Scala.fulfill_method id tag res);
+ (fn [id, tag, res] =>
+ Invoke_Scala.fulfill_method id tag res
+ handle exn => if Exn.is_interrupt exn then () else reraise exn);
end;
--- a/src/Pure/System/invoke_scala.ML Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/System/invoke_scala.ML Thu Sep 20 19:23:05 2012 +0200
@@ -52,6 +52,7 @@
| "1" => Exn.Res res
| "2" => Exn.Exn (ERROR res)
| "3" => Exn.Exn (Fail res)
+ | "4" => Exn.Exn Exn.Interrupt
| _ => raise Fail "Bad tag");
val promise =
Synchronized.change_result promises
--- a/src/Pure/System/invoke_scala.scala Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/System/invoke_scala.scala Thu Sep 20 19:23:05 2012 +0200
@@ -49,6 +49,7 @@
val OK = Value("1")
val ERROR = Value("2")
val FAIL = Value("3")
+ val INTERRUPT = Value("4")
}
def method(name: String, arg: String): (Tag.Value, String) =
@@ -57,6 +58,7 @@
Exn.capture { f(arg) } match {
case Exn.Res(null) => (Tag.NULL, "")
case Exn.Res(res) => (Tag.OK, res)
+ case Exn.Exn(_: InterruptedException) => (Tag.INTERRUPT, "")
case Exn.Exn(e) => (Tag.ERROR, Exn.message(e))
}
case Exn.Exn(e) => (Tag.FAIL, Exn.message(e))
--- a/src/Pure/System/session.scala Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/System/session.scala Thu Sep 20 19:23:05 2012 +0200
@@ -172,6 +172,7 @@
previous: Document.Version,
version: Document.Version)
private case class Messages(msgs: List[Isabelle_Process.Message])
+ private case class Finished_Scala(id: String, tag: Invoke_Scala.Tag.Value, result: String)
private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
{
@@ -179,6 +180,8 @@
var prune_next = System.currentTimeMillis() + prune_delay.ms
+ var futures = Map.empty[String, java.util.concurrent.Future[Unit]]
+
/* buffered prover messages */
@@ -338,14 +341,21 @@
}
case Isabelle_Markup.Invoke_Scala(name, id) if output.is_protocol =>
- Future.fork {
- val arg = XML.content(output.body)
- val (tag, res) = Invoke_Scala.method(name, arg)
- prover.get.invoke_scala(id, tag, res)
- }
+ futures += (id ->
+ default_thread_pool.submit(() =>
+ {
+ val arg = XML.content(output.body)
+ val (tag, result) = Invoke_Scala.method(name, arg)
+ this_actor ! Finished_Scala(id, tag, result)
+ }))
case Isabelle_Markup.Cancel_Scala(id) if output.is_protocol =>
- System.err.println("cancel_scala " + id) // FIXME actually cancel JVM task
+ futures.get(id) match {
+ case Some(future) =>
+ future.cancel(true)
+ this_actor ! Finished_Scala(id, Invoke_Scala.Tag.INTERRUPT, "")
+ case None =>
+ }
case _ if output.is_init =>
phase = Session.Ready
@@ -416,6 +426,12 @@
if prover.isDefined && global_state().is_assigned(change.previous) =>
handle_change(change)
+ case Finished_Scala(id, tag, result) if prover.isDefined =>
+ if (futures.isDefinedAt(id)) {
+ prover.get.invoke_scala(id, tag, result)
+ futures -= id
+ }
+
case bad if !bad.isInstanceOf[Change] =>
System.err.println("session_actor: ignoring bad message " + bad)
}
--- a/src/Pure/library.scala Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/library.scala Thu Sep 20 19:23:05 2012 +0200
@@ -199,4 +199,13 @@
val quote = Library.quote _
val commas = Library.commas _
val commas_quote = Library.commas_quote _
+
+
+ /* parallel tasks */
+
+ implicit def function_as_callable[A](f: () => A) =
+ new java.util.concurrent.Callable[A] { def call = f() }
+
+ val default_thread_pool =
+ scala.collection.parallel.ThreadPoolTasks.defaultThreadPool
}