explicit management of Session.Protocol_Handlers, with protocol state and functions;
authorwenzelm
Wed, 22 May 2013 14:10:45 +0200
changeset 52111 1fd184eaa310
parent 52108 06db08182c4b
child 52112 3610ae73cfdb
explicit management of Session.Protocol_Handlers, with protocol state and functions; more self-contained ML/Scala module Invoke_Scala;
src/Pure/PIDE/markup.ML
src/Pure/PIDE/markup.scala
src/Pure/PIDE/protocol.ML
src/Pure/PIDE/protocol.scala
src/Pure/System/invoke_scala.ML
src/Pure/System/invoke_scala.scala
src/Pure/System/isabelle_process.ML
src/Pure/System/session.ML
src/Pure/System/session.scala
--- a/src/Pure/PIDE/markup.ML	Wed May 22 08:46:39 2013 +0200
+++ b/src/Pure/PIDE/markup.ML	Wed May 22 14:10:45 2013 +0200
@@ -142,6 +142,7 @@
   val functionN: string
   val assign_execs: Properties.T
   val removed_versions: Properties.T
+  val protocol_handler: string -> Properties.T
   val invoke_scala: string -> string -> Properties.T
   val cancel_scala: string -> Properties.T
   val ML_statistics: Properties.entry
@@ -461,6 +462,8 @@
 val assign_execs = [(functionN, "assign_execs")];
 val removed_versions = [(functionN, "removed_versions")];
 
+fun protocol_handler name = [(functionN, "protocol_handler"), (nameN, name)];
+
 fun invoke_scala name id = [(functionN, "invoke_scala"), (nameN, name), (idN, id)];
 fun cancel_scala id = [(functionN, "cancel_scala"), (idN, id)];
 
--- a/src/Pure/PIDE/markup.scala	Wed May 22 08:46:39 2013 +0200
+++ b/src/Pure/PIDE/markup.scala	Wed May 22 14:10:45 2013 +0200
@@ -316,19 +316,31 @@
   val Assign_Execs: Properties.T = List((FUNCTION, "assign_execs"))
   val Removed_Versions: Properties.T = List((FUNCTION, "removed_versions"))
 
+  object Protocol_Handler
+  {
+    def unapply(props: Properties.T): Option[(String)] =
+      props match {
+        case List((FUNCTION, "protocol_handler"), (NAME, name)) => Some(name)
+        case _ => None
+      }
+  }
+
+  val INVOKE_SCALA = "invoke_scala"
   object Invoke_Scala
   {
     def unapply(props: Properties.T): Option[(String, String)] =
       props match {
-        case List((FUNCTION, "invoke_scala"), (NAME, name), (ID, id)) => Some((name, id))
+        case List((FUNCTION, INVOKE_SCALA), (NAME, name), (ID, id)) => Some((name, id))
         case _ => None
       }
   }
+
+  val CANCEL_SCALA = "cancel_scala"
   object Cancel_Scala
   {
     def unapply(props: Properties.T): Option[String] =
       props match {
-        case List((FUNCTION, "cancel_scala"), (ID, id)) => Some(id)
+        case List((FUNCTION, CANCEL_SCALA), (ID, id)) => Some(id)
         case _ => None
       }
   }
--- a/src/Pure/PIDE/protocol.ML	Wed May 22 08:46:39 2013 +0200
+++ b/src/Pure/PIDE/protocol.ML	Wed May 22 14:10:45 2013 +0200
@@ -78,11 +78,5 @@
       Active.dialog_result (Markup.parse_int serial) result
         handle exn => if Exn.is_interrupt exn then () else reraise exn);
 
-val _ =
-  Isabelle_Process.protocol_command "Document.invoke_scala"
-    (fn [id, tag, res] =>
-      Invoke_Scala.fulfill_method id tag res
-        handle exn => if Exn.is_interrupt exn then () else reraise exn);
-
 end;
 
--- a/src/Pure/PIDE/protocol.scala	Wed May 22 08:46:39 2013 +0200
+++ b/src/Pure/PIDE/protocol.scala	Wed May 22 14:10:45 2013 +0200
@@ -364,12 +364,4 @@
   {
     input("Document.dialog_result", Properties.Value.Long(serial), result)
   }
-
-
-  /* method invocation service */
-
-  def invoke_scala(id: String, tag: Invoke_Scala.Tag.Value, res: String)
-  {
-    input("Document.invoke_scala", id, tag.toString, res)
-  }
 }
--- a/src/Pure/System/invoke_scala.ML	Wed May 22 08:46:39 2013 +0200
+++ b/src/Pure/System/invoke_scala.ML	Wed May 22 14:10:45 2013 +0200
@@ -6,16 +6,15 @@
 
 signature INVOKE_SCALA =
 sig
