src/Pure/System/isabelle_process.scala
author wenzelm
Fri Nov 25 18:37:14 2011 +0100 (2011-11-25 ago)
changeset 45633 2cb7e34f6096
parent 45158 db4bf4fb5492
child 45666 d83797ef0d2d
permissions -rw-r--r--
retain stderr and include it in syslog, which is buffered and thus increases the chance that users see remains from crashes etc.;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.lang.System
    11 import java.util.concurrent.LinkedBlockingQueue
    12 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    13   InputStream, OutputStream, BufferedOutputStream, IOException}
    14 
    15 import scala.actors.Actor
    16 import Actor._
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* results */
    22 
    23   object Kind
    24   {
    25     val message_markup = Map(
    26       ('A' : Int) -> Markup.INIT,
    27       ('B' : Int) -> Markup.STATUS,
    28       ('C' : Int) -> Markup.REPORT,
    29       ('D' : Int) -> Markup.WRITELN,
    30       ('E' : Int) -> Markup.TRACING,
    31       ('F' : Int) -> Markup.WARNING,
    32       ('G' : Int) -> Markup.ERROR,
    33       ('H' : Int) -> Markup.RAW)
    34   }
    35 
    36   sealed abstract class Message
    37 
    38   class Input(name: String, args: List[String]) extends Message
    39   {
    40     override def toString: String =
    41       XML.Elem(Markup(Markup.PROVER_COMMAND, List((Markup.NAME, name))),
    42         args.map(s =>
    43           List(XML.Text("\n"), XML.elem(Markup.PROVER_ARG, YXML.parse_body(s)))).flatten).toString
    44   }
    45 
    46   class Result(val message: XML.Elem) extends Message
    47   {
    48     def kind: String = message.markup.name
    49     def properties: Properties.T = message.markup.properties
    50     def body: XML.Body = message.body
    51 
    52     def is_init = kind == Markup.INIT
    53     def is_exit = kind == Markup.EXIT
    54     def is_stdout = kind == Markup.STDOUT
    55     def is_stderr = kind == Markup.STDERR
    56     def is_system = kind == Markup.SYSTEM
    57     def is_status = kind == Markup.STATUS
    58     def is_report = kind == Markup.REPORT
    59     def is_raw = kind == Markup.RAW
    60     def is_ready = Isar_Document.is_ready(message)
    61     def is_syslog = is_init || is_exit || is_system || is_ready || is_stderr
    62 
    63     override def toString: String =
    64     {
    65       val res =
    66         if (is_status || is_report) message.body.map(_.toString).mkString
    67         else if (is_raw) "..."
    68         else Pretty.string_of(message.body)
    69       if (properties.isEmpty)
    70         kind.toString + " [[" + res + "]]"
    71       else
    72         kind.toString + " " +
    73           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    74     }
    75   }
    76 }
    77 
    78 
    79 class Isabelle_Process(
    80     timeout: Time = Time.seconds(25),
    81     receiver: Isabelle_Process.Message => Unit = Console.println(_),
    82     args: List[String] = Nil)
    83 {
    84   import Isabelle_Process._
    85 
    86 
    87   /* results */
    88 
    89   private def system_result(text: String)
    90   {
    91     receiver(new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text)))))
    92   }
    93 
    94   private val xml_cache = new XML.Cache()
    95 
    96   private def put_result(kind: String, props: Properties.T, body: XML.Body)
    97   {
    98     if (kind == Markup.INIT) system_channel.accepted()
    99     if (kind == Markup.RAW)
   100       receiver(new Result(XML.Elem(Markup(kind, props), body)))
   101     else {
   102       val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
   103       receiver(new Result(xml_cache.cache_tree(msg).asInstanceOf[XML.Elem]))
   104     }
   105   }
   106 
   107   private def put_result(kind: String, text: String)
   108   {
   109     put_result(kind, Nil, List(XML.Text(Symbol.decode(text))))
   110   }
   111 
   112 
   113   /* input actors */
   114 
   115   private case class Input_Text(text: String)
   116   private case class Input_Chunks(chunks: List[Array[Byte]])
   117 
   118   private case object Close
   119   private def close(p: (Thread, Actor))
   120   {
   121     if (p != null && p._1.isAlive) {
   122       p._2 ! Close
   123       p._1.join
   124     }
   125   }
   126 
   127   @volatile private var standard_input: (Thread, Actor) = null
   128   @volatile private var command_input: (Thread, Actor) = null
   129 
   130 
   131   /** process manager **/
   132 
   133   private val system_channel = System_Channel()
   134 
   135   private val process =
   136     try {
   137       val cmdline =
   138         Isabelle_System.getenv_strict("ISABELLE_PROCESS") ::
   139           (system_channel.isabelle_args ::: args)
   140       new Isabelle_System.Managed_Process(false, cmdline: _*)
   141     }
   142     catch { case e: IOException => system_channel.accepted(); throw(e) }
   143 
   144   val process_result =
   145     Simple_Thread.future("process_result") { process.join }
   146 
   147   private def terminate_process()
   148   {
   149     try { process.terminate }
   150     catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
   151   }
   152 
   153   private val process_manager = Simple_Thread.fork("process_manager")
   154   {
   155     val (startup_failed, startup_output) =
   156     {
   157       val expired = System.currentTimeMillis() + timeout.ms
   158       val result = new StringBuilder(100)
   159 
   160       var finished: Option[Boolean] = None
   161       while (finished.isEmpty && System.currentTimeMillis() <= expired) {
   162         while (finished.isEmpty && process.stdout.ready) {
   163           val c = process.stdout.read
   164           if (c == 2) finished = Some(true)
   165           else result += c.toChar
   166         }
   167         if (process_result.is_finished) finished = Some(false)
   168         else Thread.sleep(10)
   169       }
   170       (finished.isEmpty || !finished.get, result.toString.trim)
   171     }
   172     system_result(startup_output)
   173 
   174     if (startup_failed) {
   175       put_result(Markup.EXIT, "Return code: 127")
   176       process.stdin.close
   177       Thread.sleep(300)
   178       terminate_process()
   179       process_result.join
   180     }
   181     else {
   182       val (command_stream, message_stream) = system_channel.rendezvous()
   183 
   184       standard_input = stdin_actor()
   185       val stdout = raw_output_actor(false)
   186       val stderr = raw_output_actor(true)
   187       command_input = input_actor(command_stream)
   188       val message = message_actor(message_stream)
   189 
   190       val rc = process_result.join
   191       system_result("process terminated")
   192       for ((thread, _) <- List(standard_input, stdout, stderr, command_input, message))
   193         thread.join
   194       system_result("process_manager terminated")
   195       put_result(Markup.EXIT, "Return code: " + rc.toString)
   196     }
   197     system_channel.accepted()
   198   }
   199 
   200 
   201   /* management methods */
   202 
   203   def join() { process_manager.join() }
   204 
   205   def terminate()
   206   {
   207     close()
   208     system_result("Terminating Isabelle process")
   209     terminate_process()
   210   }
   211 
   212 
   213 
   214   /** stream actors **/
   215 
   216   /* raw stdin */
   217 
   218   private def stdin_actor(): (Thread, Actor) =
   219   {
   220     val name = "standard_input"
   221     Simple_Thread.actor(name) {
   222       var finished = false
   223       while (!finished) {
   224         try {
   225           //{{{
   226           receive {
   227             case Input_Text(text) =>
   228               process.stdin.write(text)
   229               process.stdin.flush
   230             case Close =>
   231               process.stdin.close
   232               finished = true
   233             case bad => System.err.println(name + ": ignoring bad message " + bad)
   234           }
   235           //}}}
   236         }
   237         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   238       }
   239       system_result(name + " terminated")
   240     }
   241   }
   242 
   243 
   244   /* raw output */
   245 
   246   private def raw_output_actor(err: Boolean): (Thread, Actor) =
   247   {
   248     val (name, reader, markup) =
   249       if (err) ("standard_error", process.stderr, Markup.STDERR)
   250       else ("standard_output", process.stdout, Markup.STDOUT)
   251 
   252     Simple_Thread.actor(name) {
   253       var result = new StringBuilder(100)
   254 
   255       var finished = false
   256       while (!finished) {
   257         try {
   258           //{{{
   259           var c = -1
   260           var done = false
   261           while (!done && (result.length == 0 || reader.ready)) {
   262             c = reader.read
   263             if (c >= 0) result.append(c.asInstanceOf[Char])
   264             else done = true
   265           }
   266           if (result.length > 0) {
   267             put_result(markup, result.toString)
   268             result.length = 0
   269           }
   270           else {
   271             reader.close
   272             finished = true
   273           }
   274           //}}}
   275         }
   276         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   277       }
   278       system_result(name + " terminated")
   279     }
   280   }
   281 
   282 
   283   /* command input */
   284 
   285   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   286   {
   287     val name = "command_input"
   288     Simple_Thread.actor(name) {
   289       val stream = new BufferedOutputStream(raw_stream)
   290       var finished = false
   291       while (!finished) {
   292         try {
   293           //{{{
   294           receive {
   295             case Input_Chunks(chunks) =>
   296               stream.write(Standard_System.string_bytes(
   297                   chunks.map(_.length).mkString("", ",", "\n")));
   298               chunks.foreach(stream.write(_));
   299               stream.flush
   300             case Close =>
   301               stream.close
   302               finished = true
   303             case bad => System.err.println(name + ": ignoring bad message " + bad)
   304           }
   305           //}}}
   306         }
   307         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   308       }
   309       system_result(name + " terminated")
   310     }
   311   }
   312 
   313 
   314   /* message output */
   315 
   316   private def message_actor(stream: InputStream): (Thread, Actor) =
   317   {
   318     class EOF extends Exception
   319     class Protocol_Error(msg: String) extends Exception(msg)
   320 
   321     val name = "message_output"
   322     Simple_Thread.actor(name) {
   323       val default_buffer = new Array[Byte](65536)
   324       var c = -1
   325 
   326       def read_chunk(decode: Boolean): XML.Body =
   327       {
   328         //{{{
   329         // chunk size
   330         var n = 0
   331         c = stream.read
   332         if (c == -1) throw new EOF
   333         while (48 <= c && c <= 57) {
   334           n = 10 * n + (c - 48)
   335           c = stream.read
   336         }
   337         if (c != 10) throw new Protocol_Error("bad message chunk header")
   338 
   339         // chunk content
   340         val buf =
   341           if (n <= default_buffer.size) default_buffer
   342           else new Array[Byte](n)
   343 
   344         var i = 0
   345         var m = 0
   346         do {
   347           m = stream.read(buf, i, n - i)
   348           if (m != -1) i += m
   349         } while (m != -1 && n > i)
   350 
   351         if (i != n) throw new Protocol_Error("bad message chunk content")
   352 
   353         if (decode)
   354           YXML.parse_body_failsafe(Standard_System.decode_chars(Symbol.decode, buf, 0, n))
   355         else List(XML.Text(Standard_System.decode_chars(s => s, buf, 0, n).toString))
   356         //}}}
   357       }
   358 
   359       do {
   360         try {
   361           val header = read_chunk(true)
   362           header match {
   363             case List(XML.Elem(Markup(name, props), Nil))
   364                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   365               val kind = Kind.message_markup(name(0))
   366               val body = read_chunk(kind != Markup.RAW)
   367               put_result(kind, props, body)
   368             case _ =>
   369               read_chunk(false)
   370               throw new Protocol_Error("bad header: " + header.toString)
   371           }
   372         }
   373         catch {
   374           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   375           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   376           case _: EOF =>
   377         }
   378       } while (c != -1)
   379       stream.close
   380 
   381       system_result(name + " terminated")
   382     }
   383   }
   384 
   385 
   386   /** main methods **/
   387 
   388   def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
   389 
   390   def input_bytes(name: String, args: Array[Byte]*): Unit =
   391     command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   392 
   393   def input(name: String, args: String*): Unit =
   394   {
   395     receiver(new Input(name, args.toList))
   396     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   397   }
   398 
   399   def close(): Unit = { close(command_input); close(standard_input) }
   400 }