# HG changeset patch # User wenzelm # Date 1407861403 -7200 # Node ID 2c2c24dbf0a49c99d6679c1e8729469125842f88 # Parent 448325de6e4fd86078faa1cb323ef71048d2b11d generic process wrapping in Prover; clarified module arrangement; diff -r 448325de6e4f -r 2c2c24dbf0a4 src/Pure/PIDE/protocol.scala --- a/src/Pure/PIDE/protocol.scala Tue Aug 12 17:28:07 2014 +0200 +++ b/src/Pure/PIDE/protocol.scala Tue Aug 12 18:36:43 2014 +0200 @@ -347,8 +347,25 @@ } -trait Protocol extends Prover +trait Protocol { + /* text */ + + def encode(s: String): String + def decode(s: String): String + + object Encode + { + val string: XML.Encode.T[String] = (s => XML.Encode.string(encode(s))) + } + + + /* protocol commands */ + + def protocol_command_bytes(name: String, args: Bytes*): Unit + def protocol_command(name: String, args: String*): Unit + + /* options */ def options(opts: Options): Unit = diff -r 448325de6e4f -r 2c2c24dbf0a4 src/Pure/PIDE/prover.scala --- a/src/Pure/PIDE/prover.scala Tue Aug 12 17:28:07 2014 +0200 +++ b/src/Pure/PIDE/prover.scala Tue Aug 12 18:36:43 2014 +0200 @@ -1,13 +1,13 @@ /* Title: Pure/PIDE/prover.scala Author: Makarius -General prover operations. +General prover operations and process wrapping. */ package isabelle -import java.io.BufferedReader +import java.io.{InputStream, OutputStream, BufferedReader, BufferedOutputStream, IOException} object Prover @@ -83,44 +83,286 @@ } -trait Prover +abstract class Prover( + receiver: Prover.Message => Unit, + system_process: Prover.System_Process) extends Protocol { - /* text and tree data */ + /* output */ + + val xml_cache: XML.Cache = new XML.Cache() + + private def system_output(text: String) + { + receiver(new Prover.Output(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))) + } + + private def protocol_output(props: Properties.T, bytes: Bytes) + { + receiver(new Prover.Protocol_Output(props, bytes)) + } + + private def output(kind: String, props: Properties.T, body: XML.Body) + { + if (kind == Markup.INIT) system_process.channel.accepted() - def encode(s: String): String - def decode(s: String): String + val main = XML.Elem(Markup(kind, props), Protocol.clean_message(body)) + val reports = Protocol.message_reports(props, body) + for (msg <- main :: reports) receiver(new Prover.Output(xml_cache.elem(msg))) + } + + private def exit_message(rc: Int) + { + output(Markup.EXIT, Markup.Return_Code(rc), List(XML.Text("Return code: " + rc.toString))) + } + + - object Encode + /** process manager **/ + + private val (_, process_result) = + Simple_Thread.future("process_result") { system_process.join } + + private def terminate_process() { - val string: XML.Encode.T[String] = (s => XML.Encode.string(encode(s))) + try { system_process.terminate } + catch { + case exn @ ERROR(_) => system_output("Failed to terminate prover process: " + exn.getMessage) + } } - def xml_cache: XML.Cache + private val process_manager = Simple_Thread.fork("process_manager") + { + val (startup_failed, startup_errors) = + { + var finished: Option[Boolean] = None + val result = new StringBuilder(100) + while (finished.isEmpty && (system_process.stderr.ready || !process_result.is_finished)) { + while (finished.isEmpty && system_process.stderr.ready) { + try { + val c = system_process.stderr.read + if (c == 2) finished = Some(true) + else result += c.toChar + } + catch { case _: IOException => finished = Some(false) } + } + Thread.sleep(10) + } + (finished.isEmpty || !finished.get, result.toString.trim) + } + if (startup_errors != "") system_output(startup_errors) + + if (startup_failed) { + terminate_process() + process_result.join + exit_message(127) + } + else { + val (command_stream, message_stream) = system_process.channel.rendezvous() + + command_input_init(command_stream) + val stdout = physical_output(false) + val stderr = physical_output(true) + val message = message_output(message_stream) + + val rc = process_result.join + system_output("process terminated") + command_input_close() + for (thread <- List(stdout, stderr, message)) thread.join + system_output("process_manager terminated") + exit_message(rc) + } + system_process.channel.accepted() + } + + + /* management methods */ + + def join() { process_manager.join() } + + def terminate() + { + command_input_close() + system_output("Terminating prover process") + terminate_process() + } + + + + /** process streams **/ + + /* command input */ + + private var command_input: Option[Consumer_Thread[List[Bytes]]] = None + + private def command_input_close(): Unit = command_input.foreach(_.shutdown) + + private def command_input_init(raw_stream: OutputStream) + { + val name = "command_input" + val stream = new BufferedOutputStream(raw_stream) + command_input = + Some( + Consumer_Thread.fork(name)( + consume = + { + case chunks => + try { + Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream) + chunks.foreach(_.write(stream)) + stream.flush + true + } + catch { case e: IOException => system_output(name + ": " + e.getMessage); false } + }, + finish = { case () => stream.close; system_output(name + " terminated") } + ) + ) + } - /* process management */ + /* physical output */ + + private def physical_output(err: Boolean): Thread = + { + val (name, reader, markup) = + if (err) ("standard_error", system_process.stderr, Markup.STDERR) + else ("standard_output", system_process.stdout, Markup.STDOUT) - def join(): Unit - def terminate(): Unit - - def protocol_command_bytes(name: String, args: Bytes*): Unit - def protocol_command(name: String, args: String*): Unit + Simple_Thread.fork(name) { + try { + var result = new StringBuilder(100) + var finished = false + while (!finished) { + //{{{ + var c = -1 + var done = false + while (!done && (result.length == 0 || reader.ready)) { + c = reader.read + if (c >= 0) result.append(c.asInstanceOf[Char]) + else done = true + } + if (result.length > 0) { + output(markup, Nil, List(XML.Text(decode(result.toString)))) + result.length = 0 + } + else { + reader.close + finished = true + } + //}}} + } + } + catch { case e: IOException => system_output(name + ": " + e.getMessage) } + system_output(name + " terminated") + } + } - /* PIDE protocol commands */ + /* message output */ + + private def message_output(stream: InputStream): Thread = + { + class EOF extends Exception + class Protocol_Error(msg: String) extends Exception(msg) + + val name = "message_output" + Simple_Thread.fork(name) { + val default_buffer = new Array[Byte](65536) + var c = -1 - def options(opts: Options): Unit + def read_int(): Int = + //{{{ + { + var n = 0 + c = stream.read + if (c == -1) throw new EOF + while (48 <= c && c <= 57) { + n = 10 * n + (c - 48) + c = stream.read + } + if (c != 10) + throw new Protocol_Error("malformed header: expected integer followed by newline") + else n + } + //}}} - def define_blob(digest: SHA1.Digest, bytes: Bytes): Unit - def define_command(command: Command): Unit + def read_chunk_bytes(): (Array[Byte], Int) = + //{{{ + { + val n = read_int() + val buf = + if (n <= default_buffer.size) default_buffer + else new Array[Byte](n) + + var i = 0 + var m = 0 + do { + m = stream.read(buf, i, n - i) + if (m != -1) i += m + } + while (m != -1 && n > i) + + if (i != n) + throw new Protocol_Error("bad chunk (unexpected EOF after " + i + " of " + n + " bytes)") + + (buf, n) + } + //}}} - def discontinue_execution(): Unit - def cancel_exec(id: Document_ID.Exec): Unit + def read_chunk(): XML.Body = + { + val (buf, n) = read_chunk_bytes() + YXML.parse_body_failsafe(UTF8.decode_chars(decode, buf, 0, n)) + } - def update(old_id: Document_ID.Version, new_id: Document_ID.Version, - edits: List[Document.Edit_Command]): Unit - def remove_versions(versions: List[Document.Version]): Unit + try { + do { + try { + val header = read_chunk() + header match { + case List(XML.Elem(Markup(name, props), Nil)) => + val kind = name.intern + if (kind == Markup.PROTOCOL) { + val (buf, n) = read_chunk_bytes() + protocol_output(props, Bytes(buf, 0, n)) + } + else { + val body = read_chunk() + output(kind, props, body) + } + case _ => + read_chunk() + throw new Protocol_Error("bad header: " + header.toString) + } + } + catch { case _: EOF => } + } + while (c != -1) + } + catch { + case e: IOException => system_output("Cannot read message:\n" + e.getMessage) + case e: Protocol_Error => system_output("Malformed message:\n" + e.getMessage) + } + stream.close - def dialog_result(serial: Long, result: String): Unit + system_output(name + " terminated") + } + } + + + + /** protocol commands **/ + + def protocol_command_bytes(name: String, args: Bytes*): Unit = + command_input match { + case Some(thread) => thread.send(Bytes(name) :: args.toList) + case None => error("Uninitialized command input thread") + } + + def protocol_command(name: String, args: String*) + { + receiver(new Prover.Input(name, args.toList)) + protocol_command_bytes(name, args.map(Bytes(_)): _*) + } } diff -r 448325de6e4f -r 2c2c24dbf0a4 src/Pure/PIDE/resources.scala --- a/src/Pure/PIDE/resources.scala Tue Aug 12 17:28:07 2014 +0200 +++ b/src/Pure/PIDE/resources.scala Tue Aug 12 18:36:43 2014 +0200 @@ -126,6 +126,6 @@ /* prover process */ def start_prover(receiver: Prover.Message => Unit, name: String, args: List[String]): Prover = - new Isabelle_Process(receiver, args) with Protocol + new Isabelle_Process(receiver, args) } diff -r 448325de6e4f -r 2c2c24dbf0a4 src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Tue Aug 12 17:28:07 2014 +0200 +++ b/src/Pure/System/isabelle_process.ML Tue Aug 12 18:36:43 2014 +0200 @@ -1,8 +1,7 @@ (* Title: Pure/System/isabelle_process.ML Author: Makarius -Isabelle process wrapper, based on private fifos for maximum -robustness and performance, or local socket for maximum portability. +Isabelle process wrapper. *) signature ISABELLE_PROCESS = diff -r 448325de6e4f -r 2c2c24dbf0a4 src/Pure/System/isabelle_process.scala --- a/src/Pure/System/isabelle_process.scala Tue Aug 12 17:28:07 2014 +0200 +++ b/src/Pure/System/isabelle_process.scala Tue Aug 12 18:36:43 2014 +0200 @@ -1,320 +1,31 @@ /* Title: Pure/System/isabelle_process.scala Author: Makarius - Options: :folding=explicit:collapseFolds=1: -Isabelle process management -- always reactive due to multi-threaded I/O. +Isabelle process wrapper. */ package isabelle -import java.io.{InputStream, OutputStream, BufferedOutputStream, IOException} - - class Isabelle_Process( receiver: Prover.Message => Unit = Console.println(_), - prover_args: List[String] = Nil) + prover_args: List[String] = Nil) extends Prover(receiver, + { + val system_channel = System_Channel() + try { + val cmdline = + Isabelle_System.getenv_strict("ISABELLE_PROCESS") :: + (system_channel.prover_args ::: prover_args) + val process = + new Isabelle_System.Managed_Process(null, null, false, cmdline: _*) with + Prover.System_Process { def channel = system_channel } + process.stdin.close + process + } + catch { case exn @ ERROR(_) => system_channel.accepted(); throw(exn) } + }) { - /* system process -- default implementation */ - - protected val system_process: Prover.System_Process = - { - val system_channel = System_Channel() - try { - val cmdline = - Isabelle_System.getenv_strict("ISABELLE_PROCESS") :: - (system_channel.prover_args ::: prover_args) - val process = - new Isabelle_System.Managed_Process(null, null, false, cmdline: _*) with - Prover.System_Process { def channel = system_channel } - process.stdin.close - process - } - catch { case exn @ ERROR(_) => system_channel.accepted(); throw(exn) } - } - - - /* text and tree data */ - def encode(s: String): String = Symbol.encode(s) def decode(s: String): String = Symbol.decode(s) - - val xml_cache: XML.Cache = new XML.Cache() - - - /* output */ - - private def system_output(text: String) - { - receiver(new Prover.Output(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))) - } - - private def protocol_output(props: Properties.T, bytes: Bytes) - { - receiver(new Prover.Protocol_Output(props, bytes)) - } - - private def output(kind: String, props: Properties.T, body: XML.Body) - { - if (kind == Markup.INIT) system_process.channel.accepted() - - val main = XML.Elem(Markup(kind, props), Protocol.clean_message(body)) - val reports = Protocol.message_reports(props, body) - for (msg <- main :: reports) receiver(new Prover.Output(xml_cache.elem(msg))) - } - - private def exit_message(rc: Int) - { - output(Markup.EXIT, Markup.Return_Code(rc), List(XML.Text("Return code: " + rc.toString))) - } - - - - /** process manager **/ - - private val (_, process_result) = - Simple_Thread.future("process_result") { system_process.join } - - private def terminate_process() - { - try { system_process.terminate } - catch { - case exn @ ERROR(_) => system_output("Failed to terminate prover process: " + exn.getMessage) - } - } - - private val process_manager = Simple_Thread.fork("process_manager") - { - val (startup_failed, startup_errors) = - { - var finished: Option[Boolean] = None - val result = new StringBuilder(100) - while (finished.isEmpty && (system_process.stderr.ready || !process_result.is_finished)) { - while (finished.isEmpty && system_process.stderr.ready) { - try { - val c = system_process.stderr.read - if (c == 2) finished = Some(true) - else result += c.toChar - } - catch { case _: IOException => finished = Some(false) } - } - Thread.sleep(10) - } - (finished.isEmpty || !finished.get, result.toString.trim) - } - if (startup_errors != "") system_output(startup_errors) - - if (startup_failed) { - terminate_process() - process_result.join - exit_message(127) - } - else { - val (command_stream, message_stream) = system_process.channel.rendezvous() - - command_input_init(command_stream) - val stdout = physical_output(false) - val stderr = physical_output(true) - val message = message_output(message_stream) - - val rc = process_result.join - system_output("process terminated") - command_input_close() - for (thread <- List(stdout, stderr, message)) thread.join - system_output("process_manager terminated") - exit_message(rc) - } - system_process.channel.accepted() - } - - - /* management methods */ - - def join() { process_manager.join() } - - def terminate() - { - command_input_close() - system_output("Terminating prover process") - terminate_process() - } - - - - /** process streams **/ - - /* command input */ - - private var command_input: Option[Consumer_Thread[List[Bytes]]] = None - - private def command_input_close(): Unit = command_input.foreach(_.shutdown) - - private def command_input_init(raw_stream: OutputStream) - { - val name = "command_input" - val stream = new BufferedOutputStream(raw_stream) - command_input = - Some( - Consumer_Thread.fork(name)( - consume = - { - case chunks => - try { - Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream) - chunks.foreach(_.write(stream)) - stream.flush - true - } - catch { case e: IOException => system_output(name + ": " + e.getMessage); false } - }, - finish = { case () => stream.close; system_output(name + " terminated") } - ) - ) - } - - - /* physical output */ +} - private def physical_output(err: Boolean): Thread = - { - val (name, reader, markup) = - if (err) ("standard_error", system_process.stderr, Markup.STDERR) - else ("standard_output", system_process.stdout, Markup.STDOUT) - - Simple_Thread.fork(name) { - try { - var result = new StringBuilder(100) - var finished = false - while (!finished) { - //{{{ - var c = -1 - var done = false - while (!done && (result.length == 0 || reader.ready)) { - c = reader.read - if (c >= 0) result.append(c.asInstanceOf[Char]) - else done = true - } - if (result.length > 0) { - output(markup, Nil, List(XML.Text(decode(result.toString)))) - result.length = 0 - } - else { - reader.close - finished = true - } - //}}} - } - } - catch { case e: IOException => system_output(name + ": " + e.getMessage) } - system_output(name + " terminated") - } - } - - - /* message output */ - - private def message_output(stream: InputStream): Thread = - { - class EOF extends Exception - class Protocol_Error(msg: String) extends Exception(msg) - - val name = "message_output" - Simple_Thread.fork(name) { - val default_buffer = new Array[Byte](65536) - var c = -1 - - def read_int(): Int = - //{{{ - { - var n = 0 - c = stream.read - if (c == -1) throw new EOF - while (48 <= c && c <= 57) { - n = 10 * n + (c - 48) - c = stream.read - } - if (c != 10) - throw new Protocol_Error("malformed header: expected integer followed by newline") - else n - } - //}}} - - def read_chunk_bytes(): (Array[Byte], Int) = - //{{{ - { - val n = read_int() - val buf = - if (n <= default_buffer.size) default_buffer - else new Array[Byte](n) - - var i = 0 - var m = 0 - do { - m = stream.read(buf, i, n - i) - if (m != -1) i += m - } - while (m != -1 && n > i) - - if (i != n) - throw new Protocol_Error("bad chunk (unexpected EOF after " + i + " of " + n + " bytes)") - - (buf, n) - } - //}}} - - def read_chunk(): XML.Body = - { - val (buf, n) = read_chunk_bytes() - YXML.parse_body_failsafe(UTF8.decode_chars(decode, buf, 0, n)) - } - - try { - do { - try { - val header = read_chunk() - header match { - case List(XML.Elem(Markup(name, props), Nil)) => - val kind = name.intern - if (kind == Markup.PROTOCOL) { - val (buf, n) = read_chunk_bytes() - protocol_output(props, Bytes(buf, 0, n)) - } - else { - val body = read_chunk() - output(kind, props, body) - } - case _ => - read_chunk() - throw new Protocol_Error("bad header: " + header.toString) - } - } - catch { case _: EOF => } - } - while (c != -1) - } - catch { - case e: IOException => system_output("Cannot read message:\n" + e.getMessage) - case e: Protocol_Error => system_output("Malformed message:\n" + e.getMessage) - } - stream.close - - system_output(name + " terminated") - } - } - - - - /** protocol commands **/ - - def protocol_command_bytes(name: String, args: Bytes*): Unit = - command_input match { - case Some(thread) => thread.send(Bytes(name) :: args.toList) - case None => error("Uninitialized command input thread") - } - - def protocol_command(name: String, args: String*) - { - receiver(new Prover.Input(name, args.toList)) - protocol_command_bytes(name, args.map(Bytes(_)): _*) - } -}