-  exception Null
   val method: string -> string -> string
   val promise_method: string -> string -> string future
-  val fulfill_method: string -> string -> string -> unit
+  exception Null
 end;
 
 structure Invoke_Scala: INVOKE_SCALA =
 struct
 
-exception Null;
+val _ = Session.protocol_handler "isabelle.Invoke_Scala";
 
 
 (* pending promises *)
@@ -40,9 +39,11 @@
 fun method name arg = Future.join (promise_method name arg);
 
 
-(* fulfill method *)
+(* fulfill *)
 
-fun fulfill_method id tag res =
+exception Null;
+
+fun fulfill id tag res =
   let
     val result =
       (case tag of
@@ -58,5 +59,11 @@
     val _ = Future.fulfill_result promise result;
   in () end;
 
+val _ =
+  Isabelle_Process.protocol_command "Invoke_Scala.fulfill"
+    (fn [id, tag, res] =>
+      fulfill id tag res
+        handle exn => if Exn.is_interrupt exn then () else reraise exn);
+
 end;
 
--- a/src/Pure/System/invoke_scala.scala	Wed May 22 08:46:39 2013 +0200
+++ b/src/Pure/System/invoke_scala.scala	Wed May 22 14:10:45 2013 +0200
@@ -64,3 +64,69 @@
       case Exn.Exn(e) => (Tag.FAIL, Exn.message(e))
     }
 }
+
+
+/* protocol handler */
+
+class Invoke_Scala extends Session.Protocol_Handler
+{
+  private var futures = Map.empty[String, java.util.concurrent.Future[Unit]]
+
+  private def fulfill(prover: Session.Prover,
+    id: String, tag: Invoke_Scala.Tag.Value, res: String): Unit = synchronized
+  {
+    if (futures.isDefinedAt(id)) {
+      prover.input("Invoke_Scala.fulfill", id, tag.toString, res)
+      futures -= id
+    }
+  }
+
+  private def cancel(prover: Session.Prover,
+    id: String, future: java.util.concurrent.Future[Unit])
+  {
+    future.cancel(true)
+    fulfill(prover, id, Invoke_Scala.Tag.INTERRUPT, "")
+  }
+
+  private def invoke_scala(
+    prover: Session.Prover, output: Isabelle_Process.Output): Boolean = synchronized
+  {
+    output.properties match {
+      case Markup.Invoke_Scala(name, id) =>
+        futures += (id ->
+          default_thread_pool.submit(() =>
+            {
+              val arg = XML.content(output.body)
+              val (tag, result) = Invoke_Scala.method(name, arg)
+              fulfill(prover, id, tag, result)
+            }))
+        true
+      case _ => false
+    }
+  }
+
+  private def cancel_scala(
+    prover: Session.Prover, output: Isabelle_Process.Output): Boolean = synchronized
+  {
+    output.properties match {
+      case Markup.Cancel_Scala(id) =>
+        futures.get(id) match {
+          case Some(future) => cancel(prover, id, future)
+          case None =>
+        }
+        true
+      case _ => false
+    }
+  }
+
+  override def stop(prover: Session.Prover): Unit = synchronized
+  {
+    for ((id, future) <- futures) cancel(prover, id, future)
+    futures = Map.empty
+  }
+
+  val functions = Map(
+    Markup.INVOKE_SCALA -> invoke_scala _,
+    Markup.CANCEL_SCALA -> cancel_scala _)
+}
+
--- a/src/Pure/System/isabelle_process.ML	Wed May 22 08:46:39 2013 +0200
+++ b/src/Pure/System/isabelle_process.ML	Wed May 22 14:10:45 2013 +0200
@@ -221,6 +221,7 @@
 
     val channel = rendezvous ();
     val _ = init_channels channel;
+    val _ = Session.init_protocol_handlers ();
   in loop channel end));
 
 fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2);
--- a/src/Pure/System/session.ML	Wed May 22 08:46:39 2013 +0200
+++ b/src/Pure/System/session.ML	Wed May 22 14:10:45 2013 +0200
@@ -1,7 +1,7 @@
 (*  Title:      Pure/System/session.ML
     Author:     Makarius
 
-Session management -- internal state of logic images (not thread-safe).
+Session management -- internal state of logic images.
 *)
 
 signature SESSION =
@@ -11,12 +11,14 @@
   val init: bool -> bool -> Path.T -> string -> bool -> string -> (string * string) list ->
     string -> string * string -> bool * string -> bool -> unit
   val finish: unit -> unit
