# HG changeset patch # User wenzelm # Date 1322252951 -3600 # Node ID d9cf3520083c5c3b7d4ada24f6a7dc737a77987e # Parent b3dce535960f75e6c94e0c6f156d5717393de9be explicit change_parser thread, which avoids undirected Future.fork with its tendency towards hundreds of worker threads; diff -r b3dce535960f -r d9cf3520083c src/Pure/System/event_bus.scala --- a/src/Pure/System/event_bus.scala Fri Nov 25 21:27:16 2011 +0100 +++ b/src/Pure/System/event_bus.scala Fri Nov 25 21:29:11 2011 +0100 @@ -32,29 +32,4 @@ /* event invocation */ def event(x: Event) { synchronized { receivers.foreach(_ ! x) } } - - - /* await global condition -- triggered via bus events */ - - def await(cond: => Boolean) - { - case object Wait - val a = new Actor { - def act { - if (cond) react { case Wait => reply(()); exit(Wait) } - else { - loop { - react { - case trigger if trigger != Wait => - if (cond) { react { case Wait => reply(()); exit(Wait) } } - } - } - } - } - } - this += a - a.start - a !? Wait - this -= a - } } diff -r b3dce535960f -r d9cf3520083c src/Pure/System/session.scala --- a/src/Pure/System/session.scala Fri Nov 25 21:27:16 2011 +0100 +++ b/src/Pure/System/session.scala Fri Nov 25 21:29:11 2011 +0100 @@ -23,7 +23,6 @@ //{{{ case object Global_Settings case object Caret_Focus - case object Assignment case class Commands_Changed(nodes: Set[Document.Node.Name], commands: Set[Command]) sealed abstract class Phase @@ -53,7 +52,6 @@ val global_settings = new Event_Bus[Session.Global_Settings.type] val caret_focus = new Event_Bus[Session.Caret_Focus.type] - val assignments = new Event_Bus[Session.Assignment.type] val commands_changed = new Event_Bus[Session.Commands_Changed] val phase_changed = new Event_Bus[Session.Phase] val syslog_messages = new Event_Bus[Isabelle_Process.Result] @@ -82,6 +80,35 @@ //}}} + /** pipelined change parsing **/ + + //{{{ + private case class Text_Edits( + syntax: Outer_Syntax, + name: Document.Node.Name, + previous: Future[Document.Version], + text_edits: List[Document.Edit_Text], + version_result: Promise[Document.Version]) + + private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true) + { + var finished = false + while (!finished) { + receive { + case Stop => finished = true; reply(()) + + case Text_Edits(syntax, name, previous, text_edits, version_result) => + val prev = previous.get_finished + val (doc_edits, version) = Thy_Syntax.text_edits(syntax, prev, text_edits) + version_result.fulfill(version) + sender ! Change_Node(name, doc_edits, prev, version) + + case bad => System.err.println("change_parser: ignoring bad message " + bad) + } + } + } + //}}} + /** main protocol actor **/ @@ -258,15 +285,10 @@ prover.get.cancel_execution() val text_edits = header_edit(name, header) :: edits.map(edit => (name, edit)) - val result = Future.fork { Thy_Syntax.text_edits(syntax, previous.join, text_edits) } - val change = - global_state.change_yield(_.continue_history(previous, text_edits, result.map(_._2))) + val version = Future.promise[Document.Version] + val change = global_state.change_yield(_.continue_history(previous, text_edits, version)) - result.map { - case (doc_edits, _) => - assignments.await { global_state().is_assigned(previous.get_finished) } - this_actor ! Change_Node(name, doc_edits, previous.join, change.version.join) - } + change_parser ! Text_Edits(syntax, name, previous, text_edits, version) } //}}} @@ -278,7 +300,6 @@ { val cmds = global_state.change_yield(_.assign(id, assign)) for (cmd <- cmds) commands_changed_delay.invoke(cmd) - assignments.event(Session.Assignment) } //}}} @@ -444,9 +465,6 @@ List(Document.Node.Edits(text_edits), Document.Node.Perspective(perspective))) reply(()) - case change: Change_Node if prover.isDefined => - handle_change(change) - case Messages(msgs) => msgs foreach { case input: Isabelle_Process.Input => @@ -459,7 +477,12 @@ raw_messages.event(result) } - case bad => System.err.println("session_actor: ignoring bad message " + bad) + case change: Change_Node + if prover.isDefined && global_state().is_assigned(change.previous) => + handle_change(change) + + case bad if !bad.isInstanceOf[Change_Node] => + System.err.println("session_actor: ignoring bad message " + bad) } } //}}} @@ -473,7 +496,7 @@ def start(args: List[String]) { start (Time.seconds(25), args) } - def stop() { commands_changed_buffer !? Stop; session_actor !? Stop } + def stop() { commands_changed_buffer !? Stop; change_parser !? Stop; session_actor !? Stop } def cancel_execution() { session_actor ! Cancel_Execution }