src/Pure/System/isabelle_process.scala
changeset 30173 eabece26b89b
parent 29648 ead544f3d6a1
child 31498 be0f7f4f9e12
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/System/isabelle_process.scala	Sat Feb 28 18:00:20 2009 +0100
@@ -0,0 +1,435 @@
+/*  Title:      Pure/System/isabelle_process.ML
+    Author:     Makarius
+    Options:    :folding=explicit:collapseFolds=1:
+
+Isabelle process management -- always reactive due to multi-threaded I/O.
+*/
+
+package isabelle
+
+import java.util.concurrent.LinkedBlockingQueue
+import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
+  InputStream, OutputStream, IOException}
+
+
+object IsabelleProcess {
+
+  /* results */
+
+  object Kind extends Enumeration {
+    //{{{ values and codes
+    // internal system notification
+    val SYSTEM = Value("SYSTEM")
+    // Posix channels/events
+    val STDIN = Value("STDIN")
+    val STDOUT = Value("STDOUT")
+    val SIGNAL = Value("SIGNAL")
+    val EXIT = Value("EXIT")
+    // Isabelle messages
+    val INIT = Value("INIT")
+    val STATUS = Value("STATUS")
+    val WRITELN = Value("WRITELN")
+    val PRIORITY = Value("PRIORITY")
+    val TRACING = Value("TRACING")
+    val WARNING = Value("WARNING")
+    val ERROR = Value("ERROR")
+    val DEBUG = Value("DEBUG")
+    // messages codes
+    val code = Map(
+      ('A' : Int) -> Kind.INIT,
+      ('B' : Int) -> Kind.STATUS,
+      ('C' : Int) -> Kind.WRITELN,
+      ('D' : Int) -> Kind.PRIORITY,
+      ('E' : Int) -> Kind.TRACING,
+      ('F' : Int) -> Kind.WARNING,
+      ('G' : Int) -> Kind.ERROR,
+      ('H' : Int) -> Kind.DEBUG,
+      ('0' : Int) -> Kind.SYSTEM,
+      ('1' : Int) -> Kind.STDIN,
+      ('2' : Int) -> Kind.STDOUT,
+      ('3' : Int) -> Kind.SIGNAL,
+      ('4' : Int) -> Kind.EXIT)
+    // message markup
+    val markup = Map(
+      Kind.INIT -> Markup.INIT,
+      Kind.STATUS -> Markup.STATUS,
+      Kind.WRITELN -> Markup.WRITELN,
+      Kind.PRIORITY -> Markup.PRIORITY,
+      Kind.TRACING -> Markup.TRACING,
+      Kind.WARNING -> Markup.WARNING,
+      Kind.ERROR -> Markup.ERROR,
+      Kind.DEBUG -> Markup.DEBUG,
+      Kind.SYSTEM -> Markup.SYSTEM,
+      Kind.STDIN -> Markup.STDIN,
+      Kind.STDOUT -> Markup.STDOUT,
+      Kind.SIGNAL -> Markup.SIGNAL,
+      Kind.EXIT -> Markup.EXIT)
+    //}}}
+    def is_raw(kind: Value) =
+      kind == STDOUT
+    def is_control(kind: Value) =
+      kind == SYSTEM ||
+      kind == SIGNAL ||
+      kind == EXIT
+    def is_system(kind: Value) =
+      kind == SYSTEM ||
+      kind == STDIN ||
+      kind == SIGNAL ||
+      kind == EXIT ||
+      kind == STATUS
+  }
+
+  class Result(val kind: Kind.Value, val props: List[(String, String)], val result: String) {
+    override def toString = {
+      val trees = YXML.parse_body_failsafe(result)
+      val res =
+        if (kind == Kind.STATUS) trees.map(_.toString).mkString
+        else trees.flatMap(XML.content(_).mkString).mkString
+      if (props.isEmpty)
+        kind.toString + " [[" + res + "]]"
+      else
+        kind.toString + " " +
+          (for ((x, y) <- props) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
+    }
+    def is_raw = Kind.is_raw(kind)
+    def is_control = Kind.is_control(kind)
+    def is_system = Kind.is_system(kind)
+  }
+
+  def parse_message(isabelle_system: IsabelleSystem, result: Result) =
+  {
+    XML.Elem(Markup.MESSAGE, (Markup.CLASS, Kind.markup(result.kind)) :: result.props,
+      YXML.parse_body_failsafe(isabelle_system.symbols.decode(result.result)))
+  }
+}
+
+
+class IsabelleProcess(isabelle_system: IsabelleSystem,
+  results: EventBus[IsabelleProcess.Result], args: String*)
+{
+  import IsabelleProcess._
+
+
+  /* demo constructor */
+
+  def this(args: String*) =
+    this(new IsabelleSystem, new EventBus[IsabelleProcess.Result] + Console.println, args: _*)
+
+
+  /* process information */
+
+  @volatile private var proc: Process = null
+  @volatile private var closing = false
+  @volatile private var pid: String = null
+  @volatile private var the_session: String = null
+  def session = the_session
+
+
+  /* results */
+
+  def parse_message(result: Result): XML.Tree =
+    IsabelleProcess.parse_message(isabelle_system, result)
+
+  private val result_queue = new LinkedBlockingQueue[Result]
+
+  private def put_result(kind: Kind.Value, props: List[(String, String)], result: String)
+  {
+    if (kind == Kind.INIT) {
+      val map = Map(props: _*)
+      if (map.isDefinedAt(Markup.PID)) pid = map(Markup.PID)
+      if (map.isDefinedAt(Markup.SESSION)) the_session = map(Markup.SESSION)
+    }
+    result_queue.put(new Result(kind, props, result))
+  }
+
+  private class ResultThread extends Thread("isabelle: results") {
+    override def run() = {
+      var finished = false
+      while (!finished) {
+        val result =
+          try { result_queue.take }
+          catch { case _: NullPointerException => null }
+
+        if (result != null) {
+          results.event(result)
+          if (result.kind == Kind.EXIT) finished = true
+        }
+        else finished = true
+      }
+    }
+  }
+
+
+  /* signals */
+
+  def interrupt() = synchronized {
+    if (proc == null) error("Cannot interrupt Isabelle: no process")
+    if (pid == null) put_result(Kind.SYSTEM, Nil, "Cannot interrupt: unknown pid")
+    else {
+      try {
+        if (isabelle_system.execute(true, "kill", "-INT", pid).waitFor == 0)
+          put_result(Kind.SIGNAL, Nil, "INT")
+        else
+          put_result(Kind.SYSTEM, Nil, "Cannot interrupt: kill command failed")
+      }
+      catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
+    }
+  }
+
+  def kill() = synchronized {
+    if (proc == 0) error("Cannot kill Isabelle: no process")
+    else {
+      try_close()
+      Thread.sleep(500)
+      put_result(Kind.SIGNAL, Nil, "KILL")
+      proc.destroy
+      proc = null
+      pid = null
+    }
+  }
+
+
+  /* output being piped into the process */
+
+  private val output = new LinkedBlockingQueue[String]
+
+  private def output_raw(text: String) = synchronized {
+    if (proc == null) error("Cannot output to Isabelle: no process")
+    if (closing) error("Cannot output to Isabelle: already closing")
+    output.put(text)
+  }
+
+  def output_sync(text: String) =
+    output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n")
+
+
+  def command(text: String) =
+    output_sync("Isabelle.command " + IsabelleSyntax.encode_string(text))
+
+  def command(props: List[(String, String)], text: String) =
+    output_sync("Isabelle.command " + IsabelleSyntax.encode_properties(props) + " " +
+      IsabelleSyntax.encode_string(text))
+
+  def ML(text: String) =
+    output_sync("ML_val " + IsabelleSyntax.encode_string(text))
+
+  def close() = synchronized {    // FIXME watchdog/timeout
+    output_raw("\u0000")
+    closing = true
+  }
+
+  def try_close() = synchronized {
+    if (proc != null && !closing) {
+      try { close() }
+      catch { case _: RuntimeException => }
+    }
+  }
+
+
+  /* stdin */
+
+  private class StdinThread(out_stream: OutputStream) extends Thread("isabelle: stdin") {
+    override def run() = {
+      val writer = new BufferedWriter(new OutputStreamWriter(out_stream, isabelle_system.charset))
+      var finished = false
+      while (!finished) {
+        try {
+          //{{{
+          val s = output.take
+          if (s == "\u0000") {
+            writer.close
+            finished = true
+          }
+          else {
+            put_result(Kind.STDIN, Nil, s)
+            writer.write(s)
+            writer.flush
+          }
+          //}}}
+        }
+        catch {
+          case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdin thread: " + e.getMessage)
+        }
+      }
+      put_result(Kind.SYSTEM, Nil, "Stdin thread terminated")
+    }
+  }
+
+
+  /* stdout */
+
+  private class StdoutThread(in_stream: InputStream) extends Thread("isabelle: stdout") {
+    override def run() = {
+      val reader = new BufferedReader(new InputStreamReader(in_stream, isabelle_system.charset))
+      var result = new StringBuilder(100)
+
+      var finished = false
+      while (!finished) {
+        try {
+          //{{{
+          var c = -1
+          var done = false
+          while (!done && (result.length == 0 || reader.ready)) {
+            c = reader.read
+            if (c >= 0) result.append(c.asInstanceOf[Char])
+            else done = true
+          }
+          if (result.length > 0) {
+            put_result(Kind.STDOUT, Nil, result.toString)
+            result.length = 0
+          }
+          else {
+            reader.close
+            finished = true
+            try_close()
+          }
+          //}}}
+        }
+        catch {
+          case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdout thread: " + e.getMessage)
+        }
+      }
+      put_result(Kind.SYSTEM, Nil, "Stdout thread terminated")
+    }
+  }
+
+
+  /* messages */
+
+  private class MessageThread(fifo: String) extends Thread("isabelle: messages") {
+    override def run() = {
+      val reader = isabelle_system.fifo_reader(fifo)
+      var kind: Kind.Value = null
+      var props: List[(String, String)] = Nil
+      var result = new StringBuilder
+
+      var finished = false
+      while (!finished) {
+        try {
+          if (kind == null) {
+            //{{{ Char mode -- resync
+            var c = -1
+            do {
+              c = reader.read
+              if (c >= 0 && c != 2) result.append(c.asInstanceOf[Char])
+            } while (c >= 0 && c != 2)
+
+            if (result.length > 0) {
+              put_result(Kind.SYSTEM, Nil, "Malformed message:\n" + result.toString)
+              result.length = 0
+            }
+            if (c < 0) {
+              reader.close
+              finished = true
+              try_close()
+            }
+            else {
+              c = reader.read
+              if (Kind.code.isDefinedAt(c)) kind = Kind.code(c)
+              else kind = null
+            }
+            //}}}
+          }
+          else {
+            //{{{ Line mode
+            val line = reader.readLine
+            if (line == null) {
+              reader.close
+              finished = true
+              try_close()
+            }
+            else {
+              val len = line.length
+              // property
+              if (line.endsWith("\u0002,")) {
+                val i = line.indexOf('=')
+                if (i > 0) {
+                  val name = line.substring(0, i)
+                  val value = line.substring(i + 1, len - 2)
+                  props = (name, value) :: props
+                }
+              }
+              // last text line
+              else if (line.endsWith("\u0002.")) {
+                result.append(line.substring(0, len - 2))
+                put_result(kind, props.reverse, result.toString)
+                kind = null
+                props = Nil
+                result.length = 0
+              }
+              // text line
+              else {
+                result.append(line)
+                result.append('\n')
+              }
+            }
+            //}}}
+          }
+        }
+        catch {
+          case e: IOException => put_result(Kind.SYSTEM, Nil, "Message thread: " + e.getMessage)
+        }
+      }
+      put_result(Kind.SYSTEM, Nil, "Message thread terminated")
+    }
+  }
+
+
+
+  /** main **/
+
+  {
+    /* isabelle version */
+
+    {
+      val (msg, rc) = isabelle_system.isabelle_tool("version")
+      if (rc != 0) error("Version check failed -- bad Isabelle installation:\n" + msg)
+      put_result(Kind.SYSTEM, Nil, msg)
+    }
+
+
+    /* messages */
+
+    val message_fifo = isabelle_system.mk_fifo()
+    def rm_fifo() = isabelle_system.rm_fifo(message_fifo)
+
+    val message_thread = new MessageThread(message_fifo)
+    message_thread.start
+
+    new ResultThread().start
+
+
+    /* exec process */
+
+    try {
+      val cmdline =
+        List(isabelle_system.getenv_strict("ISABELLE_PROCESS"), "-W", message_fifo) ++ args
+      proc = isabelle_system.execute(true, cmdline: _*)
+    }
+    catch {
+      case e: IOException =>
+        rm_fifo()
+        error("Failed to execute Isabelle process: " + e.getMessage)
+    }
+
+
+    /* stdin/stdout */
+
+    new StdinThread(proc.getOutputStream).start
+    new StdoutThread(proc.getInputStream).start
+
+
+    /* exit */
+
+    new Thread("isabelle: exit") {
+      override def run() = {
+        val rc = proc.waitFor()
+        Thread.sleep(300)
+        put_result(Kind.SYSTEM, Nil, "Exit thread terminated")
+        put_result(Kind.EXIT, Nil, Integer.toString(rc))
+        rm_fifo()
+      }
+    }.start
+
+  }
+}