# HG changeset patch # User wenzelm # Date 1398344533 -7200 # Node ID c84bf6f63dfe0186f0221cab954813b4c1c36d0c # Parent 60ad80f5cb622321a7dd1f390e6ad49a0aaa6512 clarified command_input: Consumer_Thread; diff -r 60ad80f5cb62 -r c84bf6f63dfe src/Pure/System/isabelle_process.scala --- a/src/Pure/System/isabelle_process.scala Thu Apr 24 14:59:46 2014 +0200 +++ b/src/Pure/System/isabelle_process.scala Thu Apr 24 15:02:13 2014 +0200 @@ -55,22 +55,6 @@ } - /* command input actor */ - - @volatile private var command_input: (Thread, Actor) = null - - private case class Input_Chunks(chunks: List[Bytes]) - - private case object Close - private def close_input() - { - if (command_input != null && command_input._1.isAlive) { - command_input._2 ! Close - command_input._1.join - } - } - - /** process manager **/ @@ -126,15 +110,14 @@ else { val (command_stream, message_stream) = system_channel.rendezvous() + command_input_init(command_stream) val stdout = physical_output(false) val stderr = physical_output(true) val message = message_output(message_stream) - command_input = input_actor(command_stream) - val rc = process_result.join system_output("process terminated") - close_input() + command_input_close() for (thread <- List(stdout, stderr, message)) thread.join system_output("process_manager terminated") exit_message(rc) @@ -155,7 +138,7 @@ def terminate() { - close_input() + command_input_close() system_output("Terminating Isabelle process") terminate_process() } @@ -164,6 +147,36 @@ /** stream actors **/ + /* command input */ + + private var command_input: Option[Consumer_Thread[List[Bytes]]] = None + + private def command_input_close(): Unit = command_input.foreach(_.shutdown) + + private def command_input_init(raw_stream: OutputStream) + { + val name = "command_input" + val stream = new BufferedOutputStream(raw_stream) + command_input = + Some( + Consumer_Thread.fork(name)( + consume = + { + case chunks => + try { + Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream) + chunks.foreach(_.write(stream)) + stream.flush + true + } + catch { case e: IOException => system_output(name + ": " + e.getMessage); false } + }, + finish = { case () => stream.close; system_output(name + " terminated") } + ) + ) + } + + /* physical output */ private def physical_output(err: Boolean): Thread = @@ -202,36 +215,6 @@ } - /* command input */ - - private def input_actor(raw_stream: OutputStream): (Thread, Actor) = - { - val name = "command_input" - Simple_Thread.actor(name) { - try { - val stream = new BufferedOutputStream(raw_stream) - var finished = false - while (!finished) { - //{{{ - receive { - case Input_Chunks(chunks) => - Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream) - chunks.foreach(_.write(stream)) - stream.flush - case Close => - stream.close - finished = true - case bad => System.err.println(name + ": ignoring bad message " + bad) - } - //}}} - } - } - catch { case e: IOException => system_output(name + ": " + e.getMessage) } - system_output(name + " terminated") - } - } - - /* message output */ private def message_output(stream: InputStream): Thread = @@ -328,7 +311,10 @@ /** protocol commands **/ def protocol_command_bytes(name: String, args: Bytes*): Unit = - command_input._2 ! Input_Chunks(Bytes(name) :: args.toList) + command_input match { + case Some(thread) => thread.send(Bytes(name) :: args.toList) + case None => error("Uninitialized command input thread") + } def protocol_command(name: String, args: String*) {