+  val protocol_handler: string -> unit
+  val init_protocol_handlers: unit -> unit
 end;
 
 structure Session: SESSION =
 struct
 
-(* session state *)
+(** session identification -- not thread-safe **)
 
 val session = Unsynchronized.ref {chapter = "Pure", name = "Pure"};
 val session_finished = Unsynchronized.ref false;
@@ -58,4 +60,20 @@
   Event_Timer.shutdown ();
   session_finished := true);
 
+
+(** protocol handlers **)
+
+val protocol_handlers = Synchronized.var "protocol_handlers" ([]: string list);
+
+fun protocol_handler name =
+  Synchronized.change protocol_handlers (fn handlers =>
+   (Output.try_protocol_message (Markup.protocol_handler name) "";
+    if not (member (op =) handlers name) then ()
+    else warning ("Redefining protocol handler: " ^ quote name);
+    update (op =) name handlers));
+
+fun init_protocol_handlers () =
+  Synchronized.value protocol_handlers
+  |> List.app (fn name => Output.try_protocol_message (Markup.protocol_handler name) "");
+
 end;
--- a/src/Pure/System/session.scala	Wed May 22 08:46:39 2013 +0200
+++ b/src/Pure/System/session.scala	Wed May 22 14:10:45 2013 +0200
@@ -37,6 +37,68 @@
   case object Ready extends Phase
   case object Shutdown extends Phase  // transient
   //}}}
+
+
+  /* protocol handlers */
+
+  type Prover = Isabelle_Process with Protocol
+
+  abstract class Protocol_Handler
+  {
+    def stop(prover: Prover): Unit = {}
+    val functions: Map[String, (Prover, Isabelle_Process.Output) => Boolean]
+  }
+
+  class Protocol_Handlers(
+    handlers: Map[String, Session.Protocol_Handler] = Map.empty,
+    functions: Map[String, Isabelle_Process.Output => Boolean] = Map.empty)
+  {
+    def add(prover: Prover, name: String): Protocol_Handlers =
+    {
+      val (handlers1, functions1) =
+        handlers.get(name) match {
+          case Some(old_handler) =>
+            System.err.println("Redefining protocol handler: " + name)
+            old_handler.stop(prover)
+            (handlers - name, functions -- old_handler.functions.keys)
+          case None => (handlers, functions)
+        }
+
+      val (handlers2, functions2) =
+        try {
+          val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
+          val new_functions =
+            for ((a, f) <- new_handler.functions.toList) yield
+              (a, (output: Isabelle_Process.Output) => f(prover, output))
+
+          val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
+          if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
+
+          (handlers1 + (name -> new_handler), functions1 ++ new_functions)
+        }
+        catch {
+          case exn: Throwable =>
+            System.err.println("Failed to initialize protocol handler: " +
+              name + "\n" + Exn.message(exn))
+            (handlers1, functions1)
+        }
+
+      new Protocol_Handlers(handlers2, functions2)
+    }
+
+    def invoke(output: Isabelle_Process.Output): Boolean =
+      output.properties match {
+        case Markup.Function(a) if functions.isDefinedAt(a) =>
+          try { functions(a)(output) }
+          catch {
+            case exn: Throwable =>
+              System.err.println("Failed invocation of protocol function: " +
+                quote(a) + "\n" + Exn.message(exn))
+            false
+          }
+        case _ => false
+      }
+  }
 }
 
 
@@ -176,16 +238,15 @@
     previous: Document.Version,
     version: Document.Version)
   private case class Messages(msgs: List[Isabelle_Process.Message])
-  private case class Finished_Scala(id: String, tag: Invoke_Scala.Tag.Value, result: String)
   private case class Update_Options(options: Options)
 
   private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
   {
     val this_actor = self
 
-    var prune_next = System.currentTimeMillis() + prune_delay.ms
+    var protocol_handlers = new Session.Protocol_Handlers()
 
-    var futures = Map.empty[String, java.util.concurrent.Future[Unit]]
+    var prune_next = System.currentTimeMillis() + prune_delay.ms
 
 
     /* buffered prover messages */
@@ -222,7 +283,7 @@
       def cancel() { timer.cancel() }
     }
 
-    var prover: Option[Isabelle_Process with Protocol] = None
+    var prover: Option[Session.Prover] = None
 
 
     /* delayed command changes */
@@ -318,73 +379,68 @@
         }
       }
 
