src/Pure/System/isabelle_process.scala
author wenzelm
Tue, 01 Sep 2009 21:03:04 +0200
changeset 32474 0818e6b1c8a6
parent 32448 a89f876731c5
child 34100 ea24958c2af5
permissions -rw-r--r--
Isabelle_Process: receiver as Actor, not EventBus; removed misleading Isabelle_Process.parse_message method -- use plain function instead;

/*  Title:      Pure/System/isabelle_process.ML
    Author:     Makarius
    Options:    :folding=explicit:collapseFolds=1:

Isabelle process management -- always reactive due to multi-threaded I/O.
*/

package isabelle

import java.util.concurrent.LinkedBlockingQueue
import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
  InputStream, OutputStream, IOException}

import scala.actors.Actor
import Actor._


object Isabelle_Process
{
  /* results */

  object Kind extends Enumeration {
    //{{{ values and codes
    // internal system notification
    val SYSTEM = Value("SYSTEM")
    // Posix channels/events
    val STDIN = Value("STDIN")
    val STDOUT = Value("STDOUT")
    val SIGNAL = Value("SIGNAL")
    val EXIT = Value("EXIT")
    // Isabelle messages
    val INIT = Value("INIT")
    val STATUS = Value("STATUS")
    val WRITELN = Value("WRITELN")
    val PRIORITY = Value("PRIORITY")
    val TRACING = Value("TRACING")
    val WARNING = Value("WARNING")
    val ERROR = Value("ERROR")
    val DEBUG = Value("DEBUG")
    // messages codes
    val code = Map(
      ('A' : Int) -> Kind.INIT,
      ('B' : Int) -> Kind.STATUS,
      ('C' : Int) -> Kind.WRITELN,
      ('D' : Int) -> Kind.PRIORITY,
      ('E' : Int) -> Kind.TRACING,
      ('F' : Int) -> Kind.WARNING,
      ('G' : Int) -> Kind.ERROR,
      ('H' : Int) -> Kind.DEBUG,
      ('0' : Int) -> Kind.SYSTEM,
      ('1' : Int) -> Kind.STDIN,
      ('2' : Int) -> Kind.STDOUT,
      ('3' : Int) -> Kind.SIGNAL,
      ('4' : Int) -> Kind.EXIT)
    // message markup
    val markup = Map(
      Kind.INIT -> Markup.INIT,
      Kind.STATUS -> Markup.STATUS,
      Kind.WRITELN -> Markup.WRITELN,
      Kind.PRIORITY -> Markup.PRIORITY,
      Kind.TRACING -> Markup.TRACING,
      Kind.WARNING -> Markup.WARNING,
      Kind.ERROR -> Markup.ERROR,
      Kind.DEBUG -> Markup.DEBUG,
      Kind.SYSTEM -> Markup.SYSTEM,
      Kind.STDIN -> Markup.STDIN,
      Kind.STDOUT -> Markup.STDOUT,
      Kind.SIGNAL -> Markup.SIGNAL,
      Kind.EXIT -> Markup.EXIT)
    //}}}
    def is_raw(kind: Value) =
      kind == STDOUT
    def is_control(kind: Value) =
      kind == SYSTEM ||
      kind == SIGNAL ||
      kind == EXIT
    def is_system(kind: Value) =
      kind == SYSTEM ||
      kind == STDIN ||
      kind == SIGNAL ||
      kind == EXIT ||
      kind == STATUS
  }

  class Result(val kind: Kind.Value, val props: List[(String, String)], val result: String) {
    override def toString = {
      val trees = YXML.parse_body_failsafe(result)
      val res =
        if (kind == Kind.STATUS) trees.map(_.toString).mkString
        else trees.flatMap(XML.content(_).mkString).mkString
      if (props.isEmpty)
        kind.toString + " [[" + res + "]]"
      else
        kind.toString + " " +
          (for ((x, y) <- props) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    }
    def is_raw = Kind.is_raw(kind)
    def is_control = Kind.is_control(kind)
    def is_system = Kind.is_system(kind)
  }

