# HG changeset patch # User wenzelm # Date 1219504971 -7200 # Node ID 6eb0327c0b9bdafc863155d92f288fd6b3c9e592 # Parent 2638b611d3ce871b2cb3657c35fa33d153b2558e Isabelle process management -- always reactive due to multi-threaded I/O. diff -r 2638b611d3ce -r 6eb0327c0b9b src/Pure/Tools/isabelle_process.scala --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/Pure/Tools/isabelle_process.scala Sat Aug 23 17:22:51 2008 +0200 @@ -0,0 +1,337 @@ +/* Title: Pure/Tools/isabelle_process.ML :folding=explicit:collapseFolds=1: + ID: $Id$ + Author: Makarius + +Isabelle process management -- always reactive due to multi-threaded I/O. +*/ + +package isabelle + +import java.util.Properties +import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter, IOException} + +import scala.collection.mutable.{SynchronizedQueue, ArrayBuffer} +import isabelle.{Symbol, XML, YXML} + + +class IsabelleProcess(args: String*) { + + class IsabelleProcessException(msg: String) extends Exception + + + /* process information */ + + private var proc: Process = null + private var pid: String = null + private var closing = false + private var session: String = null + + + /* encoding */ + + private val charset = "UTF-8" + private val symbols = new Symbol.Interpretation + + + /* results */ + + object Kind extends Enumeration { + //{{{ values + // Posix channels/events + val STDIN = Value("STDIN") + val STDOUT = Value("STDOUT") + val STDERR = Value("STDERR") + val SIGNAL = Value("SIGNAL") + val EXIT = Value("EXIT") + // Isabelle messages + val WRITELN = Value("WRITELN") + val PRIORITY = Value("PRIORITY") + val TRACING = Value("TRACING") + val WARNING = Value("WARNING") + val ERROR = Value("ERROR") + val DEBUG = Value("DEBUG") + val PROMPT = Value("PROMPT") + val INIT = Value("INIT") + val STATUS = Value("STATUS") + // internal system notification + val SYSTEM = Value("SYSTEM") + //}}} + def is_raw(kind: Value) = + kind == STDOUT || + kind == STDERR + def is_system(kind: Value) = + kind == STDIN || + kind == SIGNAL || + kind == EXIT || + kind == PROMPT || + kind == STATUS || + kind == SYSTEM + } + + class Result(kind: Kind.Value, props: Properties, result: String) { + //{{{ + override def toString = { + if (props == null) kind.toString + " [[" + result + "]]" + else kind.toString + " " + props.toString + " [[" + result + "]]" + } + + private var the_tree: XML.Tree = null + def tree() = synchronized { + if (the_tree == null) the_tree = YXML.parse(symbols.decode(result)); + the_tree + } + //}}} + } + + val results = new SynchronizedQueue[Result] + + private def put_result(kind: Kind.Value, props: Properties, result: String) { + results += new Result(kind, props, result) + } + + + /* output being piped into the process */ + + val output = new SynchronizedQueue[String] + + def output_raw(text: String) = synchronized { + if (proc == null) throw new IsabelleProcessException("Cannot output: no process") + if (closing) throw new IsabelleProcessException("Cannot output: already closing") + output.enqueue(text) + } + + def output_sync(text: String) = output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n") + + def output_command(text: String) = + output_sync("Isabelle.command " + IsabelleSyntax.encode_string(text)) + + def output_command(props: Properties, text: String) = + output_sync("Isabelle.command " + IsabelleSyntax.encode_properties(props) + " " + + IsabelleSyntax.encode_string(text)) + + def output_ML(text: String) = + output_sync("ML_val " + IsabelleSyntax.encode_string(text)) + + def close() = synchronized { // FIXME watchdog/timeout + output_raw("\u0000") + closing = true + } + + def try_close() = synchronized { + if (proc != null && !closing) { + try { close() } + catch { case _: IsabelleProcessException => () } + } + } + + + /* stdin */ + + private class StdinThread(writer: BufferedWriter) extends Thread { + override def run() = { + var finished = false + while (!finished) { + try { + //{{{ + val s = output.dequeue + if (s == "\u0000") { + writer.close + finished = true + } + else { + put_result(Kind.STDIN, null, s) + writer.write(s) + writer.flush + } + //}}} + } + catch { + case e: IOException => put_result(Kind.SYSTEM, null, "Stdin thread: " + e.getMessage) + } + } + put_result(Kind.SYSTEM, null, "Stdin thread terminated") + } + } + + + /* stdout */ + + private class StdoutThread(reader: BufferedReader) extends Thread { + override def run() = { + var kind = Kind.STDOUT + var props: Properties = null + var result = new StringBuilder + + var finished = false + while (!finished) { + try { + if (kind == Kind.STDOUT) { + //{{{ Char mode + var c = -1 + var done = false + while (!done && (result.length == 0 || reader.ready)) { + c = reader.read + if (c > 0 && c != 2) result.append(c.asInstanceOf[Char]) + else done = true + } + if (result.length > 0) { + put_result(kind, null, result.toString) + result.length = 0 + } + if (c == -1) { + reader.close + finished = true + try_close() + } + else if (c == 2) { + reader.read match { + case 'A' => kind = Kind.WRITELN + case 'B' => kind = Kind.PRIORITY + case 'C' => kind = Kind.TRACING + case 'D' => kind = Kind.WARNING + case 'E' => kind = Kind.ERROR + case 'F' => kind = Kind.DEBUG + case 'G' => kind = Kind.PROMPT + case 'H' => kind = Kind.INIT + case 'I' => kind = Kind.STATUS + case _ => kind = Kind.STDOUT + } + props = null + } + //}}} + } + else { + //{{{ Line mode + val line = reader.readLine + if (line == null) { + reader.close + finished = true + try_close() + } + else { + val len = line.length + // property + if (line.endsWith("\u0002,")) { + val i = line.indexOf('=') + if (i > 0) { + val name = line.substring(0, i) + val value = line.substring(i + 1, len - 2) + if (props == null) props = new Properties + if (!props.containsKey(name)) { + props.setProperty(name, value) + } + } + } + // last text line + else if (line.endsWith("\u0002.")) { + if (kind == Kind.INIT && props != null) { + pid = props.getProperty("pid") + session = props.getProperty("session") + } + result.append(line.substring(0, len - 2)) + put_result(kind, props, result.toString) + result.length = 0 + kind = Kind.STDOUT + } + // text line + else { + result.append(line) + result.append('\n') + } + } + //}}} + } + } catch { + case e: IOException => put_result(Kind.SYSTEM, null, "Stdout thread: " + e.getMessage) + } + } + put_result(Kind.SYSTEM, null, "Stdout thread terminated") + } + } + + + /* stderr */ + + private class StderrThread(reader: BufferedReader) extends Thread { + override def run() = { + var result = new StringBuilder(100) + + var finished = false + while (!finished) { + try { + //{{{ + var c = -1 + var done = false + while (!done && (result.length == 0 || reader.ready)) { + c = reader.read + if (c > 0) result.append(c.asInstanceOf[Char]) + else done = true + } + if (result.length > 0) { + put_result(Kind.STDERR, null, result.toString) + result.length = 0 + } + else { + reader.close + finished = true + try_close() + } + //}}} + } catch { + case e: IOException => put_result(Kind.SYSTEM, null, "Stderr thread: " + e.getMessage) + } + } + put_result(Kind.SYSTEM, null, "Stderr thread terminated") + } + } + + + /* exit */ + + private class ExitThread extends Thread { + override def run() = { + val rc = proc.waitFor + Thread.sleep(300) + put_result(Kind.SYSTEM, null, "Exit thread terminated") + put_result(Kind.EXIT, null, Integer.toString(rc)) + proc = null // FIXME race!? + } + } + + + + /** main **/ + + { + /* command line */ + + val cmdline = { + val cmdline = new ArrayBuffer[String] + + IsabelleSystem.shell_prefix match { + case None => () + case Some(prefix) => cmdline + prefix + } + cmdline + IsabelleSystem.getenv_strict("ISABELLE_HOME") + "/bin/isabelle-process" + cmdline + "-W" + cmdline + "-m"; cmdline + "full_markup" // FIXME + for (arg <- args) cmdline + arg + cmdline.toArray + } + + try { proc = Runtime.getRuntime.exec(cmdline) } + catch { + case e: IOException => throw new IsabelleProcessException(e.getMessage) + } + + + /* process control via threads */ + + new StdinThread(new BufferedWriter(new OutputStreamWriter(proc.getOutputStream, charset))).start + new StdoutThread(new BufferedReader(new InputStreamReader(proc.getInputStream, charset))).start + new StderrThread(new BufferedReader(new InputStreamReader(proc.getErrorStream, charset))).start + new ExitThread().start + } + +} +