-      output.properties match {
+      if (output.is_protocol) {
+        val handled = protocol_handlers.invoke(output)
+        if (!handled) {
+          output.properties match {
+            case Markup.Protocol_Handler(name) =>
+              protocol_handlers = protocol_handlers.add(prover.get, name)
 
-        case Position.Id(state_id) if !output.is_protocol =>
-          accumulate(state_id, output.message)
-
-        case Protocol.Command_Timing(state_id, timing) if output.is_protocol =>
-          val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
-          accumulate(state_id, prover.get.xml_cache.elem(message))
+            case Protocol.Command_Timing(state_id, timing) =>
+              val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
+              accumulate(state_id, prover.get.xml_cache.elem(message))
 
-        case Markup.Assign_Execs if output.is_protocol =>
-          XML.content(output.body) match {
-            case Protocol.Assign(id, assign) =>
-              try {
-                val cmds = global_state >>> (_.assign(id, assign))
-                delay_commands_changed.invoke(true, cmds)
+            case Markup.Assign_Execs =>
+              XML.content(output.body) match {
+                case Protocol.Assign(id, assign) =>
+                  try {
+                    val cmds = global_state >>> (_.assign(id, assign))
+                    delay_commands_changed.invoke(true, cmds)
+                  }
+                  catch { case _: Document.State.Fail => bad_output() }
+                case _ => bad_output()
+              }
+              // FIXME separate timeout event/message!?
+              if (prover.isDefined && System.currentTimeMillis() > prune_next) {
+                val old_versions = global_state >>> (_.prune_history(prune_size))
+                if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
+                prune_next = System.currentTimeMillis() + prune_delay.ms
               }
-              catch { case _: Document.State.Fail => bad_output() }
+
+            case Markup.Removed_Versions =>
+              XML.content(output.body) match {
+                case Protocol.Removed(removed) =>
+                  try {
+                    global_state >> (_.removed_versions(removed))
+                  }
+                  catch { case _: Document.State.Fail => bad_output() }
+                case _ => bad_output()
+              }
+
+            case Markup.ML_Statistics(props) =>
+              statistics.event(Session.Statistics(props))
+
+            case Markup.Task_Statistics(props) =>
+              // FIXME
+
             case _ => bad_output()
           }
-          // FIXME separate timeout event/message!?
-          if (prover.isDefined && System.currentTimeMillis() > prune_next) {
-            val old_versions = global_state >>> (_.prune_history(prune_size))
-            if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
-            prune_next = System.currentTimeMillis() + prune_delay.ms
-          }
-
-        case Markup.Removed_Versions if output.is_protocol =>
-          XML.content(output.body) match {
-            case Protocol.Removed(removed) =>
-              try {
-                global_state >> (_.removed_versions(removed))
-              }
-              catch { case _: Document.State.Fail => bad_output() }
-            case _ => bad_output()
-          }
+        }
+      }
+      else {
+        output.properties match {
+          case Position.Id(state_id) =>
+            accumulate(state_id, output.message)
 
-        case Markup.Invoke_Scala(name, id) if output.is_protocol =>
-          futures += (id ->
-            default_thread_pool.submit(() =>
-              {
-                val arg = XML.content(output.body)
-                val (tag, result) = Invoke_Scala.method(name, arg)
-                this_actor ! Finished_Scala(id, tag, result)
-              }))
+          case _ if output.is_init =>
+            phase = Session.Ready
 
-        case Markup.Cancel_Scala(id) if output.is_protocol =>
-          futures.get(id) match {
-            case Some(future) =>
-              future.cancel(true)
-              this_actor ! Finished_Scala(id, Invoke_Scala.Tag.INTERRUPT, "")
-            case None =>
-          }
-
-        case Markup.ML_Statistics(props) if output.is_protocol =>
-          statistics.event(Session.Statistics(props))
+          case Markup.Return_Code(rc) if output.is_exit =>
+            if (rc == 0) phase = Session.Inactive
+            else phase = Session.Failed
 
-        case Markup.Task_Statistics(props) if output.is_protocol =>
-          // FIXME
-
-        case _ if output.is_init =>
-          phase = Session.Ready
-
-        case Markup.Return_Code(rc) if output.is_exit =>
-          if (rc == 0) phase = Session.Inactive
-          else phase = Session.Failed
-
-        case _ => bad_output()
+          case _ => bad_output()
+        }
       }
     }
     //}}}
@@ -455,12 +511,6 @@
         if prover.isDefined && global_state().is_assigned(change.previous) =>
           handle_change(change)
 
-        case Finished_Scala(id, tag, result) if prover.isDefined =>
-          if (futures.isDefinedAt(id)) {
-            prover.get.invoke_scala(id, tag, result)
-            futures -= id
-          }
-
         case bad if !bad.isInstanceOf[Change] =>
           System.err.println("session_actor: ignoring bad message " + bad)
       }