  def parse_message(isabelle_system: Isabelle_System, result: Result) =
  {
    XML.Elem(Markup.MESSAGE, (Markup.CLASS, Kind.markup(result.kind)) :: result.props,
      YXML.parse_body_failsafe(isabelle_system.symbols.decode(result.result)))
  }
}


class Isabelle_Process(isabelle_system: Isabelle_System, receiver: Actor, args: String*)
{
  import Isabelle_Process._


  /* demo constructor */

  def this(args: String*) =
    this(new Isabelle_System,
      new Actor { def act = loop { react { case res => Console.println(res) } } }.start, args: _*)


  /* process information */

  @volatile private var proc: Process = null
  @volatile private var closing = false
  @volatile private var pid: String = null
  @volatile private var the_session: String = null
  def session = the_session


  /* results */

  private def put_result(kind: Kind.Value, props: List[(String, String)], result: String)
  {
    if (kind == Kind.INIT) {
      val map = Map(props: _*)
      if (map.isDefinedAt(Markup.PID)) pid = map(Markup.PID)
      if (map.isDefinedAt(Markup.SESSION)) the_session = map(Markup.SESSION)
    }
    receiver ! new Result(kind, props, result)
  }


  /* signals */

  def interrupt() = synchronized {
    if (proc == null) error("Cannot interrupt Isabelle: no process")
    if (pid == null) put_result(Kind.SYSTEM, Nil, "Cannot interrupt: unknown pid")
    else {
      try {
        if (isabelle_system.execute(true, "kill", "-INT", pid).waitFor == 0)
          put_result(Kind.SIGNAL, Nil, "INT")
        else
          put_result(Kind.SYSTEM, Nil, "Cannot interrupt: kill command failed")
      }
      catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
    }
  }

  def kill() = synchronized {
    if (proc == 0) error("Cannot kill Isabelle: no process")
    else {
      try_close()
      Thread.sleep(500)
      put_result(Kind.SIGNAL, Nil, "KILL")
      proc.destroy
      proc = null
      pid = null
    }
  }


  /* output being piped into the process */

  private val output = new LinkedBlockingQueue[String]

  private def output_raw(text: String) = synchronized {
    if (proc == null) error("Cannot output to Isabelle: no process")
    if (closing) error("Cannot output to Isabelle: already closing")
    output.put(text)
  }

  def output_sync(text: String) =
    output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n")


  def command(text: String) =
    output_sync("Isabelle.command " + Isabelle_Syntax.encode_string(text))

  def command(props: List[(String, String)], text: String) =
    output_sync("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
      Isabelle_Syntax.encode_string(text))

  def ML(text: String) =
    output_sync("ML_val " + Isabelle_Syntax.encode_string(text))

  def close() = synchronized {    // FIXME watchdog/timeout
    output_raw("\u0000")
    closing = true
  }

  def try_close() = synchronized {
    if (proc != null && !closing) {
      try { close() }
      catch { case _: RuntimeException => }
    }
  }


  /* stdin */

  private class StdinThread(out_stream: OutputStream) extends Thread("isabelle: stdin") {
    override def run() = {
      val writer = new BufferedWriter(new OutputStreamWriter(out_stream, Isabelle_System.charset))
      var finished = false
      while (!finished) {
        try {
          //{{{
          val s = output.take
          if (s == "\u0000") {
            writer.close
            finished = true
          }
          else {
            put_result(Kind.STDIN, Nil, s)
            writer.write(s)
            writer.flush
          }
          //}}}
        }
        catch {
          case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdin thread: " + e.getMessage)
        }
      }
      put_result(Kind.SYSTEM, Nil, "Stdin thread terminated")
    }
  }


  /* stdout */

