src/Pure/System/isabelle_process.scala
author wenzelm
Sun Sep 04 14:29:15 2011 +0200 (2011-09-04 ago)
changeset 44697 b99dfee76538
parent 43780 2cb2310d68b6
child 44705 089fcaf94c00
permissions -rw-r--r--
pass raw messages through xml_cache actor, which is important to retain ordering of results (e.g. read_command reports before assign, cf. 383c9d758a56);
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.lang.System
    11 import java.util.concurrent.LinkedBlockingQueue
    12 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    13   InputStream, OutputStream, BufferedOutputStream, IOException}
    14 
    15 import scala.actors.Actor
    16 import Actor._
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* results */
    22 
    23   object Kind
    24   {
    25     val message_markup = Map(
    26       ('A' : Int) -> Markup.INIT,
    27       ('B' : Int) -> Markup.STATUS,
    28       ('C' : Int) -> Markup.REPORT,
    29       ('D' : Int) -> Markup.WRITELN,
    30       ('E' : Int) -> Markup.TRACING,
    31       ('F' : Int) -> Markup.WARNING,
    32       ('G' : Int) -> Markup.ERROR,
    33       ('H' : Int) -> Markup.RAW)
    34   }
    35 
    36   abstract class Message
    37 
    38   class Input(name: String, args: List[String]) extends Message
    39   {
    40     override def toString: String =
    41       XML.Elem(Markup(Markup.PROVER_COMMAND, List((Markup.NAME, name))),
    42         args.map(s =>
    43           List(XML.Text("\n"), XML.elem(Markup.PROVER_ARG, YXML.parse_body(s)))).flatten).toString
    44   }
    45 
    46   class Result(val message: XML.Elem) extends Message
    47   {
    48     def kind: String = message.markup.name
    49     def properties: Properties.T = message.markup.properties
    50     def body: XML.Body = message.body
    51 
    52     def is_init = kind == Markup.INIT
    53     def is_exit = kind == Markup.EXIT
    54     def is_stdout = kind == Markup.STDOUT
    55     def is_system = kind == Markup.SYSTEM
    56     def is_status = kind == Markup.STATUS
    57     def is_report = kind == Markup.REPORT
    58     def is_raw = kind == Markup.RAW
    59     def is_ready = Isar_Document.is_ready(message)
    60     def is_syslog = is_init || is_exit || is_system || is_ready
    61 
    62     override def toString: String =
    63     {
    64       val res =
    65         if (is_status || is_report) message.body.map(_.toString).mkString
    66         else if (is_raw) "..."
    67         else Pretty.string_of(message.body)
    68       if (properties.isEmpty)
    69         kind.toString + " [[" + res + "]]"
    70       else
    71         kind.toString + " " +
    72           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    73     }
    74   }
    75 }
    76 
    77 
    78 class Isabelle_Process(timeout: Time, receiver: Actor, args: String*)
    79 {
    80   import Isabelle_Process._
    81 
    82 
    83   /* demo constructor */
    84 
    85   def this(args: String*) =
    86     this(Time.seconds(10), actor { loop { react { case res => Console.println(res) } } }, args: _*)
    87 
    88 
    89   /* results */
    90 
    91   private def system_result(text: String)
    92   {
    93     receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
    94   }
    95 
    96   private val xml_cache = new XML.Cache()
    97 
    98   private def put_result(kind: String, props: Properties.T, body: XML.Body)
    99   {
   100     if (kind == Markup.INIT) rm_fifos()
   101     if (kind == Markup.RAW)
   102       xml_cache.cache_ignore(
   103         new Result(XML.Elem(Markup(kind, props), body)))((result: Result) => receiver ! result)
   104     else {
   105       val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
   106       xml_cache.cache_tree(msg)((message: XML.Tree) =>
   107         receiver ! new Result(message.asInstanceOf[XML.Elem]))
   108     }
   109   }
   110 
   111   private def put_result(kind: String, text: String)
   112   {
   113     put_result(kind, Nil, List(XML.Text(Symbol.decode(text))))
   114   }
   115 
   116 
   117   /* input actors */
   118 
   119   private case class Input_Text(text: String)
   120   private case class Input_Chunks(chunks: List[Array[Byte]])
   121 
   122   private case object Close
   123   private def close(p: (Thread, Actor))
   124   {
   125     if (p != null && p._1.isAlive) {
   126       p._2 ! Close
   127       p._1.join
   128     }
   129   }
   130 
   131   @volatile private var standard_input: (Thread, Actor) = null
   132   @volatile private var command_input: (Thread, Actor) = null
   133 
   134 
   135   /** process manager **/
   136 
   137   private val in_fifo = Isabelle_System.mk_fifo()
   138   private val out_fifo = Isabelle_System.mk_fifo()
   139   private def rm_fifos() { Isabelle_System.rm_fifo(in_fifo); Isabelle_System.rm_fifo(out_fifo) }
   140 
   141   private val process =
   142     try {
   143       val cmdline =
   144         List(Isabelle_System.getenv_strict("ISABELLE_PROCESS"), "-W",
   145           in_fifo + ":" + out_fifo) ++ args
   146       new Isabelle_System.Managed_Process(true, cmdline: _*)
   147     }
   148     catch { case e: IOException => rm_fifos(); throw(e) }
   149 
   150   val process_result =
   151     Simple_Thread.future("process_result") { process.join }
   152 
   153   private def terminate_process()
   154   {
   155     try { process.terminate }
   156     catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
   157   }
   158 
   159   private val process_manager = Simple_Thread.fork("process_manager")
   160   {
   161     val (startup_failed, startup_output) =
   162     {
   163       val expired = System.currentTimeMillis() + timeout.ms
   164       val result = new StringBuilder(100)
   165 
   166       var finished: Option[Boolean] = None
   167       while (finished.isEmpty && System.currentTimeMillis() <= expired) {
   168         while (finished.isEmpty && process.stdout.ready) {
   169           val c = process.stdout.read
   170           if (c == 2) finished = Some(true)
   171           else result += c.toChar
   172         }
   173         if (process_result.is_finished) finished = Some(false)
   174         else Thread.sleep(10)
   175       }
   176       (finished.isEmpty || !finished.get, result.toString.trim)
   177     }
   178     system_result(startup_output)
   179 
   180     if (startup_failed) {
   181       put_result(Markup.EXIT, "Return code: 127")
   182       process.stdin.close
   183       Thread.sleep(300)
   184       terminate_process()
   185       process_result.join
   186     }
   187     else {
   188       // rendezvous
   189       val command_stream = Isabelle_System.fifo_output_stream(in_fifo)
   190       val message_stream = Isabelle_System.fifo_input_stream(out_fifo)
   191 
   192       standard_input = stdin_actor()
   193       val stdout = stdout_actor()
   194       command_input = input_actor(command_stream)
   195       val message = message_actor(message_stream)
   196 
   197       val rc = process_result.join
   198       system_result("process terminated")
   199       for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
   200       system_result("process_manager terminated")
   201       put_result(Markup.EXIT, "Return code: " + rc.toString)
   202     }
   203     rm_fifos()
   204   }
   205 
   206 
   207   /* management methods */
   208 
   209   def join() { process_manager.join() }
   210 
   211   def interrupt()
   212   {
   213     try { process.interrupt }
   214     catch { case e: IOException => system_result("Failed to interrupt Isabelle: " + e.getMessage) }
   215   }
   216 
   217   def terminate()
   218   {
   219     close()
   220     system_result("Terminating Isabelle process")
   221     terminate_process()
   222   }
   223 
   224 
   225 
   226   /** stream actors **/
   227 
   228   /* raw stdin */
   229 
   230   private def stdin_actor(): (Thread, Actor) =
   231   {
   232     val name = "standard_input"
   233     Simple_Thread.actor(name) {
   234       var finished = false
   235       while (!finished) {
   236         try {
   237           //{{{
   238           receive {
   239             case Input_Text(text) =>
   240               process.stdin.write(text)
   241               process.stdin.flush
   242             case Close =>
   243               process.stdin.close
   244               finished = true
   245             case bad => System.err.println(name + ": ignoring bad message " + bad)
   246           }
   247           //}}}
   248         }
   249         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   250       }
   251       system_result(name + " terminated")
   252     }
   253   }
   254 
   255 
   256   /* raw stdout */
   257 
   258   private def stdout_actor(): (Thread, Actor) =
   259   {
   260     val name = "standard_output"
   261     Simple_Thread.actor(name) {
   262       var result = new StringBuilder(100)
   263 
   264       var finished = false
   265       while (!finished) {
   266         try {
   267           //{{{
   268           var c = -1
   269           var done = false
   270           while (!done && (result.length == 0 || process.stdout.ready)) {
   271             c = process.stdout.read
   272             if (c >= 0) result.append(c.asInstanceOf[Char])
   273             else done = true
   274           }
   275           if (result.length > 0) {
   276             put_result(Markup.STDOUT, result.toString)
   277             result.length = 0
   278           }
   279           else {
   280             process.stdout.close
   281             finished = true
   282           }
   283           //}}}
   284         }
   285         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   286       }
   287       system_result(name + " terminated")
   288     }
   289   }
   290 
   291 
   292   /* command input */
   293 
   294   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   295   {
   296     val name = "command_input"
   297     Simple_Thread.actor(name) {
   298       val stream = new BufferedOutputStream(raw_stream)
   299       var finished = false
   300       while (!finished) {
   301         try {
   302           //{{{
   303           receive {
   304             case Input_Chunks(chunks) =>
   305               stream.write(Standard_System.string_bytes(
   306                   chunks.map(_.length).mkString("", ",", "\n")));
   307               chunks.foreach(stream.write(_));
   308               stream.flush
   309             case Close =>
   310               stream.close
   311               finished = true
   312             case bad => System.err.println(name + ": ignoring bad message " + bad)
   313           }
   314           //}}}
   315         }
   316         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   317       }
   318       system_result(name + " terminated")
   319     }
   320   }
   321 
   322 
   323   /* message output */
   324 
   325   private def message_actor(stream: InputStream): (Thread, Actor) =
   326   {
   327     class EOF extends Exception
   328     class Protocol_Error(msg: String) extends Exception(msg)
   329 
   330     val name = "message_output"
   331     Simple_Thread.actor(name) {
   332       val default_buffer = new Array[Byte](65536)
   333       var c = -1
   334 
   335       def read_chunk(decode: Boolean): XML.Body =
   336       {
   337         //{{{
   338         // chunk size
   339         var n = 0
   340         c = stream.read
   341         if (c == -1) throw new EOF
   342         while (48 <= c && c <= 57) {
   343           n = 10 * n + (c - 48)
   344           c = stream.read
   345         }
   346         if (c != 10) throw new Protocol_Error("bad message chunk header")
   347 
   348         // chunk content
   349         val buf =
   350           if (n <= default_buffer.size) default_buffer
   351           else new Array[Byte](n)
   352 
   353         var i = 0
   354         var m = 0
   355         do {
   356           m = stream.read(buf, i, n - i)
   357           if (m != -1) i += m
   358         } while (m != -1 && n > i)
   359 
   360         if (i != n) throw new Protocol_Error("bad message chunk content")
   361 
   362         if (decode)
   363           YXML.parse_body_failsafe(Standard_System.decode_chars(Symbol.decode, buf, 0, n))
   364         else List(XML.Text(Standard_System.decode_chars(s => s, buf, 0, n).toString))
   365         //}}}
   366       }
   367 
   368       do {
   369         try {
   370           val header = read_chunk(true)
   371           header match {
   372             case List(XML.Elem(Markup(name, props), Nil))
   373                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   374               val kind = Kind.message_markup(name(0))
   375               val body = read_chunk(kind != Markup.RAW)
   376               put_result(kind, props, body)
   377             case _ =>
   378               read_chunk(false)
   379               throw new Protocol_Error("bad header: " + header.toString)
   380           }
   381         }
   382         catch {
   383           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   384           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   385           case _: EOF =>
   386         }
   387       } while (c != -1)
   388       stream.close
   389 
   390       system_result(name + " terminated")
   391     }
   392   }
   393 
   394 
   395   /** main methods **/
   396 
   397   def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
   398 
   399   def input_bytes(name: String, args: Array[Byte]*): Unit =
   400     command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   401 
   402   def input(name: String, args: String*): Unit =
   403   {
   404     receiver ! new Input(name, args.toList)
   405     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   406   }
   407 
   408   def close(): Unit = { close(command_input); close(standard_input) }
   409 }