src/Pure/System/isabelle_process.scala
author wenzelm
Thu Dec 01 14:29:14 2011 +0100 (2011-12-01 ago)
changeset 45709 87017fcbad83
parent 45672 a497c5d4a523
child 46121 30a69cd8a9a0
permissions -rw-r--r--
clarified modules (again) -- NB: both Document and Protocol are specific to this particular prover;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.lang.System
    11 import java.util.concurrent.LinkedBlockingQueue
    12 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    13   InputStream, OutputStream, BufferedOutputStream, IOException}
    14 
    15 import scala.actors.Actor
    16 import Actor._
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* results */
    22 
    23   object Kind
    24   {
    25     val message_markup = Map(
    26       ('A' : Int) -> Isabelle_Markup.INIT,
    27       ('B' : Int) -> Isabelle_Markup.STATUS,
    28       ('C' : Int) -> Isabelle_Markup.REPORT,
    29       ('D' : Int) -> Isabelle_Markup.WRITELN,
    30       ('E' : Int) -> Isabelle_Markup.TRACING,
    31       ('F' : Int) -> Isabelle_Markup.WARNING,
    32       ('G' : Int) -> Isabelle_Markup.ERROR,
    33       ('H' : Int) -> Isabelle_Markup.RAW)
    34   }
    35 
    36   sealed abstract class Message
    37 
    38   class Input(name: String, args: List[String]) extends Message
    39   {
    40     override def toString: String =
    41       XML.Elem(Markup(Isabelle_Markup.PROVER_COMMAND, List((Markup.NAME, name))),
    42         args.map(s =>
    43           List(XML.Text("\n"),
    44             XML.elem(Isabelle_Markup.PROVER_ARG, YXML.parse_body(s)))).flatten).toString
    45   }
    46 
    47   class Result(val message: XML.Elem) extends Message
    48   {
    49     def kind: String = message.markup.name
    50     def properties: Properties.T = message.markup.properties
    51     def body: XML.Body = message.body
    52 
    53     def is_init = kind == Isabelle_Markup.INIT
    54     def is_exit = kind == Isabelle_Markup.EXIT
    55     def is_stdout = kind == Isabelle_Markup.STDOUT
    56     def is_stderr = kind == Isabelle_Markup.STDERR
    57     def is_system = kind == Isabelle_Markup.SYSTEM
    58     def is_status = kind == Isabelle_Markup.STATUS
    59     def is_report = kind == Isabelle_Markup.REPORT
    60     def is_raw = kind == Isabelle_Markup.RAW
    61     def is_ready = Protocol.is_ready(message)
    62     def is_syslog = is_init || is_exit || is_system || is_ready || is_stderr
    63 
    64     override def toString: String =
    65     {
    66       val res =
    67         if (is_status || is_report) message.body.map(_.toString).mkString
    68         else if (is_raw) "..."
    69         else Pretty.string_of(message.body)
    70       if (properties.isEmpty)
    71         kind.toString + " [[" + res + "]]"
    72       else
    73         kind.toString + " " +
    74           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    75     }
    76   }
    77 }
    78 
    79 
    80 class Isabelle_Process(
    81     timeout: Time = Time.seconds(25),
    82     receiver: Isabelle_Process.Message => Unit = Console.println(_),
    83     args: List[String] = Nil)
    84 {
    85   import Isabelle_Process._
    86 
    87 
    88   /* results */
    89 
    90   private def system_result(text: String)
    91   {
    92     receiver(new Result(XML.Elem(Markup(Isabelle_Markup.SYSTEM, Nil), List(XML.Text(text)))))
    93   }
    94 
    95   private val xml_cache = new XML.Cache()
    96 
    97   private def put_result(kind: String, props: Properties.T, body: XML.Body)
    98   {
    99     if (kind == Isabelle_Markup.INIT) system_channel.accepted()
   100     if (kind == Isabelle_Markup.RAW)
   101       receiver(new Result(XML.Elem(Markup(kind, props), body)))
   102     else {
   103       val msg = XML.Elem(Markup(kind, props), Protocol.clean_message(body))
   104       receiver(new Result(xml_cache.cache_tree(msg).asInstanceOf[XML.Elem]))
   105     }
   106   }
   107 
   108   private def put_result(kind: String, text: String)
   109   {
   110     put_result(kind, Nil, List(XML.Text(Symbol.decode(text))))
   111   }
   112 
   113 
   114   /* input actors */
   115 
   116   private case class Input_Text(text: String)
   117   private case class Input_Chunks(chunks: List[Array[Byte]])
   118 
   119   private case object Close
   120   private def close(p: (Thread, Actor))
   121   {
   122     if (p != null && p._1.isAlive) {
   123       p._2 ! Close
   124       p._1.join
   125     }
   126   }
   127 
   128   @volatile private var standard_input: (Thread, Actor) = null
   129   @volatile private var command_input: (Thread, Actor) = null
   130 
   131 
   132   /** process manager **/
   133 
   134   private val system_channel = System_Channel()
   135 
   136   private val process =
   137     try {
   138       val cmdline =
   139         Isabelle_System.getenv_strict("ISABELLE_PROCESS") ::
   140           (system_channel.isabelle_args ::: args)
   141       new Isabelle_System.Managed_Process(false, cmdline: _*)
   142     }
   143     catch { case e: IOException => system_channel.accepted(); throw(e) }
   144 
   145   val process_result =
   146     Simple_Thread.future("process_result") { process.join }
   147 
   148   private def terminate_process()
   149   {
   150     try { process.terminate }
   151     catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
   152   }
   153 
   154   private val process_manager = Simple_Thread.fork("process_manager")
   155   {
   156     val (startup_failed, startup_output) =
   157     {
   158       val expired = System.currentTimeMillis() + timeout.ms
   159       val result = new StringBuilder(100)
   160 
   161       var finished: Option[Boolean] = None
   162       while (finished.isEmpty && System.currentTimeMillis() <= expired) {
   163         while (finished.isEmpty && process.stdout.ready) {
   164           val c = process.stdout.read
   165           if (c == 2) finished = Some(true)
   166           else result += c.toChar
   167         }
   168         if (process_result.is_finished) finished = Some(false)
   169         else Thread.sleep(10)
   170       }
   171       (finished.isEmpty || !finished.get, result.toString.trim)
   172     }
   173     system_result(startup_output)
   174 
   175     if (startup_failed) {
   176       put_result(Isabelle_Markup.EXIT, "Return code: 127")
   177       process.stdin.close
   178       Thread.sleep(300)
   179       terminate_process()
   180       process_result.join
   181     }
   182     else {
   183       val (command_stream, message_stream) = system_channel.rendezvous()
   184 
   185       standard_input = stdin_actor()
   186       val stdout = raw_output_actor(false)
   187       val stderr = raw_output_actor(true)
   188       command_input = input_actor(command_stream)
   189       val message = message_actor(message_stream)
   190 
   191       val rc = process_result.join
   192       system_result("process terminated")
   193       for ((thread, _) <- List(standard_input, stdout, stderr, command_input, message))
   194         thread.join
   195       system_result("process_manager terminated")
   196       put_result(Isabelle_Markup.EXIT, "Return code: " + rc.toString)
   197     }
   198     system_channel.accepted()
   199   }
   200 
   201 
   202   /* management methods */
   203 
   204   def join() { process_manager.join() }
   205 
   206   def terminate()
   207   {
   208     close()
   209     system_result("Terminating Isabelle process")
   210     terminate_process()
   211   }
   212 
   213 
   214 
   215   /** stream actors **/
   216 
   217   /* raw stdin */
   218 
   219   private def stdin_actor(): (Thread, Actor) =
   220   {
   221     val name = "standard_input"
   222     Simple_Thread.actor(name) {
   223       var finished = false
   224       while (!finished) {
   225         try {
   226           //{{{
   227           receive {
   228             case Input_Text(text) =>
   229               process.stdin.write(text)
   230               process.stdin.flush
   231             case Close =>
   232               process.stdin.close
   233               finished = true
   234             case bad => System.err.println(name + ": ignoring bad message " + bad)
   235           }
   236           //}}}
   237         }
   238         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   239       }
   240       system_result(name + " terminated")
   241     }
   242   }
   243 
   244 
   245   /* raw output */
   246 
   247   private def raw_output_actor(err: Boolean): (Thread, Actor) =
   248   {
   249     val (name, reader, markup) =
   250       if (err) ("standard_error", process.stderr, Isabelle_Markup.STDERR)
   251       else ("standard_output", process.stdout, Isabelle_Markup.STDOUT)
   252 
   253     Simple_Thread.actor(name) {
   254       var result = new StringBuilder(100)
   255 
   256       var finished = false
   257       while (!finished) {
   258         try {
   259           //{{{
   260           var c = -1
   261           var done = false
   262           while (!done && (result.length == 0 || reader.ready)) {
   263             c = reader.read
   264             if (c >= 0) result.append(c.asInstanceOf[Char])
   265             else done = true
   266           }
   267           if (result.length > 0) {
   268             put_result(markup, result.toString)
   269             result.length = 0
   270           }
   271           else {
   272             reader.close
   273             finished = true
   274           }
   275           //}}}
   276         }
   277         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   278       }
   279       system_result(name + " terminated")
   280     }
   281   }
   282 
   283 
   284   /* command input */
   285 
   286   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   287   {
   288     val name = "command_input"
   289     Simple_Thread.actor(name) {
   290       val stream = new BufferedOutputStream(raw_stream)
   291       var finished = false
   292       while (!finished) {
   293         try {
   294           //{{{
   295           receive {
   296             case Input_Chunks(chunks) =>
   297               stream.write(Standard_System.string_bytes(
   298                   chunks.map(_.length).mkString("", ",", "\n")));
   299               chunks.foreach(stream.write(_));
   300               stream.flush
   301             case Close =>
   302               stream.close
   303               finished = true
   304             case bad => System.err.println(name + ": ignoring bad message " + bad)
   305           }
   306           //}}}
   307         }
   308         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   309       }
   310       system_result(name + " terminated")
   311     }
   312   }
   313 
   314 
   315   /* message output */
   316 
   317   private def message_actor(stream: InputStream): (Thread, Actor) =
   318   {
   319     class EOF extends Exception
   320     class Protocol_Error(msg: String) extends Exception(msg)
   321 
   322     val name = "message_output"
   323     Simple_Thread.actor(name) {
   324       val default_buffer = new Array[Byte](65536)
   325       var c = -1
   326 
   327       def read_chunk(decode: Boolean): XML.Body =
   328       {
   329         //{{{
   330         // chunk size
   331         var n = 0
   332         c = stream.read
   333         if (c == -1) throw new EOF
   334         while (48 <= c && c <= 57) {
   335           n = 10 * n + (c - 48)
   336           c = stream.read
   337         }
   338         if (c != 10) throw new Protocol_Error("bad message chunk header")
   339 
   340         // chunk content
   341         val buf =
   342           if (n <= default_buffer.size) default_buffer
   343           else new Array[Byte](n)
   344 
   345         var i = 0
   346         var m = 0
   347         do {
   348           m = stream.read(buf, i, n - i)
   349           if (m != -1) i += m
   350         } while (m != -1 && n > i)
   351 
   352         if (i != n) throw new Protocol_Error("bad message chunk content")
   353 
   354         if (decode)
   355           YXML.parse_body_failsafe(Standard_System.decode_chars(Symbol.decode, buf, 0, n))
   356         else List(XML.Text(Standard_System.decode_chars(s => s, buf, 0, n).toString))
   357         //}}}
   358       }
   359 
   360       do {
   361         try {
   362           val header = read_chunk(true)
   363           header match {
   364             case List(XML.Elem(Markup(name, props), Nil))
   365                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   366               val kind = Kind.message_markup(name(0))
   367               val body = read_chunk(kind != Isabelle_Markup.RAW)
   368               put_result(kind, props, body)
   369             case _ =>
   370               read_chunk(false)
   371               throw new Protocol_Error("bad header: " + header.toString)
   372           }
   373         }
   374         catch {
   375           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   376           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   377           case _: EOF =>
   378         }
   379       } while (c != -1)
   380       stream.close
   381 
   382       system_result(name + " terminated")
   383     }
   384   }
   385 
   386 
   387   /** main methods **/
   388 
   389   def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
   390 
   391   def input_bytes(name: String, args: Array[Byte]*): Unit =
   392     command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   393 
   394   def input(name: String, args: String*): Unit =
   395   {
   396     receiver(new Input(name, args.toList))
   397     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   398   }
   399 
   400   def close(): Unit = { close(command_input); close(standard_input) }
   401 }