more management of Invoke_Scala tasks;
authorwenzelm
Thu, 20 Sep 2012 19:23:05 +0200
changeset 49470 ee564db2649b
parent 49469 00c301c8d569
child 49471 97964515a676
more management of Invoke_Scala tasks;
src/Pure/PIDE/protocol.ML
src/Pure/System/invoke_scala.ML
src/Pure/System/invoke_scala.scala
src/Pure/System/session.scala
src/Pure/library.scala
--- a/src/Pure/PIDE/protocol.ML	Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/PIDE/protocol.ML	Thu Sep 20 19:23:05 2012 +0200
@@ -76,7 +76,9 @@
 
 val _ =
   Isabelle_Process.protocol_command "Document.invoke_scala"
-    (fn [id, tag, res] => Invoke_Scala.fulfill_method id tag res);
+    (fn [id, tag, res] =>
+      Invoke_Scala.fulfill_method id tag res
+        handle exn => if Exn.is_interrupt exn then () else reraise exn);
 
 end;
 
--- a/src/Pure/System/invoke_scala.ML	Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/System/invoke_scala.ML	Thu Sep 20 19:23:05 2012 +0200
@@ -52,6 +52,7 @@
       | "1" => Exn.Res res
       | "2" => Exn.Exn (ERROR res)
       | "3" => Exn.Exn (Fail res)
+      | "4" => Exn.Exn Exn.Interrupt
       | _ => raise Fail "Bad tag");
     val promise =
       Synchronized.change_result promises
--- a/src/Pure/System/invoke_scala.scala	Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/System/invoke_scala.scala	Thu Sep 20 19:23:05 2012 +0200
@@ -49,6 +49,7 @@
     val OK = Value("1")
     val ERROR = Value("2")
     val FAIL = Value("3")
+    val INTERRUPT = Value("4")
   }
 
   def method(name: String, arg: String): (Tag.Value, String) =
@@ -57,6 +58,7 @@
         Exn.capture { f(arg) } match {
           case Exn.Res(null) => (Tag.NULL, "")
           case Exn.Res(res) => (Tag.OK, res)
+          case Exn.Exn(_: InterruptedException) => (Tag.INTERRUPT, "")
           case Exn.Exn(e) => (Tag.ERROR, Exn.message(e))
         }
       case Exn.Exn(e) => (Tag.FAIL, Exn.message(e))
--- a/src/Pure/System/session.scala	Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/System/session.scala	Thu Sep 20 19:23:05 2012 +0200
@@ -172,6 +172,7 @@
     previous: Document.Version,
     version: Document.Version)
   private case class Messages(msgs: List[Isabelle_Process.Message])
+  private case class Finished_Scala(id: String, tag: Invoke_Scala.Tag.Value, result: String)
 
   private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
   {
@@ -179,6 +180,8 @@
 
     var prune_next = System.currentTimeMillis() + prune_delay.ms
 
+    var futures = Map.empty[String, java.util.concurrent.Future[Unit]]
+
 
     /* buffered prover messages */
 
@@ -338,14 +341,21 @@
           }
 
         case Isabelle_Markup.Invoke_Scala(name, id) if output.is_protocol =>
-          Future.fork {
-            val arg = XML.content(output.body)
-            val (tag, res) = Invoke_Scala.method(name, arg)
-            prover.get.invoke_scala(id, tag, res)
-          }
+          futures += (id ->
+            default_thread_pool.submit(() =>
+              {
+                val arg = XML.content(output.body)
+                val (tag, result) = Invoke_Scala.method(name, arg)
+                this_actor ! Finished_Scala(id, tag, result)
+              }))
 
         case Isabelle_Markup.Cancel_Scala(id) if output.is_protocol =>
-          System.err.println("cancel_scala " + id)  // FIXME actually cancel JVM task
+          futures.get(id) match {
+            case Some(future) =>
+              future.cancel(true)
+              this_actor ! Finished_Scala(id, Invoke_Scala.Tag.INTERRUPT, "")
+            case None =>
+          }
 
         case _ if output.is_init =>
             phase = Session.Ready
@@ -416,6 +426,12 @@
         if prover.isDefined && global_state().is_assigned(change.previous) =>
           handle_change(change)
 
+        case Finished_Scala(id, tag, result) if prover.isDefined =>
+          if (futures.isDefinedAt(id)) {
+            prover.get.invoke_scala(id, tag, result)
+            futures -= id
+          }
+
         case bad if !bad.isInstanceOf[Change] =>
           System.err.println("session_actor: ignoring bad message " + bad)
       }
--- a/src/Pure/library.scala	Thu Sep 20 16:02:10 2012 +0200
+++ b/src/Pure/library.scala	Thu Sep 20 19:23:05 2012 +0200
@@ -199,4 +199,13 @@
   val quote = Library.quote _
   val commas = Library.commas _
   val commas_quote = Library.commas_quote _
+
+
+  /* parallel tasks */
+
+  implicit def function_as_callable[A](f: () => A) =
+    new java.util.concurrent.Callable[A] { def call = f() }
+
+  val default_thread_pool =
+    scala.collection.parallel.ThreadPoolTasks.defaultThreadPool
 }