src/Pure/System/isabelle_process.scala
author wenzelm
Wed Sep 22 21:21:04 2010 +0200 (2010-09-22 ago)
changeset 39618 1776b55f8d7a
parent 39591 a43a723753e6
child 39623 6aae022fde9b
permissions -rw-r--r--
tuned message;
/*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 import scala.collection.mutable
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* results */
    22 
    23   object Kind
    24   {
    25     val message_markup = Map(
    26       ('A' : Int) -> Markup.INIT,
    27       ('B' : Int) -> Markup.STATUS,
    28       ('C' : Int) -> Markup.REPORT,
    29       ('D' : Int) -> Markup.WRITELN,
    30       ('E' : Int) -> Markup.TRACING,
    31       ('F' : Int) -> Markup.WARNING,
    32       ('G' : Int) -> Markup.ERROR)
    33   }
    34 
    35   class Result(val message: XML.Elem)
    36   {
    37     def kind = message.markup.name
    38     def properties = message.markup.properties
    39     def body = message.body
    40 
    41     def is_init = kind == Markup.INIT
    42     def is_exit = kind == Markup.EXIT
    43     def is_stdout = kind == Markup.STDOUT
    44     def is_system = kind == Markup.SYSTEM
    45     def is_status = kind == Markup.STATUS
    46     def is_report = kind == Markup.REPORT
    47     def is_ready = is_status && {
    48       body match {
    49         case List(XML.Elem(Markup(Markup.READY, _), _)) => true
    50         case _ => false
    51       }}
    52 
    53     override def toString: String =
    54     {
    55       val res =
    56         if (is_status || is_report) message.body.map(_.toString).mkString
    57         else Pretty.string_of(message.body)
    58       if (properties.isEmpty)
    59         kind.toString + " [[" + res + "]]"
    60       else
    61         kind.toString + " " +
    62           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    63     }
    64   }
    65 }
    66 
    67 
    68 class Isabelle_Process(system: Isabelle_System, timeout: Int, receiver: Actor, args: String*)
    69 {
    70   import Isabelle_Process._
    71 
    72 
    73   /* demo constructor */
    74 
    75   def this(args: String*) =
    76     this(new Isabelle_System, 10000,
    77       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    78 
    79 
    80   /* system log */
    81 
    82   private val system_results = new mutable.ListBuffer[String]
    83 
    84   private def system_result(text: String)
    85   {
    86     synchronized { system_results += text }
    87     receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
    88   }
    89 
    90   def syslog(): List[String] = synchronized { system_results.toList }
    91 
    92 
    93   /* results */
    94 
    95   private val xml_cache = new XML.Cache(131071)
    96 
    97   private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
    98   {
    99     if (kind == Markup.INIT) rm_fifos()
   100     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
   101     xml_cache.cache_tree(msg)((message: XML.Tree) =>
   102       receiver ! new Result(message.asInstanceOf[XML.Elem]))
   103   }
   104 
   105   private def put_result(kind: String, text: String)
   106   {
   107     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   108   }
   109 
   110 
   111   /* input actors */
   112 
   113   private case class Input_Text(text: String)
   114   private case class Input_Chunks(chunks: List[Array[Byte]])
   115 
   116   private case object Close
   117   private def close(p: (Thread, Actor))
   118   {
   119     if (p != null && p._1.isAlive) {
   120       p._2 ! Close
   121       p._1.join
   122     }
   123   }
   124 
   125   @volatile private var standard_input: (Thread, Actor) = null
   126   @volatile private var command_input: (Thread, Actor) = null
   127 
   128 
   129   /** process manager **/
   130 
   131   private val in_fifo = system.mk_fifo()
   132   private val out_fifo = system.mk_fifo()
   133   private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   134 
   135   private val process =
   136     try {
   137       val cmdline =
   138         List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   139       new system.Managed_Process(true, cmdline: _*)
   140     }
   141     catch { case e: IOException => rm_fifos(); throw(e) }
   142 
   143   val process_result =
   144     Simple_Thread.future("process_result") { process.join }
   145 
   146   private def terminate_process()
   147   {
   148     try { process.terminate }
   149     catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
   150   }
   151 
   152   private val process_manager = Simple_Thread.fork("process_manager")
   153   {
   154     val (startup_failed, startup_output) =
   155     {
   156       val expired = System.currentTimeMillis() + timeout
   157       val result = new StringBuilder(100)
   158 
   159       var finished: Option[Boolean] = None
   160       while (finished.isEmpty && System.currentTimeMillis() <= expired) {
   161         while (finished.isEmpty && process.stdout.ready) {
   162           val c = process.stdout.read
   163           if (c == 2) finished = Some(true)
   164           else result += c.toChar
   165         }
   166         if (process_result.is_finished) finished = Some(false)
   167         else Thread.sleep(10)
   168       }
   169       (finished.isEmpty || !finished.get, result.toString)
   170     }
   171     system_result(startup_output)
   172 
   173     if (startup_failed) {
   174       put_result(Markup.EXIT, "127")
   175       process.stdin.close
   176       Thread.sleep(300)
   177       terminate_process()
   178       process_result.join
   179     }
   180     else {
   181       // rendezvous
   182       val command_stream = system.fifo_output_stream(in_fifo)
   183       val message_stream = system.fifo_input_stream(out_fifo)
   184 
   185       standard_input = stdin_actor()
   186       val stdout = stdout_actor()
   187       command_input = input_actor(command_stream)
   188       val message = message_actor(message_stream)
   189 
   190       val rc = process_result.join
   191       system_result("process terminated")
   192       for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
   193       system_result("process_manager terminated")
   194       put_result(Markup.EXIT, rc.toString)
   195     }
   196     rm_fifos()
   197   }
   198 
   199 
   200   /* management methods */
   201 
   202   def join() { process_manager.join() }
   203 
   204   def interrupt()
   205   {
   206     try { process.interrupt }
   207     catch { case e: IOException => system_result("Failed to interrupt Isabelle: " + e.getMessage) }
   208   }
   209 
   210   def terminate()
   211   {
   212     close()
   213     system_result("Terminating Isabelle process")
   214     terminate_process()
   215   }
   216 
   217 
   218 
   219   /** stream actors **/
   220 
   221   /* raw stdin */
   222 
   223   private def stdin_actor(): (Thread, Actor) =
   224   {
   225     val name = "standard_input"
   226     Simple_Thread.actor(name) {
   227       var finished = false
   228       while (!finished) {
   229         try {
   230           //{{{
   231           receive {
   232             case Input_Text(text) =>
   233               process.stdin.write(text)
   234               process.stdin.flush
   235             case Close =>
   236               process.stdin.close
   237               finished = true
   238             case bad => System.err.println(name + ": ignoring bad message " + bad)
   239           }
   240           //}}}
   241         }
   242         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   243       }
   244       system_result(name + " terminated")
   245     }
   246   }
   247 
   248 
   249   /* raw stdout */
   250 
   251   private def stdout_actor(): (Thread, Actor) =
   252   {
   253     val name = "standard_output"
   254     Simple_Thread.actor(name) {
   255       var result = new StringBuilder(100)
   256 
   257       var finished = false
   258       while (!finished) {
   259         try {
   260           //{{{
   261           var c = -1
   262           var done = false
   263           while (!done && (result.length == 0 || process.stdout.ready)) {
   264             c = process.stdout.read
   265             if (c >= 0) result.append(c.asInstanceOf[Char])
   266             else done = true
   267           }
   268           if (result.length > 0) {
   269             put_result(Markup.STDOUT, result.toString)
   270             result.length = 0
   271           }
   272           else {
   273             process.stdout.close
   274             finished = true
   275           }
   276           //}}}
   277         }
   278         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   279       }
   280       system_result(name + " terminated")
   281     }
   282   }
   283 
   284 
   285   /* command input */
   286 
   287   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   288   {
   289     val name = "command_input"
   290     Simple_Thread.actor(name) {
   291       val stream = new BufferedOutputStream(raw_stream)
   292       var finished = false
   293       while (!finished) {
   294         try {
   295           //{{{
   296           receive {
   297             case Input_Chunks(chunks) =>
   298               stream.write(Standard_System.string_bytes(
   299                   chunks.map(_.length).mkString("", ",", "\n")));
   300               chunks.foreach(stream.write(_));
   301               stream.flush
   302             case Close =>
   303               stream.close
   304               finished = true
   305             case bad => System.err.println(name + ": ignoring bad message " + bad)
   306           }
   307           //}}}
   308         }
   309         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   310       }
   311       system_result(name + " terminated")
   312     }
   313   }
   314 
   315 
   316   /* message output */
   317 
   318   private def message_actor(stream: InputStream): (Thread, Actor) =
   319   {
   320     class EOF extends Exception
   321     class Protocol_Error(msg: String) extends Exception(msg)
   322 
   323     val name = "message_output"
   324     Simple_Thread.actor(name) {
   325       val default_buffer = new Array[Byte](65536)
   326       var c = -1
   327 
   328       def read_chunk(): XML.Body =
   329       {
   330         //{{{
   331         // chunk size
   332         var n = 0
   333         c = stream.read
   334         if (c == -1) throw new EOF
   335         while (48 <= c && c <= 57) {
   336           n = 10 * n + (c - 48)
   337           c = stream.read
   338         }
   339         if (c != 10) throw new Protocol_Error("bad message chunk header")
   340 
   341         // chunk content
   342         val buf =
   343           if (n <= default_buffer.size) default_buffer
   344           else new Array[Byte](n)
   345 
   346         var i = 0
   347         var m = 0
   348         do {
   349           m = stream.read(buf, i, n - i)
   350           i += m
   351         } while (m > 0 && n > i)
   352 
   353         if (i != n) throw new Protocol_Error("bad message chunk content")
   354 
   355         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   356         //}}}
   357       }
   358 
   359       do {
   360         try {
   361           val header = read_chunk()
   362           val body = read_chunk()
   363           header match {
   364             case List(XML.Elem(Markup(name, props), Nil))
   365                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   366               put_result(Kind.message_markup(name(0)), props, body)
   367             case _ => throw new Protocol_Error("bad header: " + header.toString)
   368           }
   369         }
   370         catch {
   371           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   372           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   373           case _: EOF =>
   374         }
   375       } while (c != -1)
   376       stream.close
   377 
   378       system_result(name + " terminated")
   379     }
   380   }
   381 
   382 
   383   /** main methods **/
   384 
   385   def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
   386 
   387   def input_bytes(name: String, args: Array[Byte]*): Unit =
   388     command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   389 
   390   def input(name: String, args: String*): Unit =
   391     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   392 
   393   def close(): Unit = { close(command_input); close(standard_input) }
   394 }