src/Pure/System/isabelle_process.scala
author wenzelm
Fri May 02 20:01:45 2014 +0200 (2014-05-02)
changeset 56831 e3ccf0809d51
parent 56794 a7c5c35b7125
child 57915 448325de6e4f
permissions -rw-r--r--
prefer scala.Console with its support for thread-local redirection;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.io.{InputStream, OutputStream, BufferedOutputStream, IOException}
    12 
    13 
    14 class Isabelle_Process(
    15   receiver: Prover.Message => Unit = Console.println(_),
    16   prover_args: List[String] = Nil)
    17 {
    18   /* text and tree data */
    19 
    20   def encode(s: String): String = Symbol.encode(s)
    21   def decode(s: String): String = Symbol.decode(s)
    22 
    23   val xml_cache = new XML.Cache()
    24 
    25 
    26   /* output */
    27 
    28   private def system_output(text: String)
    29   {
    30     receiver(new Prover.Output(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text)))))
    31   }
    32 
    33   private def protocol_output(props: Properties.T, bytes: Bytes)
    34   {
    35     receiver(new Prover.Protocol_Output(props, bytes))
    36   }
    37 
    38   private def output(kind: String, props: Properties.T, body: XML.Body)
    39   {
    40     if (kind == Markup.INIT) system_channel.accepted()
    41 
    42     val main = XML.Elem(Markup(kind, props), Protocol.clean_message(body))
    43     val reports = Protocol.message_reports(props, body)
    44     for (msg <- main :: reports) receiver(new Prover.Output(xml_cache.elem(msg)))
    45   }
    46 
    47   private def exit_message(rc: Int)
    48   {
    49     output(Markup.EXIT, Markup.Return_Code(rc), List(XML.Text("Return code: " + rc.toString)))
    50   }
    51 
    52 
    53 
    54   /** process manager **/
    55 
    56   def command_line(channel: System_Channel, args: List[String]): List[String] =
    57     Isabelle_System.getenv_strict("ISABELLE_PROCESS") :: (channel.isabelle_args ::: args)
    58 
    59   private val system_channel = System_Channel()
    60 
    61   private val process =
    62     try {
    63       val cmdline = command_line(system_channel, prover_args)
    64       new Isabelle_System.Managed_Process(null, null, false, cmdline: _*)
    65     }
    66     catch { case e: IOException => system_channel.accepted(); throw(e) }
    67 
    68   private val (_, process_result) =
    69     Simple_Thread.future("process_result") { process.join }
    70 
    71   private def terminate_process()
    72   {
    73     try { process.terminate }
    74     catch { case e: IOException => system_output("Failed to terminate Isabelle: " + e.getMessage) }
    75   }
    76 
    77   private val process_manager = Simple_Thread.fork("process_manager")
    78   {
    79     val (startup_failed, startup_errors) =
    80     {
    81       var finished: Option[Boolean] = None
    82       val result = new StringBuilder(100)
    83       while (finished.isEmpty && (process.stderr.ready || !process_result.is_finished)) {
    84         while (finished.isEmpty && process.stderr.ready) {
    85           try {
    86             val c = process.stderr.read
    87             if (c == 2) finished = Some(true)
    88             else result += c.toChar
    89           }
    90           catch { case _: IOException => finished = Some(false) }
    91         }
    92         Thread.sleep(10)
    93       }
    94       (finished.isEmpty || !finished.get, result.toString.trim)
    95     }
    96     if (startup_errors != "") system_output(startup_errors)
    97 
    98     process.stdin.close
    99     if (startup_failed) {
   100       terminate_process()
   101       process_result.join
   102       exit_message(127)
   103     }
   104     else {
   105       val (command_stream, message_stream) = system_channel.rendezvous()
   106 
   107       command_input_init(command_stream)
   108       val stdout = physical_output(false)
   109       val stderr = physical_output(true)
   110       val message = message_output(message_stream)
   111 
   112       val rc = process_result.join
   113       system_output("process terminated")
   114       command_input_close()
   115       for (thread <- List(stdout, stderr, message)) thread.join
   116       system_output("process_manager terminated")
   117       exit_message(rc)
   118     }
   119     system_channel.accepted()
   120   }
   121 
   122 
   123   /* management methods */
   124 
   125   def join() { process_manager.join() }
   126 
   127   def interrupt()
   128   {
   129     try { process.interrupt }
   130     catch { case e: IOException => system_output("Failed to interrupt Isabelle: " + e.getMessage) }
   131   }
   132 
   133   def terminate()
   134   {
   135     command_input_close()
   136     system_output("Terminating Isabelle process")
   137     terminate_process()
   138   }
   139 
   140 
   141 
   142   /** process streams **/
   143 
   144   /* command input */
   145 
   146   private var command_input: Option[Consumer_Thread[List[Bytes]]] = None
   147 
   148   private def command_input_close(): Unit = command_input.foreach(_.shutdown)
   149 
   150   private def command_input_init(raw_stream: OutputStream)
   151   {
   152     val name = "command_input"
   153     val stream = new BufferedOutputStream(raw_stream)
   154     command_input =
   155       Some(
   156         Consumer_Thread.fork(name)(
   157           consume =
   158             {
   159               case chunks =>
   160                 try {
   161                   Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
   162                   chunks.foreach(_.write(stream))
   163                   stream.flush
   164                   true
   165                 }
   166                 catch { case e: IOException => system_output(name + ": " + e.getMessage); false }
   167             },
   168           finish = { case () => stream.close; system_output(name + " terminated") }
   169         )
   170       )
   171   }
   172 
   173 
   174   /* physical output */
   175 
   176   private def physical_output(err: Boolean): Thread =
   177   {
   178     val (name, reader, markup) =
   179       if (err) ("standard_error", process.stderr, Markup.STDERR)
   180       else ("standard_output", process.stdout, Markup.STDOUT)
   181 
   182     Simple_Thread.fork(name) {
   183       try {
   184         var result = new StringBuilder(100)
   185         var finished = false
   186         while (!finished) {
   187           //{{{
   188           var c = -1
   189           var done = false
   190           while (!done && (result.length == 0 || reader.ready)) {
   191             c = reader.read
   192             if (c >= 0) result.append(c.asInstanceOf[Char])
   193             else done = true
   194           }
   195           if (result.length > 0) {
   196             output(markup, Nil, List(XML.Text(decode(result.toString))))
   197             result.length = 0
   198           }
   199           else {
   200             reader.close
   201             finished = true
   202           }
   203           //}}}
   204         }
   205       }
   206       catch { case e: IOException => system_output(name + ": " + e.getMessage) }
   207       system_output(name + " terminated")
   208     }
   209   }
   210 
   211 
   212   /* message output */
   213 
   214   private def message_output(stream: InputStream): Thread =
   215   {
   216     class EOF extends Exception
   217     class Protocol_Error(msg: String) extends Exception(msg)
   218 
   219     val name = "message_output"
   220     Simple_Thread.fork(name) {
   221       val default_buffer = new Array[Byte](65536)
   222       var c = -1
   223 
   224       def read_int(): Int =
   225       //{{{
   226       {
   227         var n = 0
   228         c = stream.read
   229         if (c == -1) throw new EOF
   230         while (48 <= c && c <= 57) {
   231           n = 10 * n + (c - 48)
   232           c = stream.read
   233         }
   234         if (c != 10)
   235           throw new Protocol_Error("malformed header: expected integer followed by newline")
   236         else n
   237       }
   238       //}}}
   239 
   240       def read_chunk_bytes(): (Array[Byte], Int) =
   241       //{{{
   242       {
   243         val n = read_int()
   244         val buf =
   245           if (n <= default_buffer.size) default_buffer
   246           else new Array[Byte](n)
   247 
   248         var i = 0
   249         var m = 0
   250         do {
   251           m = stream.read(buf, i, n - i)
   252           if (m != -1) i += m
   253         }
   254         while (m != -1 && n > i)
   255 
   256         if (i != n)
   257           throw new Protocol_Error("bad chunk (unexpected EOF after " + i + " of " + n + " bytes)")
   258 
   259         (buf, n)
   260       }
   261       //}}}
   262 
   263       def read_chunk(): XML.Body =
   264       {
   265         val (buf, n) = read_chunk_bytes()
   266         YXML.parse_body_failsafe(UTF8.decode_chars(decode, buf, 0, n))
   267       }
   268 
   269       try {
   270         do {
   271           try {
   272             val header = read_chunk()
   273             header match {
   274               case List(XML.Elem(Markup(name, props), Nil)) =>
   275                 val kind = name.intern
   276                 if (kind == Markup.PROTOCOL) {
   277                   val (buf, n) = read_chunk_bytes()
   278                   protocol_output(props, Bytes(buf, 0, n))
   279                 }
   280                 else {
   281                   val body = read_chunk()
   282                   output(kind, props, body)
   283                 }
   284               case _ =>
   285                 read_chunk()
   286                 throw new Protocol_Error("bad header: " + header.toString)
   287             }
   288           }
   289           catch { case _: EOF => }
   290         }
   291         while (c != -1)
   292       }
   293       catch {
   294         case e: IOException => system_output("Cannot read message:\n" + e.getMessage)
   295         case e: Protocol_Error => system_output("Malformed message:\n" + e.getMessage)
   296       }
   297       stream.close
   298 
   299       system_output(name + " terminated")
   300     }
   301   }
   302 
   303 
   304 
   305   /** protocol commands **/
   306 
   307   def protocol_command_bytes(name: String, args: Bytes*): Unit =
   308     command_input match {
   309       case Some(thread) => thread.send(Bytes(name) :: args.toList)
   310       case None => error("Uninitialized command input thread")
   311     }
   312 
   313   def protocol_command(name: String, args: String*)
   314   {
   315     receiver(new Prover.Input(name, args.toList))
   316     protocol_command_bytes(name, args.map(Bytes(_)): _*)
   317   }
   318 }