# HG changeset patch # User wenzelm # Date 1281436151 -7200 # Node ID 2b61c5e27399612875bd3c69c798f5dd6d239b19 # Parent dd7dcb9b2637122ae0380ea870c41e314947afd7 distinguish proper Isabelle_Process INPUT vs. raw STDIN, tuned corresponding method names; asynchronous Isabelle_Process.init -- raw ML toplevel stays active; simplified Isabelle_Process using actors; misc tuning; diff -r dd7dcb9b2637 -r 2b61c5e27399 src/Pure/General/markup.scala --- a/src/Pure/General/markup.scala Tue Aug 10 12:09:53 2010 +0200 +++ b/src/Pure/General/markup.scala Tue Aug 10 12:29:11 2010 +0200 @@ -203,6 +203,7 @@ val ERROR = "error" val DEBUG = "debug" val SYSTEM = "system" + val INPUT = "input" val STDIN = "stdin" val STDOUT = "stdout" val SIGNAL = "signal" diff -r dd7dcb9b2637 -r 2b61c5e27399 src/Pure/Isar/isar_document.scala --- a/src/Pure/Isar/isar_document.scala Tue Aug 10 12:09:53 2010 +0200 +++ b/src/Pure/Isar/isar_document.scala Tue Aug 10 12:29:11 2010 +0200 @@ -38,7 +38,7 @@ /* commands */ def define_command(id: Document.Command_ID, text: String) { - output_sync("Isar.define_command " + Isabelle_Syntax.encode_string(id) + " " + + input("Isar.define_command " + Isabelle_Syntax.encode_string(id) + " " + Isabelle_Syntax.encode_string(text)) } @@ -80,6 +80,6 @@ Isabelle_Syntax.append_string(new_id, buf) buf.append(" ") Isabelle_Syntax.append_list(append_node_edit, edits, buf) - output_sync(buf.toString) + input(buf.toString) } } diff -r dd7dcb9b2637 -r 2b61c5e27399 src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Tue Aug 10 12:09:53 2010 +0200 +++ b/src/Pure/System/isabelle_process.ML Tue Aug 10 12:29:11 2010 +0200 @@ -105,6 +105,10 @@ val _ = quick_and_dirty := true; (* FIXME !? *) val _ = Keyword.status (); val _ = Output.status (Markup.markup Markup.ready ""); - in Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true} end; + val _ = + Simple_Thread.fork false (fn () => + (Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true}; + quit ())); + in () end; end; diff -r dd7dcb9b2637 -r 2b61c5e27399 src/Pure/System/isabelle_process.scala --- a/src/Pure/System/isabelle_process.scala Tue Aug 10 12:09:53 2010 +0200 +++ b/src/Pure/System/isabelle_process.scala Tue Aug 10 12:29:11 2010 +0200 @@ -38,6 +38,7 @@ kind == Markup.EXIT def is_system(kind: String) = kind == Markup.SYSTEM || + kind == Markup.INPUT || kind == Markup.STDIN || kind == Markup.SIGNAL || kind == Markup.EXIT || @@ -111,7 +112,7 @@ /* signals */ - def interrupt() = synchronized { + def interrupt() = synchronized { // FIXME avoid synchronized if (proc == null) error("Cannot interrupt Isabelle: no process") if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid") else { @@ -125,7 +126,7 @@ } } - def kill() = synchronized { + def kill() = synchronized { // FIXME avoid synchronized if (proc == 0) error("Cannot kill Isabelle: no process") else { try_close() @@ -138,85 +139,45 @@ } - /* output being piped into the process */ - - private val output = new LinkedBlockingQueue[String] - - private def output_raw(text: String) = synchronized { - if (proc == null) error("Cannot output to Isabelle: no process") - if (closing) error("Cannot output to Isabelle: already closing") - output.put(text) - } - def output_sync(text: String) = - output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n") - - - def command(text: String) = - output_sync("Isabelle.command " + Isabelle_Syntax.encode_string(text)) + /** stream actors **/ - def command(props: List[(String, String)], text: String) = - output_sync("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " + - Isabelle_Syntax.encode_string(text)) - - def ML_val(text: String) = - output_sync("ML_val " + Isabelle_Syntax.encode_string(text)) + /* input */ - def ML_command(text: String) = - output_sync("ML_command " + Isabelle_Syntax.encode_string(text)) - - def close() = synchronized { // FIXME watchdog/timeout - output_raw("\u0000") - closing = true - } + case class Input(cmd: String) + case object Close - def try_close() = synchronized { - if (proc != null && !closing) { - try { close() } - catch { case _: RuntimeException => } - } - } - - - /* commands */ - - private class Command_Thread(fifo: String) extends Thread("isabelle: commands") - { - override def run() - { - val stream = system.fifo_output_stream(fifo) + private def input_actor(name: String, kind: String, stream: => OutputStream): Actor = + Library.thread_actor(name) { val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset)) var finished = false while (!finished) { try { //{{{ - val s = output.take - if (s == "\u0000") { - writer.close - finished = true - } - else { - put_result(Markup.STDIN, s) - writer.write(s) - writer.flush + receive { + case Input(text) => + put_result(kind, text) + writer.write(text) + writer.flush + case Close => + writer.close + finished = true + case bad => System.err.println(name + ": ignoring bad message " + bad) } //}}} } catch { - case e: IOException => put_result(Markup.SYSTEM, "Command thread: " + e.getMessage) + case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage) } } - put_result(Markup.SYSTEM, "Command thread terminated") + put_result(Markup.SYSTEM, name + " terminated") } - } - /* raw stdout */ + /* raw output */ - private class Stdout_Thread(stream: InputStream) extends Thread("isabelle: stdout") - { - override def run() = - { + private def output_actor(name: String, kind: String, stream: => InputStream): Actor = + Library.thread_actor(name) { val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset)) var result = new StringBuilder(100) @@ -232,7 +193,7 @@ else done = true } if (result.length > 0) { - put_result(Markup.STDOUT, result.toString) + put_result(kind, result.toString) result.length = 0 } else { @@ -243,22 +204,19 @@ //}}} } catch { - case e: IOException => put_result(Markup.SYSTEM, "Stdout thread: " + e.getMessage) + case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage) } } - put_result(Markup.SYSTEM, "Stdout thread terminated") + put_result(Markup.SYSTEM, name + " terminated") } - } - /* messages */ + /* message output */ - private class Message_Thread(fifo: String) extends Thread("isabelle: messages") - { - private class Protocol_Error(msg: String) extends Exception(msg) - override def run() - { - val stream = system.fifo_input_stream(fifo) + private class Protocol_Error(msg: String) extends Exception(msg) + + private def message_actor(name: String, stream: InputStream): Actor = + Library.thread_actor(name) { val default_buffer = new Array[Byte](65536) var c = -1 @@ -325,54 +283,83 @@ stream.close try_close() - put_result(Markup.SYSTEM, "Message thread terminated") + put_result(Markup.SYSTEM, name + " terminated") } - } - /** main **/ + /** init **/ + + /* exec process */ - { - /* private communication channels */ + private val in_fifo = system.mk_fifo() + private val out_fifo = system.mk_fifo() + private def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) } - val in_fifo = system.mk_fifo() - val out_fifo = system.mk_fifo() - def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) } + try { + val cmdline = + List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args + proc = system.execute(true, cmdline: _*) + } + catch { + case e: IOException => + rm_fifos() + error("Failed to execute Isabelle process: " + e.getMessage) + } - val command_thread = new Command_Thread(in_fifo) - val message_thread = new Message_Thread(out_fifo) + + /* exit process */ - command_thread.start - message_thread.start + Library.thread_actor("process_exit") { + val rc = proc.waitFor() + Thread.sleep(300) // FIXME property!? + put_result(Markup.SYSTEM, "process_exit terminated") + put_result(Markup.EXIT, rc.toString) + rm_fifos() + } - /* exec process */ + /* I/O actors */ + + private val standard_input = + input_actor("standard_input", Markup.STDIN, proc.getOutputStream) - try { - val cmdline = - List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args - proc = system.execute(true, cmdline: _*) - } - catch { - case e: IOException => - rm_fifos() - error("Failed to execute Isabelle process: " + e.getMessage) - } - new Stdout_Thread(proc.getInputStream).start + private val command_input = + input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo)) + + output_actor("standard_output", Markup.STDOUT, proc.getInputStream) + message_actor("message_output", system.fifo_input_stream(out_fifo)) + - /* exit */ + /** main methods **/ + + def input_raw(text: String) = standard_input ! Input(text) + + def input(text: String) = synchronized { // FIXME avoid synchronized + if (proc == null) error("Cannot output to Isabelle: no process") + if (closing) error("Cannot output to Isabelle: already closing") + command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n") + } + + def command(text: String) = input("Isabelle.command " + Isabelle_Syntax.encode_string(text)) - new Thread("isabelle: exit") { - override def run() - { - val rc = proc.waitFor() - Thread.sleep(300) // FIXME property!? - put_result(Markup.SYSTEM, "Exit thread terminated") - put_result(Markup.EXIT, rc.toString) - rm_fifos() - } - }.start + def command(props: List[(String, String)], text: String) = + input("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " + + Isabelle_Syntax.encode_string(text)) + + def ML_val(text: String) = input("ML_val " + Isabelle_Syntax.encode_string(text)) + def ML_command(text: String) = input("ML_command " + Isabelle_Syntax.encode_string(text)) + + def close() = synchronized { // FIXME avoid synchronized + command_input ! Close + closing = true + } + + def try_close() = synchronized { + if (proc != null && !closing) { + try { close() } + catch { case _: RuntimeException => } + } } }