src/Pure/System/isabelle_process.scala
author wenzelm
Sun Sep 19 22:40:22 2010 +0200 (2010-09-19 ago)
changeset 39528 c01d89d18ff0
parent 39526 f1296795a8dc
child 39530 16adc476348f
permissions -rw-r--r--
refined Isabelle_Process startup: emit \002 before rendezvous on fifos, more robust treatment of startup failure with timeout, do not quit() after main loop;
tuned;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind
    23   {
    24     val message_markup = Map(
    25       ('A' : Int) -> Markup.INIT,
    26       ('B' : Int) -> Markup.STATUS,
    27       ('C' : Int) -> Markup.REPORT,
    28       ('D' : Int) -> Markup.WRITELN,
    29       ('E' : Int) -> Markup.TRACING,
    30       ('F' : Int) -> Markup.WARNING,
    31       ('G' : Int) -> Markup.ERROR)
    32   }
    33 
    34   class Result(val message: XML.Elem)
    35   {
    36     def kind = message.markup.name
    37     def properties = message.markup.properties
    38     def body = message.body
    39 
    40     def is_init = kind == Markup.INIT
    41     def is_exit = kind == Markup.EXIT
    42     def is_stdout = kind == Markup.STDOUT
    43     def is_system = kind == Markup.SYSTEM
    44     def is_status = kind == Markup.STATUS
    45     def is_report = kind == Markup.REPORT
    46     def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
    47 
    48     override def toString: String =
    49     {
    50       val res =
    51         if (is_status || is_report) message.body.map(_.toString).mkString
    52         else Pretty.string_of(message.body)
    53       if (properties.isEmpty)
    54         kind.toString + " [[" + res + "]]"
    55       else
    56         kind.toString + " " +
    57           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    58     }
    59   }
    60 }
    61 
    62 
    63 class Isabelle_Process(system: Isabelle_System, timeout: Int, receiver: Actor, args: String*)
    64 {
    65   import Isabelle_Process._
    66 
    67 
    68   /* demo constructor */
    69 
    70   def this(args: String*) =
    71     this(new Isabelle_System, 10000,
    72       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    73 
    74 
    75   /* input actors */
    76 
    77   private case class Input_Text(text: String)
    78   private case class Input_Chunks(chunks: List[Array[Byte]])
    79 
    80   private case object Close
    81   private def close(a: Actor) { if (a != null) a ! Close }
    82 
    83   @volatile private var standard_input: Actor = null
    84   @volatile private var command_input: Actor = null
    85 
    86 
    87   /* process manager */
    88 
    89   private val in_fifo = system.mk_fifo()
    90   private val out_fifo = system.mk_fifo()
    91   private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
    92 
    93   private val proc =
    94     try {
    95       val cmdline =
    96         List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
    97       system.execute(true, cmdline: _*)
    98     }
    99     catch { case e: IOException => rm_fifos(); throw(e) }
   100 
   101   private val stdout_reader =
   102     new BufferedReader(new InputStreamReader(proc.getInputStream, Standard_System.charset))
   103 
   104   private val stdin_writer =
   105     new BufferedWriter(new OutputStreamWriter(proc.getOutputStream, Standard_System.charset))
   106 
   107   Simple_Thread.actor("process_manager") {
   108     val (startup_failed, startup_output) =
   109     {
   110       val expired = System.currentTimeMillis() + timeout
   111       val result = new StringBuilder(100)
   112 
   113       var finished = false
   114       while (!finished && System.currentTimeMillis() <= expired) {
   115         while (!finished && stdout_reader.ready) {
   116           val c = stdout_reader.read
   117           if (c == 2) finished = true
   118           else result += c.toChar
   119         }
   120         Thread.sleep(10)
   121       }
   122       (!finished, result.toString)
   123     }
   124     if (startup_failed) {
   125       put_result(Markup.STDOUT, startup_output)
   126       put_result(Markup.EXIT, "127")
   127       stdin_writer.close
   128       Thread.sleep(300)  // FIXME !?
   129       proc.destroy  // FIXME reliable!?
   130     }
   131     else {
   132       put_result(Markup.SYSTEM, startup_output)
   133 
   134       standard_input = stdin_actor()
   135       stdout_actor()
   136       command_input = input_actor()
   137       message_actor()
   138 
   139       val rc = proc.waitFor()
   140       Thread.sleep(300)  // FIXME !?
   141       system_result("Isabelle process terminated")
   142       put_result(Markup.EXIT, rc.toString)
   143     }
   144     rm_fifos()
   145   }
   146 
   147 
   148   /* results */
   149 
   150   private def system_result(text: String)
   151   {
   152     receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
   153   }
   154 
   155 
   156   private val xml_cache = new XML.Cache(131071)
   157 
   158   private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
   159   {
   160     if (pid.isEmpty && kind == Markup.INIT) {
   161       props.find(_._1 == Markup.PID).map(_._1) match {
   162         case None => system_result("Bad Isabelle process initialization: missing pid")
   163         case p => pid = p
   164       }
   165     }
   166 
   167     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
   168     xml_cache.cache_tree(msg)((message: XML.Tree) =>
   169       receiver ! new Result(message.asInstanceOf[XML.Elem]))
   170   }
   171 
   172   private def put_result(kind: String, text: String)
   173   {
   174     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   175   }
   176 
   177 
   178   /* signals */
   179 
   180   @volatile private var pid: Option[String] = None
   181 
   182   def interrupt()
   183   {
   184     pid match {
   185       case None => system_result("Cannot interrupt Isabelle: unknowd pid")
   186       case Some(i) =>
   187         try {
   188           if (system.execute(true, "kill", "-INT", i).waitFor == 0)
   189             system_result("Interrupt Isabelle")
   190           else
   191             system_result("Cannot interrupt Isabelle: kill command failed")
   192         }
   193         catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   194     }
   195   }
   196 
   197   def kill()
   198   {
   199     val running =
   200       try { proc.exitValue; false }
   201       catch { case e: java.lang.IllegalThreadStateException => true }
   202     if (running) {
   203       close()
   204       Thread.sleep(500)  // FIXME !?
   205       system_result("Kill Isabelle")
   206       proc.destroy
   207     }
   208   }
   209 
   210 
   211 
   212   /** stream actors **/
   213 
   214   /* raw stdin */
   215 
   216   private def stdin_actor(): Actor =
   217   {
   218     val name = "standard_input"
   219     Simple_Thread.actor(name) {
   220       var finished = false
   221       while (!finished) {
   222         try {
   223           //{{{
   224           receive {
   225             case Input_Text(text) =>
   226               stdin_writer.write(text)
   227               stdin_writer.flush
   228             case Close =>
   229               stdin_writer.close
   230               finished = true
   231             case bad => System.err.println(name + ": ignoring bad message " + bad)
   232           }
   233           //}}}
   234         }
   235         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   236       }
   237       system_result(name + " terminated")
   238     }
   239   }
   240 
   241 
   242   /* raw stdout */
   243 
   244   private def stdout_actor(): Actor =
   245   {
   246     val name = "standard_output"
   247     Simple_Thread.actor(name) {
   248       var result = new StringBuilder(100)
   249 
   250       var finished = false
   251       while (!finished) {
   252         try {
   253           //{{{
   254           var c = -1
   255           var done = false
   256           while (!done && (result.length == 0 || stdout_reader.ready)) {
   257             c = stdout_reader.read
   258             if (c >= 0) result.append(c.asInstanceOf[Char])
   259             else done = true
   260           }
   261           if (result.length > 0) {
   262             put_result(Markup.STDOUT, result.toString)
   263             result.length = 0
   264           }
   265           else {
   266             stdout_reader.close
   267             finished = true
   268           }
   269           //}}}
   270         }
   271         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   272       }
   273       system_result(name + " terminated")
   274     }
   275   }
   276 
   277 
   278   /* command input */
   279 
   280   private def input_actor(): Actor =
   281   {
   282     val name = "command_input"
   283     Simple_Thread.actor(name) {
   284       val stream = new BufferedOutputStream(system.fifo_output_stream(in_fifo))
   285       var finished = false
   286       while (!finished) {
   287         try {
   288           //{{{
   289           receive {
   290             case Input_Chunks(chunks) =>
   291               stream.write(Standard_System.string_bytes(
   292                   chunks.map(_.length).mkString("", ",", "\n")));
   293               chunks.foreach(stream.write(_));
   294               stream.flush
   295             case Close =>
   296               stream.close
   297               finished = true
   298             case bad => System.err.println(name + ": ignoring bad message " + bad)
   299           }
   300           //}}}
   301         }
   302         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   303       }
   304       system_result(name + " terminated")
   305     }
   306   }
   307 
   308 
   309   /* message output */
   310 
   311   private def message_actor(): Actor =
   312   {
   313     class EOF extends Exception
   314     class Protocol_Error(msg: String) extends Exception(msg)
   315 
   316     val name = "message_output"
   317     Simple_Thread.actor(name) {
   318       val stream = system.fifo_input_stream(out_fifo)
   319       val default_buffer = new Array[Byte](65536)
   320       var c = -1
   321 
   322       def read_chunk(): XML.Body =
   323       {
   324         //{{{
   325         // chunk size
   326         var n = 0
   327         c = stream.read
   328         if (c == -1) throw new EOF
   329         while (48 <= c && c <= 57) {
   330           n = 10 * n + (c - 48)
   331           c = stream.read
   332         }
   333         if (c != 10) throw new Protocol_Error("bad message chunk header")
   334 
   335         // chunk content
   336         val buf =
   337           if (n <= default_buffer.size) default_buffer
   338           else new Array[Byte](n)
   339 
   340         var i = 0
   341         var m = 0
   342         do {
   343           m = stream.read(buf, i, n - i)
   344           i += m
   345         } while (m > 0 && n > i)
   346 
   347         if (i != n) throw new Protocol_Error("bad message chunk content")
   348 
   349         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   350         //}}}
   351       }
   352 
   353       do {
   354         try {
   355           val header = read_chunk()
   356           val body = read_chunk()
   357           header match {
   358             case List(XML.Elem(Markup(name, props), Nil))
   359                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   360               put_result(Kind.message_markup(name(0)), props, body)
   361             case _ => throw new Protocol_Error("bad header: " + header.toString)
   362           }
   363         }
   364         catch {
   365           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   366           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   367           case _: EOF =>
   368         }
   369       } while (c != -1)
   370       stream.close
   371 
   372       system_result(name + " terminated")
   373     }
   374   }
   375 
   376 
   377   /** main methods **/
   378 
   379   def input_raw(text: String): Unit = standard_input ! Input_Text(text)
   380 
   381   def input_bytes(name: String, args: Array[Byte]*): Unit =
   382     command_input ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   383 
   384   def input(name: String, args: String*): Unit =
   385     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   386 
   387   def close(): Unit = { close(command_input); close(standard_input) }
   388 }