src/Pure/System/isabelle_process.scala
author wenzelm
Mon Jul 15 10:25:35 2013 +0200 (2013-07-15 ago)
changeset 52655 3b2b1ef13979
parent 52582 31467a4b1466
child 52799 6a4498b048b7
permissions -rw-r--r--
more careful termination of removed execs, leaving running execs undisturbed;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.lang.System
    11 import java.util.concurrent.LinkedBlockingQueue
    12 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    13   InputStream, OutputStream, BufferedOutputStream, IOException}
    14 
    15 import scala.actors.Actor
    16 import Actor._
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* messages */
    22 
    23   sealed abstract class Message
    24 
    25   class Input(name: String, args: List[String]) extends Message
    26   {
    27     override def toString: String =
    28       XML.Elem(Markup(Markup.PROVER_COMMAND, List((Markup.NAME, name))),
    29         args.map(s =>
    30           List(XML.Text("\n"), XML.elem(Markup.PROVER_ARG, YXML.parse_body(s)))).flatten).toString
    31   }
    32 
    33   class Output(val message: XML.Elem) extends Message
    34   {
    35     def kind: String = message.markup.name
    36     def properties: Properties.T = message.markup.properties
    37     def body: XML.Body = message.body
    38 
    39     def is_init = kind == Markup.INIT
    40     def is_exit = kind == Markup.EXIT
    41     def is_stdout = kind == Markup.STDOUT
    42     def is_stderr = kind == Markup.STDERR
    43     def is_system = kind == Markup.SYSTEM
    44     def is_status = kind == Markup.STATUS
    45     def is_report = kind == Markup.REPORT
    46     def is_protocol = kind == Markup.PROTOCOL
    47     def is_syslog = is_init || is_exit || is_system || is_stderr
    48 
    49     override def toString: String =
    50     {
    51       val res =
    52         if (is_status || is_report) message.body.map(_.toString).mkString
    53         else if (is_protocol) "..."
    54         else Pretty.string_of(message.body)
    55       if (properties.isEmpty)
    56         kind.toString + " [[" + res + "]]"
    57       else
    58         kind.toString + " " +
    59           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    60     }
    61   }
    62 }
    63 
    64 
    65 class Isabelle_Process(
    66     receiver: Isabelle_Process.Message => Unit = Console.println(_),
    67     args: List[String] = Nil)
    68 {
    69   import Isabelle_Process._
    70 
    71 
    72   /* text representation */
    73 
    74   def encode(s: String): String = Symbol.encode(s)
    75   def decode(s: String): String = Symbol.decode(s)
    76 
    77   object Encode
    78   {
    79     val string: XML.Encode.T[String] = (s => XML.Encode.string(encode(s)))
    80   }
    81 
    82 
    83   /* output */
    84 
    85   val xml_cache = new XML.Cache()
    86 
    87   private def system_output(text: String)
    88   {
    89     receiver(new Output(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text)))))
    90   }
    91 
    92   private def output_message(kind: String, props: Properties.T, body: XML.Body)
    93   {
    94     if (kind == Markup.INIT) system_channel.accepted()
    95     if (kind == Markup.PROTOCOL)
    96       receiver(new Output(XML.Elem(Markup(kind, props), body)))
    97     else {
    98       val main = XML.Elem(Markup(kind, props), Protocol.clean_message(body))
    99       val reports = Protocol.message_reports(props, body)
   100       for (msg <- main :: reports) receiver(new Output(xml_cache.elem(msg)))
   101     }
   102   }
   103 
   104   private def exit_message(rc: Int)
   105   {
   106     output_message(Markup.EXIT, Markup.Return_Code(rc),
   107       List(XML.Text("Return code: " + rc.toString)))
   108   }
   109 
   110 
   111   /* command input actor */
   112 
   113   private case class Input_Chunks(chunks: List[Array[Byte]])
   114 
   115   private case object Close
   116   private def close(p: (Thread, Actor))
   117   {
   118     if (p != null && p._1.isAlive) {
   119       p._2 ! Close
   120       p._1.join
   121     }
   122   }
   123 
   124   @volatile private var command_input: (Thread, Actor) = null
   125 
   126 
   127   /** process manager **/
   128 
   129   private val system_channel = System_Channel()
   130 
   131   private val process =
   132     try {
   133       val cmdline =
   134         Isabelle_System.getenv_strict("ISABELLE_PROCESS") ::
   135           (system_channel.isabelle_args ::: args)
   136       new Isabelle_System.Managed_Process(null, null, false, cmdline: _*)
   137     }
   138     catch { case e: IOException => system_channel.accepted(); throw(e) }
   139 
   140   val (_, process_result) =
   141     Simple_Thread.future("process_result") { process.join }
   142 
   143   private def terminate_process()
   144   {
   145     try { process.terminate }
   146     catch { case e: IOException => system_output("Failed to terminate Isabelle: " + e.getMessage) }
   147   }
   148 
   149   private val process_manager = Simple_Thread.fork("process_manager")
   150   {
   151     val (startup_failed, startup_errors) =
   152     {
   153       var finished: Option[Boolean] = None
   154       val result = new StringBuilder(100)
   155       while (finished.isEmpty && (process.stderr.ready || !process_result.is_finished)) {
   156         while (finished.isEmpty && process.stderr.ready) {
   157           try {
   158             val c = process.stderr.read
   159             if (c == 2) finished = Some(true)
   160             else result += c.toChar
   161           }
   162           catch { case _: IOException => finished = Some(false) }
   163         }
   164         Thread.sleep(10)
   165       }
   166       (finished.isEmpty || !finished.get, result.toString.trim)
   167     }
   168     if (startup_errors != "") system_output(startup_errors)
   169 
   170     process.stdin.close
   171     if (startup_failed) {
   172       exit_message(127)
   173       Thread.sleep(300)
   174       terminate_process()
   175       process_result.join
   176     }
   177     else {
   178       val (command_stream, message_stream) = system_channel.rendezvous()
   179 
   180       val stdout = physical_output_actor(false)
   181       val stderr = physical_output_actor(true)
   182       command_input = input_actor(command_stream)
   183       val message = message_actor(message_stream)
   184 
   185       val rc = process_result.join
   186       system_output("process terminated")
   187       close(command_input)
   188       for ((thread, _) <- List(stdout, stderr, command_input, message))
   189         thread.join
   190       system_output("process_manager terminated")
   191       exit_message(rc)
   192     }
   193     system_channel.accepted()
   194   }
   195 
   196 
   197   /* management methods */
   198 
   199   def join() { process_manager.join() }
   200 
   201   def terminate()
   202   {
   203     close(command_input)
   204     system_output("Terminating Isabelle process")
   205     terminate_process()
   206   }
   207 
   208 
   209 
   210   /** stream actors **/
   211 
   212   /* physical output */
   213 
   214   private def physical_output_actor(err: Boolean): (Thread, Actor) =
   215   {
   216     val (name, reader, markup) =
   217       if (err) ("standard_error", process.stderr, Markup.STDERR)
   218       else ("standard_output", process.stdout, Markup.STDOUT)
   219 
   220     Simple_Thread.actor(name) {
   221       try {
   222         var result = new StringBuilder(100)
   223         var finished = false
   224         while (!finished) {
   225           //{{{
   226           var c = -1
   227           var done = false
   228           while (!done && (result.length == 0 || reader.ready)) {
   229             c = reader.read
   230             if (c >= 0) result.append(c.asInstanceOf[Char])
   231             else done = true
   232           }
   233           if (result.length > 0) {
   234             output_message(markup, Nil, List(XML.Text(decode(result.toString))))
   235             result.length = 0
   236           }
   237           else {
   238             reader.close
   239             finished = true
   240           }
   241           //}}}
   242         }
   243       }
   244       catch { case e: IOException => system_output(name + ": " + e.getMessage) }
   245       system_output(name + " terminated")
   246     }
   247   }
   248 
   249 
   250   /* command input */
   251 
   252   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   253   {
   254     val name = "command_input"
   255     Simple_Thread.actor(name) {
   256       try {
   257         val stream = new BufferedOutputStream(raw_stream)
   258         var finished = false
   259         while (!finished) {
   260           //{{{
   261           receive {
   262             case Input_Chunks(chunks) =>
   263               stream.write(UTF8.string_bytes(chunks.map(_.length).mkString("", ",", "\n")))
   264               chunks.foreach(stream.write(_))
   265               stream.flush
   266             case Close =>
   267               stream.close
   268               finished = true
   269             case bad => System.err.println(name + ": ignoring bad message " + bad)
   270           }
   271           //}}}
   272         }
   273       }
   274       catch { case e: IOException => system_output(name + ": " + e.getMessage) }
   275       system_output(name + " terminated")
   276     }
   277   }
   278 
   279 
   280   /* message output */
   281 
   282   private def message_actor(stream: InputStream): (Thread, Actor) =
   283   {
   284     class EOF extends Exception
   285     class Protocol_Error(msg: String) extends Exception(msg)
   286 
   287     val name = "message_output"
   288     Simple_Thread.actor(name) {
   289       val default_buffer = new Array[Byte](65536)
   290       var c = -1
   291 
   292       def read_chunk(do_decode: Boolean): XML.Body =
   293       {
   294         //{{{
   295         // chunk size
   296         var n = 0
   297         c = stream.read
   298         if (c == -1) throw new EOF
   299         while (48 <= c && c <= 57) {
   300           n = 10 * n + (c - 48)
   301           c = stream.read
   302         }
   303         if (c != 10) throw new Protocol_Error("bad message chunk header")
   304 
   305         // chunk content
   306         val buf =
   307           if (n <= default_buffer.size) default_buffer
   308           else new Array[Byte](n)
   309 
   310         var i = 0
   311         var m = 0
   312         do {
   313           m = stream.read(buf, i, n - i)
   314           if (m != -1) i += m
   315         } while (m != -1 && n > i)
   316 
   317         if (i != n) throw new Protocol_Error("bad message chunk content")
   318 
   319         if (do_decode)
   320           YXML.parse_body_failsafe(UTF8.decode_chars(decode, buf, 0, n))
   321         else List(XML.Text(UTF8.decode_chars(s => s, buf, 0, n).toString))
   322         //}}}
   323       }
   324 
   325       try {
   326         do {
   327           try {
   328             //{{{
   329             val header = read_chunk(true)
   330             header match {
   331               case List(XML.Elem(Markup(name, props), Nil)) =>
   332                 val kind = name.intern
   333                 val body = read_chunk(kind != Markup.PROTOCOL)
   334                 output_message(kind, props, body)
   335               case _ =>
   336                 read_chunk(false)
   337                 throw new Protocol_Error("bad header: " + header.toString)
   338             }
   339             //}}}
   340           }
   341           catch { case _: EOF => }
   342         } while (c != -1)
   343       }
   344       catch {
   345         case e: IOException => system_output("Cannot read message:\n" + e.getMessage)
   346         case e: Protocol_Error => system_output("Malformed message:\n" + e.getMessage)
   347       }
   348       stream.close
   349 
   350       system_output(name + " terminated")
   351     }
   352   }
   353 
   354 
   355   /** main methods **/
   356 
   357   def protocol_command_raw(name: String, args: Array[Byte]*): Unit =
   358     command_input._2 ! Input_Chunks(UTF8.string_bytes(name) :: args.toList)
   359 
   360   def protocol_command(name: String, args: String*)
   361   {
   362     receiver(new Input(name, args.toList))
   363     protocol_command_raw(name, args.map(UTF8.string_bytes): _*)
   364   }
   365 
   366   def options(opts: Options): Unit =
   367     protocol_command("Isabelle_Process.options", YXML.string_of_body(opts.encode))
   368 }