clarified command_input: Consumer_Thread;
authorwenzelm
Thu, 24 Apr 2014 15:02:13 +0200
changeset 56700 c84bf6f63dfe
parent 56699 60ad80f5cb62
child 56701 ac5b66fa2a56
clarified command_input: Consumer_Thread;
src/Pure/System/isabelle_process.scala
--- a/src/Pure/System/isabelle_process.scala	Thu Apr 24 14:59:46 2014 +0200
+++ b/src/Pure/System/isabelle_process.scala	Thu Apr 24 15:02:13 2014 +0200
@@ -55,22 +55,6 @@
   }
 
 
-  /* command input actor */
-
-  @volatile private var command_input: (Thread, Actor) = null
-
-  private case class Input_Chunks(chunks: List[Bytes])
-
-  private case object Close
-  private def close_input()
-  {
-    if (command_input != null && command_input._1.isAlive) {
-      command_input._2 ! Close
-      command_input._1.join
-    }
-  }
-
-
 
   /** process manager **/
 
@@ -126,15 +110,14 @@
     else {
       val (command_stream, message_stream) = system_channel.rendezvous()
 
+      command_input_init(command_stream)
       val stdout = physical_output(false)
       val stderr = physical_output(true)
       val message = message_output(message_stream)
 
-      command_input = input_actor(command_stream)
-
       val rc = process_result.join
       system_output("process terminated")
-      close_input()
+      command_input_close()
       for (thread <- List(stdout, stderr, message)) thread.join
       system_output("process_manager terminated")
       exit_message(rc)
@@ -155,7 +138,7 @@
 
   def terminate()
   {
-    close_input()
+    command_input_close()
     system_output("Terminating Isabelle process")
     terminate_process()
   }
@@ -164,6 +147,36 @@
 
   /** stream actors **/
 
+  /* command input */
+
+  private var command_input: Option[Consumer_Thread[List[Bytes]]] = None
+
+  private def command_input_close(): Unit = command_input.foreach(_.shutdown)
+
+  private def command_input_init(raw_stream: OutputStream)
+  {
+    val name = "command_input"
+    val stream = new BufferedOutputStream(raw_stream)
+    command_input =
+      Some(
+        Consumer_Thread.fork(name)(
+          consume =
+            {
+              case chunks =>
+                try {
+                  Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
+                  chunks.foreach(_.write(stream))
+                  stream.flush
+                  true
+                }
+                catch { case e: IOException => system_output(name + ": " + e.getMessage); false }
+            },
+          finish = { case () => stream.close; system_output(name + " terminated") }
+        )
+      )
+  }
+
+
   /* physical output */
 
   private def physical_output(err: Boolean): Thread =
@@ -202,36 +215,6 @@
   }
 
 
-  /* command input */
-
-  private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
-  {
-    val name = "command_input"
-    Simple_Thread.actor(name) {
-      try {
-        val stream = new BufferedOutputStream(raw_stream)
-        var finished = false
-        while (!finished) {
-          //{{{
-          receive {
-            case Input_Chunks(chunks) =>
-              Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
-              chunks.foreach(_.write(stream))
-              stream.flush
-            case Close =>
-              stream.close
-              finished = true
-            case bad => System.err.println(name + ": ignoring bad message " + bad)
-          }
-          //}}}
-        }
-      }
-      catch { case e: IOException => system_output(name + ": " + e.getMessage) }
-      system_output(name + " terminated")
-    }
-  }
-
-
   /* message output */
 
   private def message_output(stream: InputStream): Thread =
@@ -328,7 +311,10 @@
   /** protocol commands **/
 
   def protocol_command_bytes(name: String, args: Bytes*): Unit =
-    command_input._2 ! Input_Chunks(Bytes(name) :: args.toList)
+    command_input match {
+      case Some(thread) => thread.send(Bytes(name) :: args.toList)
+      case None => error("Uninitialized command input thread")
+    }
 
   def protocol_command(name: String, args: String*)
   {