--- a/src/Pure/System/isabelle_process.scala Thu Apr 24 14:59:46 2014 +0200
+++ b/src/Pure/System/isabelle_process.scala Thu Apr 24 15:02:13 2014 +0200
@@ -55,22 +55,6 @@
}
- /* command input actor */
-
- @volatile private var command_input: (Thread, Actor) = null
-
- private case class Input_Chunks(chunks: List[Bytes])
-
- private case object Close
- private def close_input()
- {
- if (command_input != null && command_input._1.isAlive) {
- command_input._2 ! Close
- command_input._1.join
- }
- }
-
-
/** process manager **/
@@ -126,15 +110,14 @@
else {
val (command_stream, message_stream) = system_channel.rendezvous()
+ command_input_init(command_stream)
val stdout = physical_output(false)
val stderr = physical_output(true)
val message = message_output(message_stream)
- command_input = input_actor(command_stream)
-
val rc = process_result.join
system_output("process terminated")
- close_input()
+ command_input_close()
for (thread <- List(stdout, stderr, message)) thread.join
system_output("process_manager terminated")
exit_message(rc)
@@ -155,7 +138,7 @@
def terminate()
{
- close_input()
+ command_input_close()
system_output("Terminating Isabelle process")
terminate_process()
}
@@ -164,6 +147,36 @@
/** stream actors **/
+ /* command input */
+
+ private var command_input: Option[Consumer_Thread[List[Bytes]]] = None
+
+ private def command_input_close(): Unit = command_input.foreach(_.shutdown)
+
+ private def command_input_init(raw_stream: OutputStream)
+ {
+ val name = "command_input"
+ val stream = new BufferedOutputStream(raw_stream)
+ command_input =
+ Some(
+ Consumer_Thread.fork(name)(
+ consume =
+ {
+ case chunks =>
+ try {
+ Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
+ chunks.foreach(_.write(stream))
+ stream.flush
+ true
+ }
+ catch { case e: IOException => system_output(name + ": " + e.getMessage); false }
+ },
+ finish = { case () => stream.close; system_output(name + " terminated") }
+ )
+ )
+ }
+
+
/* physical output */
private def physical_output(err: Boolean): Thread =
@@ -202,36 +215,6 @@
}
- /* command input */
-
- private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
- {
- val name = "command_input"
- Simple_Thread.actor(name) {
- try {
- val stream = new BufferedOutputStream(raw_stream)
- var finished = false
- while (!finished) {
- //{{{
- receive {
- case Input_Chunks(chunks) =>
- Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
- chunks.foreach(_.write(stream))
- stream.flush
- case Close =>
- stream.close
- finished = true
- case bad => System.err.println(name + ": ignoring bad message " + bad)
- }
- //}}}
- }
- }
- catch { case e: IOException => system_output(name + ": " + e.getMessage) }
- system_output(name + " terminated")
- }
- }
-
-
/* message output */
private def message_output(stream: InputStream): Thread =
@@ -328,7 +311,10 @@
/** protocol commands **/
def protocol_command_bytes(name: String, args: Bytes*): Unit =
- command_input._2 ! Input_Chunks(Bytes(name) :: args.toList)
+ command_input match {
+ case Some(thread) => thread.send(Bytes(name) :: args.toList)
+ case None => error("Uninitialized command input thread")
+ }
def protocol_command(name: String, args: String*)
{