src/Pure/System/isabelle_process.scala
author wenzelm
Mon Oct 17 11:24:22 2011 +0200 (2011-10-17 ago)
changeset 45158 db4bf4fb5492
parent 45075 6c66e268f8eb
child 45633 2cb7e34f6096
permissions -rw-r--r--
always use sockets on Windows/Cygwin;
discontinued special raw_dump facility;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.lang.System
    11 import java.util.concurrent.LinkedBlockingQueue
    12 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    13   InputStream, OutputStream, BufferedOutputStream, IOException}
    14 
    15 import scala.actors.Actor
    16 import Actor._
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* results */
    22 
    23   object Kind
    24   {
    25     val message_markup = Map(
    26       ('A' : Int) -> Markup.INIT,
    27       ('B' : Int) -> Markup.STATUS,
    28       ('C' : Int) -> Markup.REPORT,
    29       ('D' : Int) -> Markup.WRITELN,
    30       ('E' : Int) -> Markup.TRACING,
    31       ('F' : Int) -> Markup.WARNING,
    32       ('G' : Int) -> Markup.ERROR,
    33       ('H' : Int) -> Markup.RAW)
    34   }
    35 
    36   sealed abstract class Message
    37 
    38   class Input(name: String, args: List[String]) extends Message
    39   {
    40     override def toString: String =
    41       XML.Elem(Markup(Markup.PROVER_COMMAND, List((Markup.NAME, name))),
    42         args.map(s =>
    43           List(XML.Text("\n"), XML.elem(Markup.PROVER_ARG, YXML.parse_body(s)))).flatten).toString
    44   }
    45 
    46   class Result(val message: XML.Elem) extends Message
    47   {
    48     def kind: String = message.markup.name
    49     def properties: Properties.T = message.markup.properties
    50     def body: XML.Body = message.body
    51 
    52     def is_init = kind == Markup.INIT
    53     def is_exit = kind == Markup.EXIT
    54     def is_stdout = kind == Markup.STDOUT
    55     def is_system = kind == Markup.SYSTEM
    56     def is_status = kind == Markup.STATUS
    57     def is_report = kind == Markup.REPORT
    58     def is_raw = kind == Markup.RAW
    59     def is_ready = Isar_Document.is_ready(message)
    60     def is_syslog = is_init || is_exit || is_system || is_ready
    61 
    62     override def toString: String =
    63     {
    64       val res =
    65         if (is_status || is_report) message.body.map(_.toString).mkString
    66         else if (is_raw) "..."
    67         else Pretty.string_of(message.body)
    68       if (properties.isEmpty)
    69         kind.toString + " [[" + res + "]]"
    70       else
    71         kind.toString + " " +
    72           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    73     }
    74   }
    75 }
    76 
    77 
    78 class Isabelle_Process(
    79     timeout: Time = Time.seconds(25),
    80     receiver: Isabelle_Process.Message => Unit = Console.println(_),
    81     args: List[String] = Nil)
    82 {
    83   import Isabelle_Process._
    84 
    85 
    86   /* results */
    87 
    88   private def system_result(text: String)
    89   {
    90     receiver(new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text)))))
    91   }
    92 
    93   private val xml_cache = new XML.Cache()
    94 
    95   private def put_result(kind: String, props: Properties.T, body: XML.Body)
    96   {
    97     if (kind == Markup.INIT) system_channel.accepted()
    98     if (kind == Markup.RAW)
    99       receiver(new Result(XML.Elem(Markup(kind, props), body)))
   100     else {
   101       val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
   102       receiver(new Result(xml_cache.cache_tree(msg).asInstanceOf[XML.Elem]))
   103     }
   104   }
   105 
   106   private def put_result(kind: String, text: String)
   107   {
   108     put_result(kind, Nil, List(XML.Text(Symbol.decode(text))))
   109   }
   110 
   111 
   112   /* input actors */
   113 
   114   private case class Input_Text(text: String)
   115   private case class Input_Chunks(chunks: List[Array[Byte]])
   116 
   117   private case object Close
   118   private def close(p: (Thread, Actor))
   119   {
   120     if (p != null && p._1.isAlive) {
   121       p._2 ! Close
   122       p._1.join
   123     }
   124   }
   125 
   126   @volatile private var standard_input: (Thread, Actor) = null
   127   @volatile private var command_input: (Thread, Actor) = null
   128 
   129 
   130   /** process manager **/
   131 
   132   private val system_channel = System_Channel()
   133 
   134   private val process =
   135     try {
   136       val cmdline =
   137         Isabelle_System.getenv_strict("ISABELLE_PROCESS") ::
   138           (system_channel.isabelle_args ::: args)
   139       new Isabelle_System.Managed_Process(true, cmdline: _*)
   140     }
   141     catch { case e: IOException => system_channel.accepted(); throw(e) }
   142 
   143   val process_result =
   144     Simple_Thread.future("process_result") { process.join }
   145 
   146   private def terminate_process()
   147   {
   148     try { process.terminate }
   149     catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
   150   }
   151 
   152   private val process_manager = Simple_Thread.fork("process_manager")
   153   {
   154     val (startup_failed, startup_output) =
   155     {
   156       val expired = System.currentTimeMillis() + timeout.ms
   157       val result = new StringBuilder(100)
   158 
   159       var finished: Option[Boolean] = None
   160       while (finished.isEmpty && System.currentTimeMillis() <= expired) {
   161         while (finished.isEmpty && process.stdout.ready) {
   162           val c = process.stdout.read
   163           if (c == 2) finished = Some(true)
   164           else result += c.toChar
   165         }
   166         if (process_result.is_finished) finished = Some(false)
   167         else Thread.sleep(10)
   168       }
   169       (finished.isEmpty || !finished.get, result.toString.trim)
   170     }
   171     system_result(startup_output)
   172 
   173     if (startup_failed) {
   174       put_result(Markup.EXIT, "Return code: 127")
   175       process.stdin.close
   176       Thread.sleep(300)
   177       terminate_process()
   178       process_result.join
   179     }
   180     else {
   181       val (command_stream, message_stream) = system_channel.rendezvous()
   182 
   183       standard_input = stdin_actor()
   184       val stdout = stdout_actor()
   185       command_input = input_actor(command_stream)
   186       val message = message_actor(message_stream)
   187 
   188       val rc = process_result.join
   189       system_result("process terminated")
   190       for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
   191       system_result("process_manager terminated")
   192       put_result(Markup.EXIT, "Return code: " + rc.toString)
   193     }
   194     system_channel.accepted()
   195   }
   196 
   197 
   198   /* management methods */
   199 
   200   def join() { process_manager.join() }
   201 
   202   def terminate()
   203   {
   204     close()
   205     system_result("Terminating Isabelle process")
   206     terminate_process()
   207   }
   208 
   209 
   210 
   211   /** stream actors **/
   212 
   213   /* raw stdin */
   214 
   215   private def stdin_actor(): (Thread, Actor) =
   216   {
   217     val name = "standard_input"
   218     Simple_Thread.actor(name) {
   219       var finished = false
   220       while (!finished) {
   221         try {
   222           //{{{
   223           receive {
   224             case Input_Text(text) =>
   225               process.stdin.write(text)
   226               process.stdin.flush
   227             case Close =>
   228               process.stdin.close
   229               finished = true
   230             case bad => System.err.println(name + ": ignoring bad message " + bad)
   231           }
   232           //}}}
   233         }
   234         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   235       }
   236       system_result(name + " terminated")
   237     }
   238   }
   239 
   240 
   241   /* raw stdout */
   242 
   243   private def stdout_actor(): (Thread, Actor) =
   244   {
   245     val name = "standard_output"
   246     Simple_Thread.actor(name) {
   247       var result = new StringBuilder(100)
   248 
   249       var finished = false
   250       while (!finished) {
   251         try {
   252           //{{{
   253           var c = -1
   254           var done = false
   255           while (!done && (result.length == 0 || process.stdout.ready)) {
   256             c = process.stdout.read
   257             if (c >= 0) result.append(c.asInstanceOf[Char])
   258             else done = true
   259           }
   260           if (result.length > 0) {
   261             put_result(Markup.STDOUT, result.toString)
   262             result.length = 0
   263           }
   264           else {
   265             process.stdout.close
   266             finished = true
   267           }
   268           //}}}
   269         }
   270         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   271       }
   272       system_result(name + " terminated")
   273     }
   274   }
   275 
   276 
   277   /* command input */
   278 
   279   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   280   {
   281     val name = "command_input"
   282     Simple_Thread.actor(name) {
   283       val stream = new BufferedOutputStream(raw_stream)
   284       var finished = false
   285       while (!finished) {
   286         try {
   287           //{{{
   288           receive {
   289             case Input_Chunks(chunks) =>
   290               stream.write(Standard_System.string_bytes(
   291                   chunks.map(_.length).mkString("", ",", "\n")));
   292               chunks.foreach(stream.write(_));
   293               stream.flush
   294             case Close =>
   295               stream.close
   296               finished = true
   297             case bad => System.err.println(name + ": ignoring bad message " + bad)
   298           }
   299           //}}}
   300         }
   301         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   302       }
   303       system_result(name + " terminated")
   304     }
   305   }
   306 
   307 
   308   /* message output */
   309 
   310   private def message_actor(stream: InputStream): (Thread, Actor) =
   311   {
   312     class EOF extends Exception
   313     class Protocol_Error(msg: String) extends Exception(msg)
   314 
   315     val name = "message_output"
   316     Simple_Thread.actor(name) {
   317       val default_buffer = new Array[Byte](65536)
   318       var c = -1
   319 
   320       def read_chunk(decode: Boolean): XML.Body =
   321       {
   322         //{{{
   323         // chunk size
   324         var n = 0
   325         c = stream.read
   326         if (c == -1) throw new EOF
   327         while (48 <= c && c <= 57) {
   328           n = 10 * n + (c - 48)
   329           c = stream.read
   330         }
   331         if (c != 10) throw new Protocol_Error("bad message chunk header")
   332 
   333         // chunk content
   334         val buf =
   335           if (n <= default_buffer.size) default_buffer
   336           else new Array[Byte](n)
   337 
   338         var i = 0
   339         var m = 0
   340         do {
   341           m = stream.read(buf, i, n - i)
   342           if (m != -1) i += m
   343         } while (m != -1 && n > i)
   344 
   345         if (i != n) throw new Protocol_Error("bad message chunk content")
   346 
   347         if (decode)
   348           YXML.parse_body_failsafe(Standard_System.decode_chars(Symbol.decode, buf, 0, n))
   349         else List(XML.Text(Standard_System.decode_chars(s => s, buf, 0, n).toString))
   350         //}}}
   351       }
   352 
   353       do {
   354         try {
   355           val header = read_chunk(true)
   356           header match {
   357             case List(XML.Elem(Markup(name, props), Nil))
   358                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   359               val kind = Kind.message_markup(name(0))
   360               val body = read_chunk(kind != Markup.RAW)
   361               put_result(kind, props, body)
   362             case _ =>
   363               read_chunk(false)
   364               throw new Protocol_Error("bad header: " + header.toString)
   365           }
   366         }
   367         catch {
   368           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   369           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   370           case _: EOF =>
   371         }
   372       } while (c != -1)
   373       stream.close
   374 
   375       system_result(name + " terminated")
   376     }
   377   }
   378 
   379 
   380   /** main methods **/
   381 
   382   def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
   383 
   384   def input_bytes(name: String, args: Array[Byte]*): Unit =
   385     command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   386 
   387   def input(name: String, args: String*): Unit =
   388   {
   389     receiver(new Input(name, args.toList))
   390     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   391   }
   392 
   393   def close(): Unit = { close(command_input); close(standard_input) }
   394 }