src/Pure/System/isabelle_process.scala
author wenzelm
Mon Sep 20 21:20:06 2010 +0200 (2010-09-20 ago)
changeset 39572 bb3469024b6a
parent 39530 16adc476348f
child 39573 a874ca3f5474
permissions -rw-r--r--
added Isabelle_Process.syslog;
refined Isabelle_Process.process_manager: startup_output via syslog, explicit join of auxiliary threads;
tuned;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 import scala.collection.mutable
    17 
    18 
    /** Companion: table of message kinds and the wrapper for messages sent to the receiver. */
    object Isabelle_Process
    {
      /* results */

      /** Maps the one-character kind code found in a message header to its markup name. */
      object Kind
      {
        val message_markup = Map(
          'A'.toInt -> Markup.INIT,
          'B'.toInt -> Markup.STATUS,
          'C'.toInt -> Markup.REPORT,
          'D'.toInt -> Markup.WRITELN,
          'E'.toInt -> Markup.TRACING,
          'F'.toInt -> Markup.WARNING,
          'G'.toInt -> Markup.ERROR)
      }

      /** A single message, wrapped as an XML element whose markup name is the message kind. */
      class Result(val message: XML.Elem)
      {
        def kind = message.markup.name
        def properties = message.markup.properties
        def body = message.body

        def is_init: Boolean = kind == Markup.INIT
        def is_exit: Boolean = kind == Markup.EXIT
        def is_stdout: Boolean = kind == Markup.STDOUT
        def is_system: Boolean = kind == Markup.SYSTEM
        def is_status: Boolean = kind == Markup.STATUS
        def is_report: Boolean = kind == Markup.REPORT

        // a status message whose body is exactly the single "ready" element
        def is_ready: Boolean = is_status && body == List(XML.Elem(Markup.Ready, Nil))

        /** Renders as `kind {k=v,...} [[body]]`; the property part is left out when empty. */
        override def toString: String =
        {
          // status/report bodies are shown as raw XML text, everything else pretty-printed
          val text =
            if (is_status || is_report) body.map(_.toString).mkString
            else Pretty.string_of(body)
          val props =
            if (properties.isEmpty) ""
            else " " + properties.map(p => p._1 + "=" + p._2).mkString("{", ",", "}")
          kind.toString + props + " [[" + text + "]]"
        }
      }
    }
    62 
    63 
    64 class Isabelle_Process(system: Isabelle_System, timeout: Int, receiver: Actor, args: String*)
    65 {
    66   import Isabelle_Process._
    67 
    68 
    69   /* demo constructor */
    70 
    // Demo setup: a fresh Isabelle_System, a timeout of 10000, and a receiver actor that
    // simply prints every message it gets to the console.
    def this(args: String*) =
      this(
        new Isabelle_System,
        10000,
        actor {
          loop {
            react { case msg => Console.println(msg) }
          }
        },
        args: _*)
    74 
    75 
    76   /* input actors */
    77 
    // Messages understood by the input actors.
    private case class Input_Text(text: String)
    private case class Input_Chunks(chunks: List[Array[Byte]])
    private case object Close

    // Sends Close to the given actor; a null reference is silently skipped.
    private def close(a: Actor): Unit =
      if (a != null) a ! Close

    // Both references start out as null and are assigned by the process manager.
    @volatile private var standard_input: Actor = null
    @volatile private var command_input: Actor = null
    86 
    87 
    88   /* process manager */
    89 
    // Pair of fifos handed to the process via its -W option.
    private val in_fifo = system.mk_fifo()
    private val out_fifo = system.mk_fifo()

    /** Removes both fifos. */
    private def rm_fifos(): Unit =
    {
      system.rm_fifo(in_fifo)
      system.rm_fifo(out_fifo)
    }

    // The external process; if launching fails with an IOException the fifos are removed
    // before the exception is passed on.
    private val proc =
    {
      try {
        val executable = system.getenv_strict("ISABELLE_PROCESS")
        val cmdline = List(executable, "-W", in_fifo + ":" + out_fifo) ++ args
        system.execute(true, cmdline: _*)
      }
      catch {
        case e: IOException =>
          rm_fifos()
          throw e
      }
    }
   101 
   // Character-level view of the process' standard output, decoded with the standard charset.
   private val stdout_reader =
   {
     val decoded = new InputStreamReader(proc.getInputStream, Standard_System.charset)
     new BufferedReader(decoded)
   }

   // Character-level view of the process' standard input, encoded with the standard charset.
   private val stdin_writer =
   {
     val encoded = new OutputStreamWriter(proc.getOutputStream, Standard_System.charset)
     new BufferedWriter(encoded)
   }
   107 
   // Supervises the external process: collects its startup output, then either gives up
   // (startup did not finish within `timeout` milliseconds) or starts the four stream
   // actors, waits for the process to end, and reports its return code.
   //
   // The fifos are removed in a `finally` so that they are not left behind when anything
   // in here throws.  rm_fifos() may run more than once overall, since put_result also
   // calls it when the INIT message arrives.
   //
   // NOTE(review): this body calls system_result/put_result, whose backing fields are
   // declared further down in the class; presumably the body runs on a separate thread
   // that starts right away -- confirm those fields are initialized before first use.
   private val (process_manager, _) = Simple_Thread.actor("process_manager")
   {
     try {
       val (startup_failed, startup_output) =
       {
         val expired = System.currentTimeMillis() + timeout
         val result = new StringBuilder(100)

         // poll stdout until character code 2 shows up or the deadline passes
         var finished = false
         while (!finished && System.currentTimeMillis() <= expired) {
           while (!finished && stdout_reader.ready) {
             val c = stdout_reader.read
             if (c == 2) finished = true
             else result += c.toChar
           }
           Thread.sleep(10)
         }
         (!finished, result.toString)
       }
       system_result(startup_output)

       if (startup_failed) {
         put_result(Markup.EXIT, "127")
         stdin_writer.close
         Thread.sleep(300)  // FIXME !?
         proc.destroy  // FIXME unreliable
       }
       else {
         // rendezvous
         val command_stream = system.fifo_output_stream(in_fifo)
         val message_stream = system.fifo_input_stream(out_fifo)

         val stdin = stdin_actor(); standard_input = stdin._2
         val stdout = stdout_actor()
         val input = input_actor(command_stream); command_input = input._2
         val message = message_actor(message_stream)

         val rc = proc.waitFor()
         system_result("Isabelle process terminated")
         // wait for all auxiliary threads before announcing the exit
         for ((thread, _) <- List(stdin, stdout, input, message)) thread.join
         system_result("process_manager terminated")
         put_result(Markup.EXIT, rc.toString)
       }
     }
     finally { rm_fifos() }
   }

   /** Blocks until the process manager has finished. */
   def join(): Unit = process_manager.join()
   154 
   155 
   156   /* system log */
   157 
   // All system messages emitted so far; accessed only under the instance lock.
   private val system_results = new mutable.ListBuffer[String]

   /** Records `text` in the system log and forwards it to the receiver as a SYSTEM result. */
   private def system_result(text: String): Unit =
   {
     synchronized { system_results += text }
     val message = XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text)))
     receiver ! new Result(message)
   }

   /** Snapshot of the system log as an immutable list. */
   def syslog(): List[String] = synchronized { system_results.toList }
   167 
   168 
   169   /* results */
   170 
   private val xml_cache = new XML.Cache(131071)

   /** Builds a message of the given kind from `props` and `body` and sends it, via the
       XML cache, to the receiver.

       The first INIT message also removes the fifos and records the process id, which is
       the value of the Markup.PID property; if that property is missing, a system message
       is emitted and the pid stays unknown. */
   private def put_result(kind: String, props: List[(String, String)], body: XML.Body): Unit =
   {
     if (pid.isEmpty && kind == Markup.INIT) {
       rm_fifos()
       // take the property VALUE (second component), not its key
       props.find(_._1 == Markup.PID).map(_._2) match {
         case None => system_result("Bad Isabelle process initialization: missing pid")
         case p => pid = p
       }
     }

     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
     xml_cache.cache_tree(msg)((message: XML.Tree) =>
       receiver ! new Result(message.asInstanceOf[XML.Elem]))
   }

   /** Variant for plain text: decodes symbols in `text` and sends it without properties. */
   private def put_result(kind: String, text: String): Unit =
   {
     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   }
   192 
   193 
   194   /* signals */
   195 
   // Process id as reported by the INIT message; None until that message has arrived.
   @volatile private var pid: Option[String] = None

   /** Sends an interrupt signal to the process by running `kill -INT <pid>`.

       The outcome is reported as a system message.  An IOException while running the
       kill command is re-raised through `error`. */
   def interrupt(): Unit =
   {
     pid match {
       case None => system_result("Cannot interrupt Isabelle: unknown pid")
       case Some(i) =>
         try {
           if (system.execute(true, "kill", "-INT", i).waitFor == 0)
             system_result("Interrupt Isabelle")
           else
             system_result("Cannot interrupt Isabelle: kill command failed")
         }
         catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
     }
   }
   212 
   /** Shuts the process down if it is still running: closes the input actors, waits
       briefly, then destroys it.  Does nothing when the process has already exited. */
   def kill(): Unit =
   {
     // exitValue throws IllegalThreadStateException while the process has not terminated
     val still_running =
       try {
         proc.exitValue
         false
       }
       catch { case _: java.lang.IllegalThreadStateException => true }

     if (still_running) {
       close()
       Thread.sleep(500)  // FIXME !?
       system_result("Kill Isabelle")
       proc.destroy
     }
   }
   225 
   226 
   227 
   228   /** stream actors **/
   229 
   230   /* raw stdin */
   231 
   /** Actor that forwards Input_Text messages to the process' stdin and closes it on Close.
       IOExceptions are reported as system messages and the actor keeps going. */
   private def stdin_actor(): (Thread, Actor) =
   {
     val name = "standard_input"
     Simple_Thread.actor(name) {
       //{{{
       // handles one message; the result says whether the actor is done
       val handle: PartialFunction[Any, Boolean] = {
         case Input_Text(text) =>
           stdin_writer.write(text)
           stdin_writer.flush
           false
         case Close =>
           stdin_writer.close
           true
         case bad =>
           System.err.println(name + ": ignoring bad message " + bad)
           false
       }
       //}}}

       var done = false
       while (!done) {
         try { done = receive(handle) }
         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
       }
       system_result(name + " terminated")
     }
   }
   256 
   257 
   258   /* raw stdout */
   259 
   /** Actor that turns the process' raw stdout into STDOUT results.

       Each round reads at least one character and then whatever else is ready, and sends
       the collected text as one result.  A round that collects nothing closes the reader
       and ends the actor. */
   private def stdout_actor(): (Thread, Actor) =
   {
     val name = "standard_output"
     Simple_Thread.actor(name) {
       val buffer = new StringBuilder(100)

       var finished = false
       while (!finished) {
         try {
           //{{{
           var eof = false
           while (!eof && (buffer.length == 0 || stdout_reader.ready)) {
             val ch = stdout_reader.read
             if (ch < 0) eof = true
             else buffer.append(ch.toChar)
           }
           if (buffer.length == 0) {
             stdout_reader.close
             finished = true
           }
           else {
             put_result(Markup.STDOUT, buffer.toString)
             buffer.length = 0
           }
           //}}}
         }
         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
       }
       system_result(name + " terminated")
     }
   }
   292 
   293 
   294   /* command input */
   295 
   /** Actor that writes Input_Chunks messages to the command stream and closes it on Close.

       Each message is written as one header line holding the comma-separated chunk
       lengths, followed by the chunks themselves, and then flushed. */
   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   {
     val name = "command_input"
     Simple_Thread.actor(name) {
       val stream = new BufferedOutputStream(raw_stream)

       //{{{
       // handles one message; the result says whether the actor is done
       val handle: PartialFunction[Any, Boolean] = {
         case Input_Chunks(chunks) =>
           val header = chunks.map(_.length).mkString("", ",", "\n")
           stream.write(Standard_System.string_bytes(header))
           for (chunk <- chunks) stream.write(chunk)
           stream.flush
           false
         case Close =>
           stream.close
           true
         case bad =>
           System.err.println(name + ": ignoring bad message " + bad)
           false
       }
       //}}}

       var done = false
       while (!done) {
         try { done = receive(handle) }
         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
       }
       system_result(name + " terminated")
     }
   }
   323 
   324 
   325   /* message output */
   326 
   /** Actor that reads messages from the message stream and passes them to put_result.

       A message is two chunks, header then body.  A chunk is a decimal length, a newline,
       and that many bytes of content.  The header must be a single element without body
       whose one-character name is a key of Kind.message_markup.  Malformed input and I/O
       errors are reported as system messages; the loop ends once a read of the first
       header character hits end-of-stream. */
   private def message_actor(stream: InputStream): (Thread, Actor) =
   {
     class EOF extends Exception
     class Protocol_Error(msg: String) extends Exception(msg)

     val name = "message_output"
     Simple_Thread.actor(name) {
       val default_buffer = new Array[Byte](65536)

       // last byte read while scanning a chunk header; also drives the outer loop
       var c = -1

       def read_chunk(): XML.Body =
       {
         //{{{
         // chunk size
         c = stream.read
         if (c == -1) throw new EOF
         var n = 0
         while ('0' <= c && c <= '9') {
           n = 10 * n + (c - '0')
           c = stream.read
         }
         if (c != '\n') throw new Protocol_Error("bad message chunk header")

         // chunk content: reuse the shared buffer unless the chunk is too large for it
         val buf =
           if (n <= default_buffer.length) default_buffer
           else new Array[Byte](n)

         var filled = 0
         var more = true
         while (more) {
           val got = stream.read(buf, filled, n - filled)
           filled += got
           more = got > 0 && filled < n
         }

         if (filled != n) throw new Protocol_Error("bad message chunk content")

         val chars = YXML.decode_chars(system.symbols.decode, buf, 0, n)
         YXML.parse_body_failsafe(chars)
         //}}}
       }

       do {
         try {
           val header = read_chunk()
           val body = read_chunk()
           header match {
             case List(XML.Elem(Markup(code, props), Nil))
                 if code.size == 1 && Kind.message_markup.contains(code(0)) =>
               put_result(Kind.message_markup(code(0)), props, body)
             case _ => throw new Protocol_Error("bad header: " + header.toString)
           }
         }
         catch {
           case _: EOF =>
           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
         }
       } while (c != -1)
       stream.close

       system_result(name + " terminated")
     }
   }
   390 
   391 
   392   /** main methods **/
   393 
   /** Sends raw text to the standard-input actor. */
   def input_raw(text: String): Unit =
     standard_input ! Input_Text(text)

   /** Sends a command to the command-input actor: the encoded name, then the arguments. */
   def input_bytes(name: String, args: Array[Byte]*): Unit =
   {
     val chunks = Standard_System.string_bytes(name) :: args.toList
     command_input ! Input_Chunks(chunks)
   }

   /** Like input_bytes, with string arguments encoded via Standard_System.string_bytes. */
   def input(name: String, args: String*): Unit =
   {
     val encoded = args.map(Standard_System.string_bytes)
     input_bytes(name, encoded: _*)
   }

   /** Asks both input actors to close, command input first. */
   def close(): Unit =
   {
     close(command_input)
     close(standard_input)
   }
   403 }