  private class StdoutThread(in_stream: InputStream) extends Thread("isabelle: stdout") {
    override def run() = {
      val reader = new BufferedReader(new InputStreamReader(in_stream, Isabelle_System.charset))
      var result = new StringBuilder(100)

      var finished = false
      while (!finished) {
        try {
          //{{{
          var c = -1
          var done = false
          while (!done && (result.length == 0 || reader.ready)) {
            c = reader.read
            if (c >= 0) result.append(c.asInstanceOf[Char])
            else done = true
          }
          if (result.length > 0) {
            put_result(Kind.STDOUT, Nil, result.toString)
            result.length = 0
          }
          else {
            reader.close
            finished = true
            try_close()
          }
          //}}}
        }
        catch {
          case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdout thread: " + e.getMessage)
        }
      }
      put_result(Kind.SYSTEM, Nil, "Stdout thread terminated")
    }
  }


  /* messages */

  private class MessageThread(fifo: String) extends Thread("isabelle: messages") {
    override def run() = {
      val reader = isabelle_system.fifo_reader(fifo)
      var kind: Kind.Value = null
      var props: List[(String, String)] = Nil
      var result = new StringBuilder

      var finished = false
      while (!finished) {
        try {
          if (kind == null) {
            //{{{ Char mode -- resync
            var c = -1
            do {
              c = reader.read
              if (c >= 0 && c != 2) result.append(c.asInstanceOf[Char])
            } while (c >= 0 && c != 2)

            if (result.length > 0) {
              put_result(Kind.SYSTEM, Nil, "Malformed message:\n" + result.toString)
              result.length = 0
            }
            if (c < 0) {
              reader.close
              finished = true
              try_close()
            }
            else {
              c = reader.read
              if (Kind.code.isDefinedAt(c)) kind = Kind.code(c)
              else kind = null
            }
            //}}}
          }
          else {
            //{{{ Line mode
            val line = reader.readLine
            if (line == null) {
              reader.close
              finished = true
              try_close()
            }
            else {
              val len = line.length
              // property
              if (line.endsWith("\u0002,")) {
                val i = line.indexOf('=')
                if (i > 0) {
                  val name = line.substring(0, i)
                  val value = line.substring(i + 1, len - 2)
                  props = (name, value) :: props
                }
              }
              // last text line
              else if (line.endsWith("\u0002.")) {
                result.append(line.substring(0, len - 2))
                put_result(kind, props.reverse, result.toString)
                kind = null
                props = Nil
                result.length = 0
              }
              // text line
              else {
                result.append(line)
                result.append('\n')
              }
            }
            //}}}
          }
        }
        catch {
          case e: IOException => put_result(Kind.SYSTEM, Nil, "Message thread: " + e.getMessage)
        }
      }
      put_result(Kind.SYSTEM, Nil, "Message thread terminated")
    }
  }



  /** main **/

  {
    /* isabelle version */

    {
      val (msg, rc) = isabelle_system.isabelle_tool("version")
      if (rc != 0) error("Version check failed -- bad Isabelle installation:\n" + msg)
      put_result(Kind.SYSTEM, Nil, msg)
    }


    /* messages */

    val message_fifo = isabelle_system.mk_fifo()
    def rm_fifo() = isabelle_system.rm_fifo(message_fifo)

    val message_thread = new MessageThread(message_fifo)
    message_thread.start


    /* exec process */

    try {
      val cmdline =
        List(isabelle_system.getenv_strict("ISABELLE_PROCESS"), "-W", message_fifo) ++ args
      proc = isabelle_system.execute(true, cmdline: _*)
    }
    catch {
      case e: IOException =>
        rm_fifo()
        error("Failed to execute Isabelle process: " + e.getMessage)
    }


    /* stdin/stdout */

    new StdinThread(proc.getOutputStream).start
    new StdoutThread(proc.getInputStream).start


    /* exit */

    new Thread("isabelle: exit") {
      override def run() = {
        val rc = proc.waitFor()
        Thread.sleep(300)
        put_result(Kind.SYSTEM, Nil, "Exit thread terminated")
        put_result(Kind.EXIT, Nil, Integer.toString(rc))
        rm_fifo()
      }
    }.start

  }
}