# HG changeset patch # User wenzelm # Date 1281480288 -7200 # Node ID 71bb3c273dd1f51a3faf891940290ce467dd3636 # Parent cd6906d9199f28d7486bcb52c07e570a5d63ae32 native Isabelle_Process commands, based on efficient byte channel protocol for string lists; misc clarification of proc/pid state, eliminated closing flag; misc tuning and simplification; diff -r cd6906d9199f -r 71bb3c273dd1 src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Wed Aug 11 00:42:40 2010 +0200 +++ b/src/Pure/System/isabelle_process.ML Wed Aug 11 00:44:48 2010 +0200 @@ -11,6 +11,9 @@ signature ISABELLE_PROCESS = sig val isabelle_processN: string + val add_command: string -> (string list -> unit) -> unit + val command: string -> string list -> unit + val crashes: exn list Unsynchronized.ref val init: string -> string -> unit end; @@ -25,6 +28,28 @@ val _ = Markup.add_mode isabelle_processN YXML.output_markup; +(* commands *) + +local + +val global_commands = Unsynchronized.ref (Symtab.empty: (string list -> unit) Symtab.table); + +in + +fun add_command name cmd = CRITICAL (fn () => + Unsynchronized.change global_commands (fn cmds => + (if not (Symtab.defined cmds name) then () + else warning ("Redefining Isabelle process command " ^ quote name); + Symtab.update (name, cmd) cmds))); + +fun command name args = + (case Symtab.lookup (! global_commands) name of + NONE => error ("Undefined Isabelle process command " ^ quote name) + | SOME cmd => cmd args); + +end; + + (* message markup *) local @@ -94,6 +119,53 @@ end; +(* protocol loop *) + +val crashes = Unsynchronized.ref ([]: exn list); + +local + +fun recover crash = + (CRITICAL (fn () => Unsynchronized.change crashes (cons crash)); + warning "Recovering from Isabelle process crash -- see also Isabelle_Process.crashes"); + +fun read_chunk stream len = + let + val n = + (case Int.fromString len of + SOME n => n + | NONE => error ("Isabelle process: malformed chunk header " ^ quote len)); + val chunk = TextIO.inputN (stream, n); + val m = size chunk; + in + if m = n then chunk + else error ("Isabelle process: bad chunk (" ^ string_of_int m ^ " vs. " ^ string_of_int n ^ ")") + end; + +fun read_command stream = + (case TextIO.inputLine stream of + NONE => raise Runtime.TERMINATE + | SOME line => map (read_chunk stream) (space_explode "," line)); + +fun run_command name args = + Runtime.debugging (command name) args + handle exn => + error ("Isabelle process command failure: " ^ name ^ "\n" ^ ML_Compiler.exn_message exn); + +in + +fun loop stream = + let val continue = + (case read_command stream of + [] => (Output.error_msg "Isabelle process: no input"; true) + | name :: args => (run_command name args; true)) + handle Runtime.TERMINATE => false + | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true); + in if continue then loop stream else () end; + +end; + + (* init *) fun init in_fifo out_fifo = @@ -105,10 +177,8 @@ val _ = quick_and_dirty := true; (* FIXME !? *) val _ = Keyword.status (); val _ = Output.status (Markup.markup Markup.ready ""); - val _ = - Simple_Thread.fork false (fn () => - (Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true}; - quit ())); + val _ = Context.set_thread_data NONE; + val _ = Simple_Thread.fork false (fn () => (loop in_stream; quit ())); in () end; end; diff -r cd6906d9199f -r 71bb3c273dd1 src/Pure/System/isabelle_process.scala --- a/src/Pure/System/isabelle_process.scala Wed Aug 11 00:42:40 2010 +0200 +++ b/src/Pure/System/isabelle_process.scala Wed Aug 11 00:44:48 2010 +0200 @@ -9,7 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter, - InputStream, OutputStream, IOException} + InputStream, OutputStream, BufferedOutputStream, IOException} import scala.actors.Actor import Actor._ @@ -89,9 +89,8 @@ /* process information */ - @volatile private var proc: Process = null - @volatile private var closing = false - @volatile private var pid: String = null + @volatile private var proc: Option[Process] = None + @volatile private var pid: Option[String] = None /* results */ @@ -99,7 +98,7 @@ private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree]) { if (kind == Markup.INIT) { - for ((Markup.PID, p) <- props) pid = p + for ((Markup.PID, p) <- props) pid = Some(p) } receiver ! new Result(XML.Elem(Markup(kind, props), body)) } @@ -112,29 +111,34 @@ /* signals */ - def interrupt() = synchronized { // FIXME avoid synchronized - if (proc == null) error("Cannot interrupt Isabelle: no process") - if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid") - else { - try { - if (system.execute(true, "kill", "-INT", pid).waitFor == 0) - put_result(Markup.SIGNAL, "INT") - else - put_result(Markup.SYSTEM, "Cannot interrupt: kill command failed") + def interrupt() + { + if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process") + else + pid match { + case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknowd pid") + case Some(i) => + try { + if (system.execute(true, "kill", "-INT", i).waitFor == 0) + put_result(Markup.SIGNAL, "INT") + else + put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed") + } + catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) } } - catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) } - } } - def kill() = synchronized { // FIXME avoid synchronized - if (proc == 0) error("Cannot kill Isabelle: no process") - else { - try_close() - Thread.sleep(500) // FIXME property!? - put_result(Markup.SIGNAL, "KILL") - proc.destroy - proc = null - pid = null + def kill() + { + proc match { + case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process") + case Some(p) => + close() + Thread.sleep(500) // FIXME !? + put_result(Markup.SIGNAL, "KILL") + p.destroy + proc = None + pid = None } } @@ -142,12 +146,14 @@ /** stream actors **/ - /* input */ - - case class Input(cmd: String) + case class Input_Text(text: String) + case class Input_Chunks(chunks: List[Array[Byte]]) case object Close - private def input_actor(name: String, kind: String, stream: => OutputStream): Actor = + + /* raw stdin */ + + private def stdin_actor(name: String, stream: OutputStream): Actor = Library.thread_actor(name) { val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset)) var finished = false @@ -155,8 +161,8 @@ try { //{{{ receive { - case Input(text) => - put_result(kind, text) + case Input_Text(text) => + // FIXME echo input?! writer.write(text) writer.flush case Close => @@ -174,9 +180,9 @@ } - /* raw output */ + /* raw stdout */ - private def output_actor(name: String, kind: String, stream: => InputStream): Actor = + private def stdout_actor(name: String, stream: InputStream): Actor = Library.thread_actor(name) { val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset)) var result = new StringBuilder(100) @@ -193,13 +199,43 @@ else done = true } if (result.length > 0) { - put_result(kind, result.toString) + put_result(Markup.STDOUT, result.toString) result.length = 0 } else { reader.close finished = true - try_close() + close() + } + //}}} + } + catch { + case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage) + } + } + put_result(Markup.SYSTEM, name + " terminated") + } + + + /* command input */ + + private def input_actor(name: String, raw_stream: OutputStream): Actor = + Library.thread_actor(name) { + val stream = new BufferedOutputStream(raw_stream) + var finished = false + while (!finished) { + try { + //{{{ + receive { + case Input_Chunks(chunks) => + stream.write(Standard_System.string_bytes( + chunks.map(_.length).mkString("", ",", "\n"))); + chunks.foreach(stream.write(_)); + stream.flush + case Close => + stream.close + finished = true + case bad => System.err.println(name + ": ignoring bad message " + bad) } //}}} } @@ -281,7 +317,7 @@ } } while (c != -1) stream.close - try_close() + close() put_result(Markup.SYSTEM, name + " terminated") } @@ -299,7 +335,7 @@ try { val cmdline = List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args - proc = system.execute(true, cmdline: _*) + proc = Some(system.execute(true, cmdline: _*)) } catch { case e: IOException => @@ -308,49 +344,37 @@ } + /* I/O actors */ + + private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream) + stdout_actor("standard_output", proc.get.getInputStream) + + private val command_input = input_actor("command_input", system.fifo_output_stream(in_fifo)) + message_actor("message_output", system.fifo_input_stream(out_fifo)) + + /* exit process */ Library.thread_actor("process_exit") { - val rc = proc.waitFor() - Thread.sleep(300) // FIXME property!? - put_result(Markup.SYSTEM, "process_exit terminated") - put_result(Markup.EXIT, rc.toString) + proc match { + case None => + case Some(p) => + val rc = p.waitFor() + Thread.sleep(300) // FIXME property!? + put_result(Markup.SYSTEM, "process_exit terminated") + put_result(Markup.EXIT, rc.toString) + } rm_fifos() } - /* I/O actors */ - - private val standard_input = - input_actor("standard_input", Markup.STDIN, proc.getOutputStream) - - private val command_input = - input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo)) - - output_actor("standard_output", Markup.STDOUT, proc.getInputStream) - message_actor("message_output", system.fifo_input_stream(out_fifo)) - - /** main methods **/ - def input_raw(text: String) = standard_input ! Input(text) - - def input(text: String) = synchronized { // FIXME avoid synchronized - if (proc == null) error("Cannot output to Isabelle: no process") - if (closing) error("Cannot output to Isabelle: already closing") - command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n") - } + def input_raw(text: String): Unit = standard_input ! Input_Text(text) - def close() = synchronized { // FIXME avoid synchronized - command_input ! Close - closing = true - } + def input(name: String, args: String*): Unit = + command_input ! Input_Chunks((name :: args.toList).map(Standard_System.string_bytes)) - def try_close() = synchronized { - if (proc != null && !closing) { - try { close() } - catch { case _: RuntimeException => } - } - } + def close(): Unit = command_input ! Close }