# HG changeset patch # User wenzelm # Date 1284928822 -7200 # Node ID c01d89d18ff0082997bf106e6adec0bc7f0626f7 # Parent f03a9c57760aadaea800bcaf6b44c1283d1b4292 refined Isabelle_Process startup: emit \002 before rendezvous on fifos, more robust treatment of startup failure with timeout, do not quit() after main loop; tuned; diff -r f03a9c57760a -r c01d89d18ff0 src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Sun Sep 19 22:20:48 2010 +0200 +++ b/src/Pure/System/isabelle_process.ML Sun Sep 19 22:40:22 2010 +0200 @@ -3,6 +3,16 @@ Isabelle process wrapper, based on private fifos for maximum robustness and performance. + +Startup phases: + . raw Posix process startup with uncontrolled output on stdout/stderr + . stdout \002: ML running + .. stdin/stdout/stderr freely available (raw ML loop) + .. protocol thread initialization + ... switch to in_fifo/out_fifo channels (rendezvous via open) + ... out_fifo INIT(pid): channels ready + ... out_fifo STATUS(keywords) + ... out_fifo READY: main loop ready *) signature ISABELLE_PROCESS = @@ -166,17 +176,21 @@ (* init *) -fun init in_fifo out_fifo = +fun init in_fifo out_fifo = ignore (Simple_Thread.fork false (fn () => let + val _ = OS.Process.sleep (Time.fromMilliseconds 500); (*yield to raw ML toplevel*) + val _ = Output.std_output Symbol.STX; + + val _ = quick_and_dirty := true; (* FIXME !? *) + val _ = Context.set_thread_data NONE; val _ = Unsynchronized.change print_mode (fold (update op =) [isabelle_processN, Keyword.keyword_statusN, Pretty.symbolicN]); + val (in_stream, out_stream) = setup_channels in_fifo out_fifo; val _ = init_message out_stream; - val _ = quick_and_dirty := true; (* FIXME !? *) val _ = Keyword.status (); val _ = Output.status (Markup.markup Markup.ready ""); - val _ = Context.set_thread_data NONE; - val _ = Simple_Thread.fork false (fn () => (loop in_stream; quit ())); - in () end; + in loop in_stream end)); end; + diff -r f03a9c57760a -r c01d89d18ff0 src/Pure/System/isabelle_process.scala --- a/src/Pure/System/isabelle_process.scala Sun Sep 19 22:20:48 2010 +0200 +++ b/src/Pure/System/isabelle_process.scala Sun Sep 19 22:40:22 2010 +0200 @@ -60,7 +60,7 @@ } -class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*) +class Isabelle_Process(system: Isabelle_System, timeout: Int, receiver: Actor, args: String*) { import Isabelle_Process._ @@ -68,14 +68,81 @@ /* demo constructor */ def this(args: String*) = - this(new Isabelle_System, + this(new Isabelle_System, 10000, actor { loop { react { case res => Console.println(res) } } }, args: _*) - /* process information */ + /* input actors */ + + private case class Input_Text(text: String) + private case class Input_Chunks(chunks: List[Array[Byte]]) + + private case object Close + private def close(a: Actor) { if (a != null) a ! Close } + + @volatile private var standard_input: Actor = null + @volatile private var command_input: Actor = null + + + /* process manager */ + + private val in_fifo = system.mk_fifo() + private val out_fifo = system.mk_fifo() + private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) } + + private val proc = + try { + val cmdline = + List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args + system.execute(true, cmdline: _*) + } + catch { case e: IOException => rm_fifos(); throw(e) } + + private val stdout_reader = + new BufferedReader(new InputStreamReader(proc.getInputStream, Standard_System.charset)) + + private val stdin_writer = + new BufferedWriter(new OutputStreamWriter(proc.getOutputStream, Standard_System.charset)) - @volatile private var proc: Option[Process] = None - @volatile private var pid: Option[String] = None + Simple_Thread.actor("process_manager") { + val (startup_failed, startup_output) = + { + val expired = System.currentTimeMillis() + timeout + val result = new StringBuilder(100) + + var finished = false + while (!finished && System.currentTimeMillis() <= expired) { + while (!finished && stdout_reader.ready) { + val c = stdout_reader.read + if (c == 2) finished = true + else result += c.toChar + } + Thread.sleep(10) + } + (!finished, result.toString) + } + if (startup_failed) { + put_result(Markup.STDOUT, startup_output) + put_result(Markup.EXIT, "127") + stdin_writer.close + Thread.sleep(300) // FIXME !? + proc.destroy // FIXME reliable!? + } + else { + put_result(Markup.SYSTEM, startup_output) + + standard_input = stdin_actor() + stdout_actor() + command_input = input_actor() + message_actor() + + val rc = proc.waitFor() + Thread.sleep(300) // FIXME !? + system_result("Isabelle process terminated") + put_result(Markup.EXIT, rc.toString) + } + rm_fifos() + } /* results */ @@ -110,34 +177,33 @@ /* signals */ + @volatile private var pid: Option[String] = None + def interrupt() { - if (proc.isEmpty) system_result("Cannot interrupt Isabelle: no process") - else - pid match { - case None => system_result("Cannot interrupt Isabelle: unknowd pid") - case Some(i) => - try { - if (system.execute(true, "kill", "-INT", i).waitFor == 0) - system_result("Interrupt Isabelle") - else - system_result("Cannot interrupt Isabelle: kill command failed") - } - catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) } - } + pid match { + case None => system_result("Cannot interrupt Isabelle: unknowd pid") + case Some(i) => + try { + if (system.execute(true, "kill", "-INT", i).waitFor == 0) + system_result("Interrupt Isabelle") + else + system_result("Cannot interrupt Isabelle: kill command failed") + } + catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) } + } } def kill() { - proc match { - case None => system_result("Cannot kill Isabelle: no process") - case Some(p) => - close() - Thread.sleep(500) // FIXME !? - system_result("Kill Isabelle") - p.destroy - proc = None - pid = None + val running = + try { proc.exitValue; false } + catch { case e: java.lang.IllegalThreadStateException => true } + if (running) { + close() + Thread.sleep(500) // FIXME !? + system_result("Kill Isabelle") + proc.destroy } } @@ -145,26 +211,22 @@ /** stream actors **/ - private case class Input_Text(text: String) - private case class Input_Chunks(chunks: List[Array[Byte]]) - private case object Close - - /* raw stdin */ - private def stdin_actor(name: String, stream: OutputStream): Actor = + private def stdin_actor(): Actor = + { + val name = "standard_input" Simple_Thread.actor(name) { - val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset)) var finished = false while (!finished) { try { //{{{ receive { case Input_Text(text) => - writer.write(text) - writer.flush + stdin_writer.write(text) + stdin_writer.flush case Close => - writer.close + stdin_writer.close finished = true case bad => System.err.println(name + ": ignoring bad message " + bad) } @@ -174,13 +236,15 @@ } system_result(name + " terminated") } + } /* raw stdout */ - private def stdout_actor(name: String, stream: InputStream): Actor = + private def stdout_actor(): Actor = + { + val name = "standard_output" Simple_Thread.actor(name) { - val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset)) var result = new StringBuilder(100) var finished = false @@ -189,8 +253,8 @@ //{{{ var c = -1 var done = false - while (!done && (result.length == 0 || reader.ready)) { - c = reader.read + while (!done && (result.length == 0 || stdout_reader.ready)) { + c = stdout_reader.read if (c >= 0) result.append(c.asInstanceOf[Char]) else done = true } @@ -199,9 +263,8 @@ result.length = 0 } else { - reader.close + stdout_reader.close finished = true - close() } //}}} } @@ -209,13 +272,16 @@ } system_result(name + " terminated") } + } /* command input */ - private def input_actor(name: String, fifo: String): Actor = + private def input_actor(): Actor = + { + val name = "command_input" Simple_Thread.actor(name) { - val stream = new BufferedOutputStream(system.fifo_output_stream(fifo)) // FIXME potentially blocking forever + val stream = new BufferedOutputStream(system.fifo_output_stream(in_fifo)) var finished = false while (!finished) { try { @@ -237,17 +303,19 @@ } system_result(name + " terminated") } + } /* message output */ - private def message_actor(name: String, fifo: String): Actor = + private def message_actor(): Actor = { class EOF extends Exception class Protocol_Error(msg: String) extends Exception(msg) + val name = "message_output" Simple_Thread.actor(name) { - val stream = system.fifo_input_stream(fifo) // FIXME potentially blocking forever + val stream = system.fifo_input_stream(out_fifo) val default_buffer = new Array[Byte](65536) var c = -1 @@ -300,55 +368,12 @@ } } while (c != -1) stream.close - close() system_result(name + " terminated") } } - - /** init **/ - - /* exec process */ - - private val in_fifo = system.mk_fifo() - private val out_fifo = system.mk_fifo() - private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) } - - try { - val cmdline = - List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args - proc = Some(system.execute(true, cmdline: _*)) - } - catch { case e: IOException => rm_fifos(); throw(e) } - - - /* I/O actors */ - - private val command_input = input_actor("command_input", in_fifo) - message_actor("message_output", out_fifo) - - private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream) - stdout_actor("standard_output", proc.get.getInputStream) - - - /* exit process */ - - Simple_Thread.actor("process_exit") { - proc match { - case None => - case Some(p) => - val rc = p.waitFor() - Thread.sleep(300) // FIXME property!? - system_result("Isabelle process terminated") - put_result(Markup.EXIT, rc.toString) - } - rm_fifos() - } - - - /** main methods **/ def input_raw(text: String): Unit = standard_input ! Input_Text(text) @@ -359,5 +384,5 @@ def input(name: String, args: String*): Unit = input_bytes(name, args.map(Standard_System.string_bytes): _*) - def close(): Unit = { standard_input ! Close; command_input ! Close } + def close(): Unit = { close(command_input); close(standard_input) } } diff -r f03a9c57760a -r c01d89d18ff0 src/Pure/System/session.scala --- a/src/Pure/System/session.scala Sun Sep 19 22:20:48 2010 +0200 +++ b/src/Pure/System/session.scala Sun Sep 19 22:40:22 2010 +0200 @@ -270,7 +270,7 @@ case Started(timeout, args) => if (prover == null) { - prover = new Isabelle_Process(system, self, args:_*) with Isar_Document + prover = new Isabelle_Process(system, timeout, self, args:_*) with Isar_Document val origin = sender val opt_err = prover_startup(timeout) if (opt_err.isDefined) prover = null