# HG changeset patch # User wenzelm # Date 1369224645 -7200 # Node ID 1fd184eaa3107593ad44009ae7461327b4a9a825 # Parent 06db08182c4bb59bc6203924529c938a033a313e explicit management of Session.Protocol_Handlers, with protocol state and functions; more self-contained ML/Scala module Invoke_Scala; diff -r 06db08182c4b -r 1fd184eaa310 src/Pure/PIDE/markup.ML --- a/src/Pure/PIDE/markup.ML Wed May 22 08:46:39 2013 +0200 +++ b/src/Pure/PIDE/markup.ML Wed May 22 14:10:45 2013 +0200 @@ -142,6 +142,7 @@ val functionN: string val assign_execs: Properties.T val removed_versions: Properties.T + val protocol_handler: string -> Properties.T val invoke_scala: string -> string -> Properties.T val cancel_scala: string -> Properties.T val ML_statistics: Properties.entry @@ -461,6 +462,8 @@ val assign_execs = [(functionN, "assign_execs")]; val removed_versions = [(functionN, "removed_versions")]; +fun protocol_handler name = [(functionN, "protocol_handler"), (nameN, name)]; + fun invoke_scala name id = [(functionN, "invoke_scala"), (nameN, name), (idN, id)]; fun cancel_scala id = [(functionN, "cancel_scala"), (idN, id)]; diff -r 06db08182c4b -r 1fd184eaa310 src/Pure/PIDE/markup.scala --- a/src/Pure/PIDE/markup.scala Wed May 22 08:46:39 2013 +0200 +++ b/src/Pure/PIDE/markup.scala Wed May 22 14:10:45 2013 +0200 @@ -316,19 +316,31 @@ val Assign_Execs: Properties.T = List((FUNCTION, "assign_execs")) val Removed_Versions: Properties.T = List((FUNCTION, "removed_versions")) + object Protocol_Handler + { + def unapply(props: Properties.T): Option[(String)] = + props match { + case List((FUNCTION, "protocol_handler"), (NAME, name)) => Some(name) + case _ => None + } + } + + val INVOKE_SCALA = "invoke_scala" object Invoke_Scala { def unapply(props: Properties.T): Option[(String, String)] = props match { - case List((FUNCTION, "invoke_scala"), (NAME, name), (ID, id)) => Some((name, id)) + case List((FUNCTION, INVOKE_SCALA), (NAME, name), (ID, id)) => Some((name, id)) case _ => None } } + + val CANCEL_SCALA = "cancel_scala" object Cancel_Scala { def unapply(props: Properties.T): Option[String] = props match { - case List((FUNCTION, "cancel_scala"), (ID, id)) => Some(id) + case List((FUNCTION, CANCEL_SCALA), (ID, id)) => Some(id) case _ => None } } diff -r 06db08182c4b -r 1fd184eaa310 src/Pure/PIDE/protocol.ML --- a/src/Pure/PIDE/protocol.ML Wed May 22 08:46:39 2013 +0200 +++ b/src/Pure/PIDE/protocol.ML Wed May 22 14:10:45 2013 +0200 @@ -78,11 +78,5 @@ Active.dialog_result (Markup.parse_int serial) result handle exn => if Exn.is_interrupt exn then () else reraise exn); -val _ = - Isabelle_Process.protocol_command "Document.invoke_scala" - (fn [id, tag, res] => - Invoke_Scala.fulfill_method id tag res - handle exn => if Exn.is_interrupt exn then () else reraise exn); - end; diff -r 06db08182c4b -r 1fd184eaa310 src/Pure/PIDE/protocol.scala --- a/src/Pure/PIDE/protocol.scala Wed May 22 08:46:39 2013 +0200 +++ b/src/Pure/PIDE/protocol.scala Wed May 22 14:10:45 2013 +0200 @@ -364,12 +364,4 @@ { input("Document.dialog_result", Properties.Value.Long(serial), result) } - - - /* method invocation service */ - - def invoke_scala(id: String, tag: Invoke_Scala.Tag.Value, res: String) - { - input("Document.invoke_scala", id, tag.toString, res) - } } diff -r 06db08182c4b -r 1fd184eaa310 src/Pure/System/invoke_scala.ML --- a/src/Pure/System/invoke_scala.ML Wed May 22 08:46:39 2013 +0200 +++ b/src/Pure/System/invoke_scala.ML Wed May 22 14:10:45 2013 +0200 @@ -6,16 +6,15 @@ signature INVOKE_SCALA = sig - exception Null val method: string -> string -> string val promise_method: string -> string -> string future - val fulfill_method: string -> string -> string -> unit + exception Null end; structure Invoke_Scala: INVOKE_SCALA = struct -exception Null; +val _ = Session.protocol_handler "isabelle.Invoke_Scala"; (* pending promises *) @@ -40,9 +39,11 @@ fun method name arg = Future.join (promise_method name arg); -(* fulfill method *) +(* fulfill *) -fun fulfill_method id tag res = +exception Null; + +fun fulfill id tag res = let val result = (case tag of @@ -58,5 +59,11 @@ val _ = Future.fulfill_result promise result; in () end; +val _ = + Isabelle_Process.protocol_command "Invoke_Scala.fulfill" + (fn [id, tag, res] => + fulfill id tag res + handle exn => if Exn.is_interrupt exn then () else reraise exn); + end; diff -r 06db08182c4b -r 1fd184eaa310 src/Pure/System/invoke_scala.scala --- a/src/Pure/System/invoke_scala.scala Wed May 22 08:46:39 2013 +0200 +++ b/src/Pure/System/invoke_scala.scala Wed May 22 14:10:45 2013 +0200 @@ -64,3 +64,69 @@ case Exn.Exn(e) => (Tag.FAIL, Exn.message(e)) } } + + +/* protocol handler */ + +class Invoke_Scala extends Session.Protocol_Handler +{ + private var futures = Map.empty[String, java.util.concurrent.Future[Unit]] + + private def fulfill(prover: Session.Prover, + id: String, tag: Invoke_Scala.Tag.Value, res: String): Unit = synchronized + { + if (futures.isDefinedAt(id)) { + prover.input("Invoke_Scala.fulfill", id, tag.toString, res) + futures -= id + } + } + + private def cancel(prover: Session.Prover, + id: String, future: java.util.concurrent.Future[Unit]) + { + future.cancel(true) + fulfill(prover, id, Invoke_Scala.Tag.INTERRUPT, "") + } + + private def invoke_scala( + prover: Session.Prover, output: Isabelle_Process.Output): Boolean = synchronized + { + output.properties match { + case Markup.Invoke_Scala(name, id) => + futures += (id -> + default_thread_pool.submit(() => + { + val arg = XML.content(output.body) + val (tag, result) = Invoke_Scala.method(name, arg) + fulfill(prover, id, tag, result) + })) + true + case _ => false + } + } + + private def cancel_scala( + prover: Session.Prover, output: Isabelle_Process.Output): Boolean = synchronized + { + output.properties match { + case Markup.Cancel_Scala(id) => + futures.get(id) match { + case Some(future) => cancel(prover, id, future) + case None => + } + true + case _ => false + } + } + + override def stop(prover: Session.Prover): Unit = synchronized + { + for ((id, future) <- futures) cancel(prover, id, future) + futures = Map.empty + } + + val functions = Map( + Markup.INVOKE_SCALA -> invoke_scala _, + Markup.CANCEL_SCALA -> cancel_scala _) +} + diff -r 06db08182c4b -r 1fd184eaa310 src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Wed May 22 08:46:39 2013 +0200 +++ b/src/Pure/System/isabelle_process.ML Wed May 22 14:10:45 2013 +0200 @@ -221,6 +221,7 @@ val channel = rendezvous (); val _ = init_channels channel; + val _ = Session.init_protocol_handlers (); in loop channel end)); fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2); diff -r 06db08182c4b -r 1fd184eaa310 src/Pure/System/session.ML --- a/src/Pure/System/session.ML Wed May 22 08:46:39 2013 +0200 +++ b/src/Pure/System/session.ML Wed May 22 14:10:45 2013 +0200 @@ -1,7 +1,7 @@ (* Title: Pure/System/session.ML Author: Makarius -Session management -- internal state of logic images (not thread-safe). +Session management -- internal state of logic images. *) signature SESSION = @@ -11,12 +11,14 @@ val init: bool -> bool -> Path.T -> string -> bool -> string -> (string * string) list -> string -> string * string -> bool * string -> bool -> unit val finish: unit -> unit + val protocol_handler: string -> unit + val init_protocol_handlers: unit -> unit end; structure Session: SESSION = struct -(* session state *) +(** session identification -- not thread-safe **) val session = Unsynchronized.ref {chapter = "Pure", name = "Pure"}; val session_finished = Unsynchronized.ref false; @@ -58,4 +60,20 @@ Event_Timer.shutdown (); session_finished := true); + +(** protocol handlers **) + +val protocol_handlers = Synchronized.var "protocol_handlers" ([]: string list); + +fun protocol_handler name = + Synchronized.change protocol_handlers (fn handlers => + (Output.try_protocol_message (Markup.protocol_handler name) ""; + if not (member (op =) handlers name) then () + else warning ("Redefining protocol handler: " ^ quote name); + update (op =) name handlers)); + +fun init_protocol_handlers () = + Synchronized.value protocol_handlers + |> List.app (fn name => Output.try_protocol_message (Markup.protocol_handler name) ""); + end; diff -r 06db08182c4b -r 1fd184eaa310 src/Pure/System/session.scala --- a/src/Pure/System/session.scala Wed May 22 08:46:39 2013 +0200 +++ b/src/Pure/System/session.scala Wed May 22 14:10:45 2013 +0200 @@ -37,6 +37,68 @@ case object Ready extends Phase case object Shutdown extends Phase // transient //}}} + + + /* protocol handlers */ + + type Prover = Isabelle_Process with Protocol + + abstract class Protocol_Handler + { + def stop(prover: Prover): Unit = {} + val functions: Map[String, (Prover, Isabelle_Process.Output) => Boolean] + } + + class Protocol_Handlers( + handlers: Map[String, Session.Protocol_Handler] = Map.empty, + functions: Map[String, Isabelle_Process.Output => Boolean] = Map.empty) + { + def add(prover: Prover, name: String): Protocol_Handlers = + { + val (handlers1, functions1) = + handlers.get(name) match { + case Some(old_handler) => + System.err.println("Redefining protocol handler: " + name) + old_handler.stop(prover) + (handlers - name, functions -- old_handler.functions.keys) + case None => (handlers, functions) + } + + val (handlers2, functions2) = + try { + val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler] + val new_functions = + for ((a, f) <- new_handler.functions.toList) yield + (a, (output: Isabelle_Process.Output) => f(prover, output)) + + val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a + if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups)) + + (handlers1 + (name -> new_handler), functions1 ++ new_functions) + } + catch { + case exn: Throwable => + System.err.println("Failed to initialize protocol handler: " + + name + "\n" + Exn.message(exn)) + (handlers1, functions1) + } + + new Protocol_Handlers(handlers2, functions2) + } + + def invoke(output: Isabelle_Process.Output): Boolean = + output.properties match { + case Markup.Function(a) if functions.isDefinedAt(a) => + try { functions(a)(output) } + catch { + case exn: Throwable => + System.err.println("Failed invocation of protocol function: " + + quote(a) + "\n" + Exn.message(exn)) + false + } + case _ => false + } + } } @@ -176,16 +238,15 @@ previous: Document.Version, version: Document.Version) private case class Messages(msgs: List[Isabelle_Process.Message]) - private case class Finished_Scala(id: String, tag: Invoke_Scala.Tag.Value, result: String) private case class Update_Options(options: Options) private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true) { val this_actor = self - var prune_next = System.currentTimeMillis() + prune_delay.ms + var protocol_handlers = new Session.Protocol_Handlers() - var futures = Map.empty[String, java.util.concurrent.Future[Unit]] + var prune_next = System.currentTimeMillis() + prune_delay.ms /* buffered prover messages */ @@ -222,7 +283,7 @@ def cancel() { timer.cancel() } } - var prover: Option[Isabelle_Process with Protocol] = None + var prover: Option[Session.Prover] = None /* delayed command changes */ @@ -318,73 +379,68 @@ } } - output.properties match { + if (output.is_protocol) { + val handled = protocol_handlers.invoke(output) + if (!handled) { + output.properties match { + case Markup.Protocol_Handler(name) => + protocol_handlers = protocol_handlers.add(prover.get, name) - case Position.Id(state_id) if !output.is_protocol => - accumulate(state_id, output.message) - - case Protocol.Command_Timing(state_id, timing) if output.is_protocol => - val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil))) - accumulate(state_id, prover.get.xml_cache.elem(message)) + case Protocol.Command_Timing(state_id, timing) => + val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil))) + accumulate(state_id, prover.get.xml_cache.elem(message)) - case Markup.Assign_Execs if output.is_protocol => - XML.content(output.body) match { - case Protocol.Assign(id, assign) => - try { - val cmds = global_state >>> (_.assign(id, assign)) - delay_commands_changed.invoke(true, cmds) + case Markup.Assign_Execs => + XML.content(output.body) match { + case Protocol.Assign(id, assign) => + try { + val cmds = global_state >>> (_.assign(id, assign)) + delay_commands_changed.invoke(true, cmds) + } + catch { case _: Document.State.Fail => bad_output() } + case _ => bad_output() + } + // FIXME separate timeout event/message!? + if (prover.isDefined && System.currentTimeMillis() > prune_next) { + val old_versions = global_state >>> (_.prune_history(prune_size)) + if (!old_versions.isEmpty) prover.get.remove_versions(old_versions) + prune_next = System.currentTimeMillis() + prune_delay.ms } - catch { case _: Document.State.Fail => bad_output() } + + case Markup.Removed_Versions => + XML.content(output.body) match { + case Protocol.Removed(removed) => + try { + global_state >> (_.removed_versions(removed)) + } + catch { case _: Document.State.Fail => bad_output() } + case _ => bad_output() + } + + case Markup.ML_Statistics(props) => + statistics.event(Session.Statistics(props)) + + case Markup.Task_Statistics(props) => + // FIXME + case _ => bad_output() } - // FIXME separate timeout event/message!? - if (prover.isDefined && System.currentTimeMillis() > prune_next) { - val old_versions = global_state >>> (_.prune_history(prune_size)) - if (!old_versions.isEmpty) prover.get.remove_versions(old_versions) - prune_next = System.currentTimeMillis() + prune_delay.ms - } - - case Markup.Removed_Versions if output.is_protocol => - XML.content(output.body) match { - case Protocol.Removed(removed) => - try { - global_state >> (_.removed_versions(removed)) - } - catch { case _: Document.State.Fail => bad_output() } - case _ => bad_output() - } + } + } + else { + output.properties match { + case Position.Id(state_id) => + accumulate(state_id, output.message) - case Markup.Invoke_Scala(name, id) if output.is_protocol => - futures += (id -> - default_thread_pool.submit(() => - { - val arg = XML.content(output.body) - val (tag, result) = Invoke_Scala.method(name, arg) - this_actor ! Finished_Scala(id, tag, result) - })) + case _ if output.is_init => + phase = Session.Ready - case Markup.Cancel_Scala(id) if output.is_protocol => - futures.get(id) match { - case Some(future) => - future.cancel(true) - this_actor ! Finished_Scala(id, Invoke_Scala.Tag.INTERRUPT, "") - case None => - } - - case Markup.ML_Statistics(props) if output.is_protocol => - statistics.event(Session.Statistics(props)) + case Markup.Return_Code(rc) if output.is_exit => + if (rc == 0) phase = Session.Inactive + else phase = Session.Failed - case Markup.Task_Statistics(props) if output.is_protocol => - // FIXME - - case _ if output.is_init => - phase = Session.Ready - - case Markup.Return_Code(rc) if output.is_exit => - if (rc == 0) phase = Session.Inactive - else phase = Session.Failed - - case _ => bad_output() + case _ => bad_output() + } } } //}}} @@ -455,12 +511,6 @@ if prover.isDefined && global_state().is_assigned(change.previous) => handle_change(change) - case Finished_Scala(id, tag, result) if prover.isDefined => - if (futures.isDefinedAt(id)) { - prover.get.invoke_scala(id, tag, result) - futures -= id - } - case bad if !bad.isInstanceOf[Change] => System.err.println("session_actor: ignoring bad message " + bad) }