more management of Invoke_Scala tasks;
authorwenzelm
Thu Sep 20 19:23:05 2012 +0200 (2012-09-20 ago)
changeset 49470ee564db2649b
parent 49469 00c301c8d569
child 49471 97964515a676
more management of Invoke_Scala tasks;
src/Pure/PIDE/protocol.ML
src/Pure/System/invoke_scala.ML
src/Pure/System/invoke_scala.scala
src/Pure/System/session.scala
src/Pure/library.scala
     1.1 --- a/src/Pure/PIDE/protocol.ML	Thu Sep 20 16:02:10 2012 +0200
     1.2 +++ b/src/Pure/PIDE/protocol.ML	Thu Sep 20 19:23:05 2012 +0200
     1.3 @@ -76,7 +76,9 @@
     1.4  
     1.5  val _ =
     1.6    Isabelle_Process.protocol_command "Document.invoke_scala"
     1.7 -    (fn [id, tag, res] => Invoke_Scala.fulfill_method id tag res);
     1.8 +    (fn [id, tag, res] =>
     1.9 +      Invoke_Scala.fulfill_method id tag res
    1.10 +        handle exn => if Exn.is_interrupt exn then () else reraise exn);
    1.11  
    1.12  end;
    1.13  
     2.1 --- a/src/Pure/System/invoke_scala.ML	Thu Sep 20 16:02:10 2012 +0200
     2.2 +++ b/src/Pure/System/invoke_scala.ML	Thu Sep 20 19:23:05 2012 +0200
     2.3 @@ -52,6 +52,7 @@
     2.4        | "1" => Exn.Res res
     2.5        | "2" => Exn.Exn (ERROR res)
     2.6        | "3" => Exn.Exn (Fail res)
     2.7 +      | "4" => Exn.Exn Exn.Interrupt
     2.8        | _ => raise Fail "Bad tag");
     2.9      val promise =
    2.10        Synchronized.change_result promises
     3.1 --- a/src/Pure/System/invoke_scala.scala	Thu Sep 20 16:02:10 2012 +0200
     3.2 +++ b/src/Pure/System/invoke_scala.scala	Thu Sep 20 19:23:05 2012 +0200
     3.3 @@ -49,6 +49,7 @@
     3.4      val OK = Value("1")
     3.5      val ERROR = Value("2")
     3.6      val FAIL = Value("3")
     3.7 +    val INTERRUPT = Value("4")
     3.8    }
     3.9  
    3.10    def method(name: String, arg: String): (Tag.Value, String) =
    3.11 @@ -57,6 +58,7 @@
    3.12          Exn.capture { f(arg) } match {
    3.13            case Exn.Res(null) => (Tag.NULL, "")
    3.14            case Exn.Res(res) => (Tag.OK, res)
    3.15 +          case Exn.Exn(_: InterruptedException) => (Tag.INTERRUPT, "")
    3.16            case Exn.Exn(e) => (Tag.ERROR, Exn.message(e))
    3.17          }
    3.18        case Exn.Exn(e) => (Tag.FAIL, Exn.message(e))
     4.1 --- a/src/Pure/System/session.scala	Thu Sep 20 16:02:10 2012 +0200
     4.2 +++ b/src/Pure/System/session.scala	Thu Sep 20 19:23:05 2012 +0200
     4.3 @@ -172,6 +172,7 @@
     4.4      previous: Document.Version,
     4.5      version: Document.Version)
     4.6    private case class Messages(msgs: List[Isabelle_Process.Message])
     4.7 +  private case class Finished_Scala(id: String, tag: Invoke_Scala.Tag.Value, result: String)
     4.8  
     4.9    private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
    4.10    {
    4.11 @@ -179,6 +180,8 @@
    4.12  
    4.13      var prune_next = System.currentTimeMillis() + prune_delay.ms
    4.14  
    4.15 +    var futures = Map.empty[String, java.util.concurrent.Future[Unit]]
    4.16 +
    4.17  
    4.18      /* buffered prover messages */
    4.19  
    4.20 @@ -338,14 +341,21 @@
    4.21            }
    4.22  
    4.23          case Isabelle_Markup.Invoke_Scala(name, id) if output.is_protocol =>
    4.24 -          Future.fork {
    4.25 -            val arg = XML.content(output.body)
    4.26 -            val (tag, res) = Invoke_Scala.method(name, arg)
    4.27 -            prover.get.invoke_scala(id, tag, res)
    4.28 -          }
    4.29 +          futures += (id ->
    4.30 +            default_thread_pool.submit(() =>
    4.31 +              {
    4.32 +                val arg = XML.content(output.body)
    4.33 +                val (tag, result) = Invoke_Scala.method(name, arg)
    4.34 +                this_actor ! Finished_Scala(id, tag, result)
    4.35 +              }))
    4.36  
    4.37          case Isabelle_Markup.Cancel_Scala(id) if output.is_protocol =>
    4.38 -          System.err.println("cancel_scala " + id)  // FIXME actually cancel JVM task
    4.39 +          futures.get(id) match {
    4.40 +            case Some(future) =>
    4.41 +              future.cancel(true)
    4.42 +              this_actor ! Finished_Scala(id, Invoke_Scala.Tag.INTERRUPT, "")
    4.43 +            case None =>
    4.44 +          }
    4.45  
    4.46          case _ if output.is_init =>
    4.47              phase = Session.Ready
    4.48 @@ -416,6 +426,12 @@
    4.49          if prover.isDefined && global_state().is_assigned(change.previous) =>
    4.50            handle_change(change)
    4.51  
    4.52 +        case Finished_Scala(id, tag, result) if prover.isDefined =>
    4.53 +          if (futures.isDefinedAt(id)) {
    4.54 +            prover.get.invoke_scala(id, tag, result)
    4.55 +            futures -= id
    4.56 +          }
    4.57 +
    4.58          case bad if !bad.isInstanceOf[Change] =>
    4.59            System.err.println("session_actor: ignoring bad message " + bad)
    4.60        }
     5.1 --- a/src/Pure/library.scala	Thu Sep 20 16:02:10 2012 +0200
     5.2 +++ b/src/Pure/library.scala	Thu Sep 20 19:23:05 2012 +0200
     5.3 @@ -199,4 +199,13 @@
     5.4    val quote = Library.quote _
     5.5    val commas = Library.commas _
     5.6    val commas_quote = Library.commas_quote _
     5.7 +
     5.8 +
     5.9 +  /* parallel tasks */
    5.10 +
    5.11 +  implicit def function_as_callable[A](f: () => A) =
    5.12 +    new java.util.concurrent.Callable[A] { def call = f() }
    5.13 +
    5.14 +  val default_thread_pool =
    5.15 +    scala.collection.parallel.ThreadPoolTasks.defaultThreadPool
    5.16  }