explicit change_parser thread, which avoids undirected Future.fork with its tendency towards hundreds of worker threads;
--- a/src/Pure/System/event_bus.scala Fri Nov 25 21:27:16 2011 +0100
+++ b/src/Pure/System/event_bus.scala Fri Nov 25 21:29:11 2011 +0100
@@ -32,29 +32,4 @@
/* event invocation */
def event(x: Event) { synchronized { receivers.foreach(_ ! x) } }
-
-
- /* await global condition -- triggered via bus events */
-
- def await(cond: => Boolean)
- {
- case object Wait
- val a = new Actor {
- def act {
- if (cond) react { case Wait => reply(()); exit(Wait) }
- else {
- loop {
- react {
- case trigger if trigger != Wait =>
- if (cond) { react { case Wait => reply(()); exit(Wait) } }
- }
- }
- }
- }
- }
- this += a
- a.start
- a !? Wait
- this -= a
- }
}
--- a/src/Pure/System/session.scala Fri Nov 25 21:27:16 2011 +0100
+++ b/src/Pure/System/session.scala Fri Nov 25 21:29:11 2011 +0100
@@ -23,7 +23,6 @@
//{{{
case object Global_Settings
case object Caret_Focus
- case object Assignment
case class Commands_Changed(nodes: Set[Document.Node.Name], commands: Set[Command])
sealed abstract class Phase
@@ -53,7 +52,6 @@
val global_settings = new Event_Bus[Session.Global_Settings.type]
val caret_focus = new Event_Bus[Session.Caret_Focus.type]
- val assignments = new Event_Bus[Session.Assignment.type]
val commands_changed = new Event_Bus[Session.Commands_Changed]
val phase_changed = new Event_Bus[Session.Phase]
val syslog_messages = new Event_Bus[Isabelle_Process.Result]
@@ -82,6 +80,35 @@
//}}}
+ /** pipelined change parsing **/
+
+ //{{{
+ private case class Text_Edits(
+ syntax: Outer_Syntax,
+ name: Document.Node.Name,
+ previous: Future[Document.Version],
+ text_edits: List[Document.Edit_Text],
+ version_result: Promise[Document.Version])
+
+ private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
+ {
+ var finished = false
+ while (!finished) {
+ receive {
+ case Stop => finished = true; reply(())
+
+ case Text_Edits(syntax, name, previous, text_edits, version_result) =>
+ val prev = previous.get_finished
+ val (doc_edits, version) = Thy_Syntax.text_edits(syntax, prev, text_edits)
+ version_result.fulfill(version)
+ sender ! Change_Node(name, doc_edits, prev, version)
+
+ case bad => System.err.println("change_parser: ignoring bad message " + bad)
+ }
+ }
+ }
+ //}}}
+
/** main protocol actor **/
@@ -258,15 +285,10 @@
prover.get.cancel_execution()
val text_edits = header_edit(name, header) :: edits.map(edit => (name, edit))
- val result = Future.fork { Thy_Syntax.text_edits(syntax, previous.join, text_edits) }
- val change =
- global_state.change_yield(_.continue_history(previous, text_edits, result.map(_._2)))
+ val version = Future.promise[Document.Version]
+ val change = global_state.change_yield(_.continue_history(previous, text_edits, version))
- result.map {
- case (doc_edits, _) =>
- assignments.await { global_state().is_assigned(previous.get_finished) }
- this_actor ! Change_Node(name, doc_edits, previous.join, change.version.join)
- }
+ change_parser ! Text_Edits(syntax, name, previous, text_edits, version)
}
//}}}
@@ -278,7 +300,6 @@
{
val cmds = global_state.change_yield(_.assign(id, assign))
for (cmd <- cmds) commands_changed_delay.invoke(cmd)
- assignments.event(Session.Assignment)
}
//}}}
@@ -444,9 +465,6 @@
List(Document.Node.Edits(text_edits), Document.Node.Perspective(perspective)))
reply(())
- case change: Change_Node if prover.isDefined =>
- handle_change(change)
-
case Messages(msgs) =>
msgs foreach {
case input: Isabelle_Process.Input =>
@@ -459,7 +477,12 @@
raw_messages.event(result)
}
- case bad => System.err.println("session_actor: ignoring bad message " + bad)
+ case change: Change_Node
+ if prover.isDefined && global_state().is_assigned(change.previous) =>
+ handle_change(change)
+
+ case bad if !bad.isInstanceOf[Change_Node] =>
+ System.err.println("session_actor: ignoring bad message " + bad)
}
}
//}}}
@@ -473,7 +496,7 @@
def start(args: List[String]) { start (Time.seconds(25), args) }
- def stop() { commands_changed_buffer !? Stop; session_actor !? Stop }
+ def stop() { commands_changed_buffer !? Stop; change_parser !? Stop; session_actor !? Stop }
def cancel_execution() { session_actor ! Cancel_Execution }