explicit change_parser thread, which avoids undirected Future.fork with its tendency towards hundreds of worker threads;
authorwenzelm
Fri, 25 Nov 2011 21:29:11 +0100
changeset 45635 d9cf3520083c
parent 45634 b3dce535960f
child 45636 202071bb7f86
explicit change_parser thread, which avoids undirected Future.fork with its tendency towards hundreds of worker threads;
src/Pure/System/event_bus.scala
src/Pure/System/session.scala
--- a/src/Pure/System/event_bus.scala	Fri Nov 25 21:27:16 2011 +0100
+++ b/src/Pure/System/event_bus.scala	Fri Nov 25 21:29:11 2011 +0100
@@ -32,29 +32,4 @@
   /* event invocation */
 
   def event(x: Event) { synchronized { receivers.foreach(_ ! x) } }
-
-
-  /* await global condition -- triggered via bus events */
-
-  def await(cond: => Boolean)
-  {
-    case object Wait
-    val a = new Actor {
-      def act {
-        if (cond) react { case Wait => reply(()); exit(Wait) }
-        else {
-          loop {
-            react {
-              case trigger if trigger != Wait =>
-                if (cond) { react { case Wait => reply(()); exit(Wait) } }
-            }
-          }
-        }
-      }
-    }
-    this += a
-    a.start
-    a !? Wait
-    this -= a
-  }
 }
--- a/src/Pure/System/session.scala	Fri Nov 25 21:27:16 2011 +0100
+++ b/src/Pure/System/session.scala	Fri Nov 25 21:29:11 2011 +0100
@@ -23,7 +23,6 @@
   //{{{
   case object Global_Settings
   case object Caret_Focus
-  case object Assignment
   case class Commands_Changed(nodes: Set[Document.Node.Name], commands: Set[Command])
 
   sealed abstract class Phase
@@ -53,7 +52,6 @@
 
   val global_settings = new Event_Bus[Session.Global_Settings.type]
   val caret_focus = new Event_Bus[Session.Caret_Focus.type]
-  val assignments = new Event_Bus[Session.Assignment.type]
   val commands_changed = new Event_Bus[Session.Commands_Changed]
   val phase_changed = new Event_Bus[Session.Phase]
   val syslog_messages = new Event_Bus[Isabelle_Process.Result]
@@ -82,6 +80,35 @@
   //}}}
 
 
+  /** pipelined change parsing **/
+
+  //{{{
+  private case class Text_Edits(
+    syntax: Outer_Syntax,
+    name: Document.Node.Name,
+    previous: Future[Document.Version],
+    text_edits: List[Document.Edit_Text],
+    version_result: Promise[Document.Version])
+
+  private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
+  {
+    var finished = false
+    while (!finished) {
+      receive {
+        case Stop => finished = true; reply(())
+
+        case Text_Edits(syntax, name, previous, text_edits, version_result) =>
+          val prev = previous.get_finished
+          val (doc_edits, version) = Thy_Syntax.text_edits(syntax, prev, text_edits)
+          version_result.fulfill(version)
+          sender ! Change_Node(name, doc_edits, prev, version)
+
+        case bad => System.err.println("change_parser: ignoring bad message " + bad)
+      }
+    }
+  }
+  //}}}
+
 
   /** main protocol actor **/
 
@@ -258,15 +285,10 @@
       prover.get.cancel_execution()
 
       val text_edits = header_edit(name, header) :: edits.map(edit => (name, edit))
-      val result = Future.fork { Thy_Syntax.text_edits(syntax, previous.join, text_edits) }
-      val change =
-        global_state.change_yield(_.continue_history(previous, text_edits, result.map(_._2)))
+      val version = Future.promise[Document.Version]
+      val change = global_state.change_yield(_.continue_history(previous, text_edits, version))
 
-      result.map {
-        case (doc_edits, _) =>
-          assignments.await { global_state().is_assigned(previous.get_finished) }
-          this_actor ! Change_Node(name, doc_edits, previous.join, change.version.join)
-      }
+      change_parser ! Text_Edits(syntax, name, previous, text_edits, version)
     }
     //}}}
 
@@ -278,7 +300,6 @@
     {
       val cmds = global_state.change_yield(_.assign(id, assign))
       for (cmd <- cmds) commands_changed_delay.invoke(cmd)
-      assignments.event(Session.Assignment)
     }
     //}}}
 
@@ -444,9 +465,6 @@
               List(Document.Node.Edits(text_edits), Document.Node.Perspective(perspective)))
           reply(())
 
-        case change: Change_Node if prover.isDefined =>
-          handle_change(change)
-
         case Messages(msgs) =>
           msgs foreach {
             case input: Isabelle_Process.Input =>
@@ -459,7 +477,12 @@
               raw_messages.event(result)
           }
 
-        case bad => System.err.println("session_actor: ignoring bad message " + bad)
+        case change: Change_Node
+        if prover.isDefined && global_state().is_assigned(change.previous) =>
+          handle_change(change)
+
+        case bad if !bad.isInstanceOf[Change_Node] =>
+          System.err.println("session_actor: ignoring bad message " + bad)
       }
     }
     //}}}
@@ -473,7 +496,7 @@
 
   def start(args: List[String]) { start (Time.seconds(25), args) }
 
-  def stop() { commands_changed_buffer !? Stop; session_actor !? Stop }
+  def stop() { commands_changed_buffer !? Stop; change_parser !? Stop; session_actor !? Stop }
 
   def cancel_execution() { session_actor ! Cancel_Execution }