src/Pure/System/isabelle_process.scala
author wenzelm
Sat Jul 09 21:53:27 2011 +0200 (2011-07-09 ago)
changeset 43721 fad8634cee62
parent 43695 5130dfe1b7be
child 43745 562e35bc351e
permissions -rw-r--r--
echo prover input via raw_messages, for improved protocol tracing;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.lang.System
    11 import java.util.concurrent.LinkedBlockingQueue
    12 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    13   InputStream, OutputStream, BufferedOutputStream, IOException}
    14 
    15 import scala.actors.Actor
    16 import Actor._
    17 
    18 
    object Isabelle_Process
    {
      /* results */

      object Kind
      {
        // Maps the one-character code of a message header (as Int) to the
        // markup name under which the corresponding result is reported.
        val message_markup = Map(
          'A'.toInt -> Markup.INIT,
          'B'.toInt -> Markup.STATUS,
          'C'.toInt -> Markup.REPORT,
          'D'.toInt -> Markup.WRITELN,
          'E'.toInt -> Markup.TRACING,
          'F'.toInt -> Markup.WARNING,
          'G'.toInt -> Markup.ERROR)
      }

      // Common base class of Input and Result.
      abstract class Message

      // A prover command together with its arguments.  The string form is a
      // PROVER_COMMAND element carrying the command name, whose body holds,
      // for each argument, a newline text node followed by a PROVER_ARG
      // element with the YXML-parsed argument.
      class Input(name: String, args: List[String]) extends Message
      {
        override def toString: String =
        {
          val command = Markup(Markup.PROVER_COMMAND, List((Markup.NAME, name)))
          val rendered_args =
            args.flatMap(arg =>
              List(XML.Text("\n"), XML.elem(Markup.PROVER_ARG, YXML.parse_body(arg))))
          XML.Elem(command, rendered_args).toString
        }
      }

      // A message element together with convenience accessors and
      // classification predicates derived from its markup name.
      class Result(val message: XML.Elem) extends Message
      {
        def kind = message.markup.name
        def properties = message.markup.properties
        def body = message.body

        def is_init: Boolean = kind == Markup.INIT
        def is_exit: Boolean = kind == Markup.EXIT
        def is_stdout: Boolean = kind == Markup.STDOUT
        def is_system: Boolean = kind == Markup.SYSTEM
        def is_status: Boolean = kind == Markup.STATUS
        def is_report: Boolean = kind == Markup.REPORT
        def is_ready = Isar_Document.is_ready(message)
        def is_syslog = is_init || is_exit || is_system || is_ready

        // Format: kind, optional "{key=value,...}" property list, then the
        // body within "[[...]]".  Status and report bodies are shown as plain
        // concatenation of their trees, all others via Pretty.string_of.
        override def toString: String =
        {
          val content =
            if (is_status || is_report) body.map(_.toString).mkString
            else Pretty.string_of(body)
          val props =
            if (properties.isEmpty) ""
            else " " + properties.map({ case (x, y) => x + "=" + y }).mkString("{", ",", "}")
          kind + props + " [[" + content + "]]"
        }
      }
    }
    73 
    74 
    // Manages one external Isabelle process: starts it with a pair of fifos
    // for protocol traffic, forks a manager thread that supervises startup
    // and shutdown, and runs one actor thread per stream.
    //
    //   timeout   deadline for the startup phase (see process_manager)
    //   receiver  actor that gets every Result, and an echo of every Input
    //   args      extra arguments appended to the ISABELLE_PROCESS command line
    //
    // NB: the order of the val/var definitions below matters, since the
    // manager thread is forked during construction and refers to them.
    class Isabelle_Process(timeout: Time, receiver: Actor, args: String*)
    {
      import Isabelle_Process._


      /* demo constructor */

      // 10 seconds startup timeout; the receiver prints each message to the console.
      def this(args: String*) =
        this(Time.seconds(10), actor { loop { react { case res => Console.println(res) } } }, args: _*)


      /* results */

      // Send plain text as a SYSTEM result to the receiver.
      private def system_result(text: String): Unit =
      {
        receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
      }

      private val xml_cache = new XML.Cache(131071)

      // Build a message element of the given kind from the cleaned body, pass it
      // through xml_cache and send the resulting tree to the receiver.
      // An INIT message additionally triggers removal of the fifos.
      private def put_result(kind: String, props: List[(String, String)], body: XML.Body): Unit =
      {
        if (kind == Markup.INIT) rm_fifos()
        val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
        xml_cache.cache_tree(msg)((message: XML.Tree) =>
          receiver ! new Result(message.asInstanceOf[XML.Elem]))
      }

      // Variant for plain text without properties; symbols are decoded first.
      private def put_result(kind: String, text: String): Unit =
      {
        put_result(kind, Nil, List(XML.Text(Symbol.decode(text))))
      }


      /* input actors */

      private case class Input_Text(text: String)
      private case class Input_Chunks(chunks: List[Array[Byte]])

      private case object Close

      // Ask a running input actor to close its stream and wait for its thread;
      // does nothing for an actor that was never started or is already dead.
      private def close(p: (Thread, Actor)): Unit =
      {
        if (p != null && p._1.isAlive) {
          p._2 ! Close
          p._1.join
        }
      }

      // Both remain null until the process manager has completed the rendezvous
      // (and forever if startup fails).
      @volatile private var standard_input: (Thread, Actor) = null
      @volatile private var command_input: (Thread, Actor) = null


      /** process manager **/

      private val in_fifo = Isabelle_System.mk_fifo()
      private val out_fifo = Isabelle_System.mk_fifo()
      private def rm_fifos(): Unit =
      {
        Isabelle_System.rm_fifo(in_fifo)
        Isabelle_System.rm_fifo(out_fifo)
      }

      // The fifos are removed again if the process cannot be created.
      private val process =
        try {
          val cmdline =
            List(Isabelle_System.getenv_strict("ISABELLE_PROCESS"), "-W",
              in_fifo + ":" + out_fifo) ++ args
          new Isabelle_System.Managed_Process(true, cmdline: _*)
        }
        catch { case e: IOException => rm_fifos(); throw(e) }

      val process_result =
        Simple_Thread.future("process_result") { process.join }

      private def terminate_process(): Unit =
      {
        try { process.terminate }
        catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
      }

      private val process_manager = Simple_Thread.fork("process_manager") {
        // Startup phase: collect raw stdout until character code 2 shows up
        // (success), the process result becomes available, or the timeout
        // expires (both failure).
        val (startup_failed, startup_output) =
        {
          val expired = System.currentTimeMillis() + timeout.ms
          val result = new StringBuilder(100)

          var finished: Option[Boolean] = None
          while (finished.isEmpty && System.currentTimeMillis() <= expired) {
            while (finished.isEmpty && process.stdout.ready) {
              val c = process.stdout.read
              if (c == 2) finished = Some(true)
              else result += c.toChar
            }
            if (process_result.is_finished) finished = Some(false)
            else Thread.sleep(10)
          }
          (finished != Some(true), result.toString.trim)
        }
        system_result(startup_output)

        if (startup_failed) {
          put_result(Markup.EXIT, "Return code: 127")
          process.stdin.close
          Thread.sleep(300)
          terminate_process()
          process_result.join
        }
        else {
          // rendezvous
          val command_stream = Isabelle_System.fifo_output_stream(in_fifo)
          val message_stream = Isabelle_System.fifo_input_stream(out_fifo)

          standard_input = stdin_actor()
          val stdout = stdout_actor()
          command_input = input_actor(command_stream)
          val message = message_actor(message_stream)

          val rc = process_result.join
          system_result("process terminated")
          // The input actors only stop on Close: without this, a process that
          // exits on its own would leave the joins below blocked forever and
          // the EXIT result would never be delivered.
          close()
          for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
          system_result("process_manager terminated")
          put_result(Markup.EXIT, "Return code: " + rc.toString)
        }
        rm_fifos()
      }


      /* management methods */

      // Wait until the process manager thread has finished.
      def join(): Unit = { process_manager.join() }

      def interrupt(): Unit =
      {
        try { process.interrupt }
        catch { case e: IOException => system_result("Failed to interrupt Isabelle: " + e.getMessage) }
      }

      // Close the input actors first, then terminate the process.
      def terminate(): Unit =
      {
        close()
        system_result("Terminating Isabelle process")
        terminate_process()
      }



      /** stream actors **/

      /* raw stdin */

      // Actor that writes Input_Text to the stdin of the process, until Close.
      private def stdin_actor(): (Thread, Actor) =
      {
        val name = "standard_input"
        Simple_Thread.actor(name) {
          var finished = false
          while (!finished) {
            try {
              //{{{
              receive {
                case Input_Text(text) =>
                  process.stdin.write(text)
                  process.stdin.flush
                case Close =>
                  // mark termination first: a failing close must not keep this
                  // actor alive, otherwise close(p) would block in join forever
                  finished = true
                  process.stdin.close
                case bad => System.err.println(name + ": ignoring bad message " + bad)
              }
              //}}}
            }
            catch { case e: IOException => system_result(name + ": " + e.getMessage) }
          }
          system_result(name + " terminated")
        }
      }


      /* raw stdout */

      // Actor that forwards raw stdout of the process as STDOUT results: blocks
      // for the first character, then takes whatever is immediately available.
      private def stdout_actor(): (Thread, Actor) =
      {
        val name = "standard_output"
        Simple_Thread.actor(name) {
          val result = new StringBuilder(100)

          var finished = false
          while (!finished) {
            try {
              //{{{
              var c = -1
              var done = false
              while (!done && (result.length == 0 || process.stdout.ready)) {
                c = process.stdout.read
                if (c >= 0) result.append(c.toChar)
                else done = true
              }
              if (result.length > 0) {
                put_result(Markup.STDOUT, result.toString)
                result.length = 0
              }
              else {
                // end of stream: stop even if close itself fails
                finished = true
                process.stdout.close
              }
              //}}}
            }
            catch { case e: IOException => system_result(name + ": " + e.getMessage) }
          }
          system_result(name + " terminated")
        }
      }


      /* command input */

      // Actor that writes Input_Chunks to the command fifo, until Close.
      // Wire format: one line of comma-separated chunk lengths, then the chunks.
      private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
      {
        val name = "command_input"
        Simple_Thread.actor(name) {
          val stream = new BufferedOutputStream(raw_stream)
          var finished = false
          while (!finished) {
            try {
              //{{{
              receive {
                case Input_Chunks(chunks) =>
                  stream.write(Standard_System.string_bytes(
                      chunks.map(_.length).mkString("", ",", "\n")))
                  chunks.foreach(stream.write(_))
                  stream.flush
                case Close =>
                  // mark termination first: a failing close (e.g. flush of
                  // pending output) must not keep this actor alive
                  finished = true
                  stream.close
                case bad => System.err.println(name + ": ignoring bad message " + bad)
              }
              //}}}
            }
            catch { case e: IOException => system_result(name + ": " + e.getMessage) }
          }
          system_result(name + " terminated")
        }
      }


      /* message output */

      // Actor that reads protocol messages from the message fifo.  Each message
      // consists of two chunks (header, body); the header must be a single
      // element without body whose one-character name is a key of
      // Kind.message_markup.
      private def message_actor(stream: InputStream): (Thread, Actor) =
      {
        class EOF extends Exception
        class Protocol_Error(msg: String) extends Exception(msg)

        val name = "message_output"
        Simple_Thread.actor(name) {
          val default_buffer = new Array[Byte](65536)
          var c = -1  // last byte read from a chunk header; -1 means end of stream

          def read_chunk(): XML.Body =
          {
            //{{{
            // chunk size: decimal digits (codes 48..57) terminated by newline (10)
            var n = 0
            c = stream.read
            if (c == -1) throw new EOF
            while (48 <= c && c <= 57) {
              n = 10 * n + (c - 48)
              c = stream.read
            }
            if (c != 10) throw new Protocol_Error("bad message chunk header")

            // chunk content: the shared buffer is used unless the chunk is larger
            val buf =
              if (n <= default_buffer.length) default_buffer
              else new Array[Byte](n)

            var i = 0
            var m = 0
            do {
              m = stream.read(buf, i, n - i)
              if (m != -1) i += m
            } while (m != -1 && n > i)

            if (i != n) throw new Protocol_Error("bad message chunk content")

            YXML.parse_body_failsafe(YXML.decode_chars(Symbol.decode, buf, 0, n))
            //}}}
          }

          do {
            try {
              val header = read_chunk()
              val body = read_chunk()
              header match {
                case List(XML.Elem(Markup(code, props), Nil))
                    if code.size == 1 && Kind.message_markup.isDefinedAt(code(0)) =>
                  put_result(Kind.message_markup(code(0)), props, body)
                case _ => throw new Protocol_Error("bad header: " + header.toString)
              }
            }
            catch {
              case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
              case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
              case _: EOF =>
            }
          } while (c != -1)
          stream.close

          system_result(name + " terminated")
        }
      }


      /** main methods **/

      // NB: the input actors are null before the rendezvous in process_manager,
      // so input sent earlier (or after failed startup) raises NullPointerException.

      def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)

      def input_bytes(name: String, args: Array[Byte]*): Unit =
        command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)

      // Echo the command to the receiver as Input, then send it to the prover.
      def input(name: String, args: String*): Unit =
      {
        receiver ! new Input(name, args.toList)
        input_bytes(name, args.map(Standard_System.string_bytes): _*)
      }

      // Close both input actors (safe if they were never started).
      def close(): Unit = { close(command_input); close(standard_input) }
    }