simplified -- prefer Consumer_Thread over Actor;
authorwenzelm
Thu, 24 Apr 2014 16:52:17 +0200
changeset 56704 c2f0ddd14747
parent 56703 2d0ca179e749
child 56705 937826d702d5
simplified -- prefer Consumer_Thread over Actor;
src/Pure/Concurrent/simple_thread.scala
src/Pure/PIDE/session.scala
--- a/src/Pure/Concurrent/simple_thread.scala	Thu Apr 24 16:47:47 2014 +0200
+++ b/src/Pure/Concurrent/simple_thread.scala	Thu Apr 24 16:52:17 2014 +0200
@@ -46,11 +46,11 @@
 
   /* thread as actor */
 
-  def actor(name: String, daemon: Boolean = false)(body: => Unit): (Thread, Actor) =
+  def actor(name: String, daemon: Boolean = false)(body: => Unit): Actor =
   {
     val actor = Future.promise[Actor]
     val thread = fork(name, daemon) { actor.fulfill(Actor.self); body }
-    (thread, actor.join)
+    actor.join
   }
 }
 
--- a/src/Pure/PIDE/session.scala	Thu Apr 24 16:47:47 2014 +0200
+++ b/src/Pure/PIDE/session.scala	Thu Apr 24 16:52:17 2014 +0200
@@ -12,7 +12,7 @@
 
 import scala.collection.mutable
 import scala.collection.immutable.Queue
-import scala.actors.TIMEOUT
+import scala.actors.{Actor, TIMEOUT}
 import scala.actors.Actor._
 
 
@@ -152,54 +152,33 @@
 
   /** buffered command changes (delay_first discipline) **/
 
-  //{{{
-  private case object Stop
-
-  private val (_, commands_changed_buffer) =
-    Simple_Thread.actor("commands_changed_buffer", daemon = true)
+  private val commands_changed_buffer =
+    Consumer_Thread.fork[Session.Commands_Changed]("commands_changed_buffer", daemon = true)
   {
-    var finished = false
-    while (!finished) {
-      receive {
-        case Stop => finished = true; reply(())
-        case changed: Session.Commands_Changed => commands_changed.event(changed)
-        case bad => System.err.println("commands_changed_buffer: ignoring bad message " + bad)
-      }
-    }
+    case changed => commands_changed.event(changed); true
   }
-  //}}}
 
 
   /** pipelined change parsing **/
 
-  //{{{
   private case class Text_Edits(
     previous: Future[Document.Version],
     doc_blobs: Document.Blobs,
     text_edits: List[Document.Edit_Text],
     version_result: Promise[Document.Version])
 
-  private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
+  private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   {
-    var finished = false
-    while (!finished) {
-      receive {
-        case Stop => finished = true; reply(())
-
-        case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
-          val prev = previous.get_finished
-          val change =
-            Timing.timeit("parse_change", timing) {
-              resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
-            }
-          version_result.fulfill(change.version)
-          sender ! change
-
-        case bad => System.err.println("change_parser: ignoring bad message " + bad)
-      }
-    }
+    case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
+      val prev = previous.get_finished
+      val change =
+        Timing.timeit("parse_change", timing) {
+          resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
+        }
+      version_result.fulfill(change.version)
+      session_actor ! change
+      true
   }
-  //}}}
 
 
 
@@ -255,13 +234,14 @@
 
   /* actor messages */
 
+  private case object Stop
   private case class Start(name: String, args: List[String])
   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   private case class Protocol_Command(name: String, args: List[String])
   private case class Messages(msgs: List[Prover.Message])
   private case class Update_Options(options: Options)
 
-  private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
+  private val session_actor: Actor = Simple_Thread.actor("session_actor", daemon = true)
   {
     val this_actor = self
 
@@ -326,8 +306,8 @@
       def flush()
       {
         if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
-          commands_changed_buffer !
-            Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands)
+          commands_changed_buffer.send(
+            Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands))
         changed_assignment = false
         changed_nodes = Set.empty
         changed_commands = Set.empty
@@ -362,7 +342,7 @@
       val change = global_state.change_result(_.continue_history(previous, edits, version))
 
       raw_edits.event(Session.Raw_Edits(doc_blobs, edits))
-      change_parser ! Text_Edits(previous, doc_blobs, edits, version)
+      change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
     }
     //}}}
 
@@ -576,8 +556,8 @@
 
   def stop()
   {
-    commands_changed_buffer !? Stop
-    change_parser !? Stop
+    commands_changed_buffer.shutdown()
+    change_parser.shutdown()
     session_actor !? Stop
   }