src/Pure/System/isabelle_process.scala
author wenzelm
Wed Sep 22 13:47:48 2010 +0200 (2010-09-22 ago)
changeset 39585 00be8711082f
parent 39575 c77b9374f45c
child 39587 f84b70e3bb9c
permissions -rw-r--r--
main Isabelle_Process via Isabelle_System.Managed_Process;
simplified init message: no pid;
misc tuning and simplification;
     1 /*  Title:      Pure/System/isabelle_process.ML
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 import scala.collection.mutable
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* results */
    22 
    23   object Kind
    24   {
    25     val message_markup = Map(
    26       ('A' : Int) -> Markup.INIT,
    27       ('B' : Int) -> Markup.STATUS,
    28       ('C' : Int) -> Markup.REPORT,
    29       ('D' : Int) -> Markup.WRITELN,
    30       ('E' : Int) -> Markup.TRACING,
    31       ('F' : Int) -> Markup.WARNING,
    32       ('G' : Int) -> Markup.ERROR)
    33   }
    34 
    35   class Result(val message: XML.Elem)
    36   {
    37     def kind = message.markup.name
    38     def properties = message.markup.properties
    39     def body = message.body
    40 
    41     def is_init = kind == Markup.INIT
    42     def is_exit = kind == Markup.EXIT
    43     def is_stdout = kind == Markup.STDOUT
    44     def is_system = kind == Markup.SYSTEM
    45     def is_status = kind == Markup.STATUS
    46     def is_report = kind == Markup.REPORT
    47     def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
    48 
    49     override def toString: String =
    50     {
    51       val res =
    52         if (is_status || is_report) message.body.map(_.toString).mkString
    53         else Pretty.string_of(message.body)
    54       if (properties.isEmpty)
    55         kind.toString + " [[" + res + "]]"
    56       else
    57         kind.toString + " " +
    58           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    59     }
    60   }
    61 }
    62 
    63 
    64 class Isabelle_Process(system: Isabelle_System, timeout: Int, receiver: Actor, args: String*)
    65 {
    66   import Isabelle_Process._
    67 
    68 
    69   /* demo constructor */
    70 
    71   def this(args: String*) =
    72     this(new Isabelle_System, 10000,
    73       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    74 
    75 
    76   /* system log */
    77 
    78   private val system_results = new mutable.ListBuffer[String]
    79 
    80   private def system_result(text: String)
    81   {
    82     synchronized { system_results += text }
    83     receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
    84   }
    85 
    86   def syslog(): List[String] = synchronized { system_results.toList }
    87 
    88 
    89   /* results */
    90 
    91   private val xml_cache = new XML.Cache(131071)
    92 
    93   private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
    94   {
    95     if (kind == Markup.INIT) rm_fifos()
    96     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
    97     xml_cache.cache_tree(msg)((message: XML.Tree) =>
    98       receiver ! new Result(message.asInstanceOf[XML.Elem]))
    99   }
   100 
   101   private def put_result(kind: String, text: String)
   102   {
   103     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   104   }
   105 
   106 
   107   /* input actors */
   108 
   109   private case class Input_Text(text: String)
   110   private case class Input_Chunks(chunks: List[Array[Byte]])
   111 
   112   private case object Close
   113   private def close(p: (Thread, Actor))
   114   {
   115     if (p != null && p._1.isAlive) {
   116       p._2 ! Close
   117       p._1.join
   118     }
   119   }
   120 
   121   @volatile private var standard_input: (Thread, Actor) = null
   122   @volatile private var command_input: (Thread, Actor) = null
   123 
   124 
   125   /** process manager **/
   126 
   127   private val in_fifo = system.mk_fifo()
   128   private val out_fifo = system.mk_fifo()
   129   private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   130 
   131   private val process =
   132     try {
   133       val cmdline =
   134         List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   135       new system.Managed_Process(true, cmdline: _*)
   136     }
   137     catch { case e: IOException => rm_fifos(); throw(e) }
   138 
   139   private def terminate_process()
   140   {
   141     try { process.terminate }
   142     catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
   143   }
   144 
   145   private val process_manager = Simple_Thread.fork("process_manager")
   146   {
   147     val (startup_failed, startup_output) =
   148     {
   149       val expired = System.currentTimeMillis() + timeout
   150       val result = new StringBuilder(100)
   151 
   152       var finished = false
   153       while (!finished && System.currentTimeMillis() <= expired) {
   154         while (!finished && process.stdout.ready) {
   155           val c = process.stdout.read
   156           if (c == 2) finished = true
   157           else result += c.toChar
   158         }
   159         Thread.sleep(10)
   160       }
   161       (!finished, result.toString)
   162     }
   163     system_result(startup_output)
   164 
   165     if (startup_failed) {
   166       put_result(Markup.EXIT, "127")
   167       process.stdin.close
   168       Thread.sleep(300)
   169       terminate_process()
   170     }
   171     else {
   172       // rendezvous
   173       val command_stream = system.fifo_output_stream(in_fifo)
   174       val message_stream = system.fifo_input_stream(out_fifo)
   175 
   176       standard_input = stdin_actor()
   177       val stdout = stdout_actor()
   178       command_input = input_actor(command_stream)
   179       val message = message_actor(message_stream)
   180 
   181       val rc = process.join
   182       system_result("Isabelle process terminated")
   183       for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
   184       system_result("process_manager terminated")
   185       put_result(Markup.EXIT, rc.toString)
   186     }
   187     rm_fifos()
   188   }
   189 
   190 
   191   /* management methods */
   192 
   193   def join() { process_manager.join() }
   194 
   195   def interrupt()
   196   {
   197     try { process.interrupt }
   198     catch { case e: IOException => system_result("Failed to interrupt Isabelle: " + e.getMessage) }
   199   }
   200 
   201   def terminate()
   202   {
   203     close()
   204     system_result("Terminating Isabelle process")
   205     terminate_process()
   206   }
   207 
   208 
   209 
   210   /** stream actors **/
   211 
   212   /* raw stdin */
   213 
   214   private def stdin_actor(): (Thread, Actor) =
   215   {
   216     val name = "standard_input"
   217     Simple_Thread.actor(name) {
   218       var finished = false
   219       while (!finished) {
   220         try {
   221           //{{{
   222           receive {
   223             case Input_Text(text) =>
   224               process.stdin.write(text)
   225               process.stdin.flush
   226             case Close =>
   227               process.stdin.close
   228               finished = true
   229             case bad => System.err.println(name + ": ignoring bad message " + bad)
   230           }
   231           //}}}
   232         }
   233         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   234       }
   235       system_result(name + " terminated")
   236     }
   237   }
   238 
   239 
   240   /* raw stdout */
   241 
   242   private def stdout_actor(): (Thread, Actor) =
   243   {
   244     val name = "standard_output"
   245     Simple_Thread.actor(name) {
   246       var result = new StringBuilder(100)
   247 
   248       var finished = false
   249       while (!finished) {
   250         try {
   251           //{{{
   252           var c = -1
   253           var done = false
   254           while (!done && (result.length == 0 || process.stdout.ready)) {
   255             c = process.stdout.read
   256             if (c >= 0) result.append(c.asInstanceOf[Char])
   257             else done = true
   258           }
   259           if (result.length > 0) {
   260             put_result(Markup.STDOUT, result.toString)
   261             result.length = 0
   262           }
   263           else {
   264             process.stdout.close
   265             finished = true
   266           }
   267           //}}}
   268         }
   269         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   270       }
   271       system_result(name + " terminated")
   272     }
   273   }
   274 
   275 
   276   /* command input */
   277 
   278   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   279   {
   280     val name = "command_input"
   281     Simple_Thread.actor(name) {
   282       val stream = new BufferedOutputStream(raw_stream)
   283       var finished = false
   284       while (!finished) {
   285         try {
   286           //{{{
   287           receive {
   288             case Input_Chunks(chunks) =>
   289               stream.write(Standard_System.string_bytes(
   290                   chunks.map(_.length).mkString("", ",", "\n")));
   291               chunks.foreach(stream.write(_));
   292               stream.flush
   293             case Close =>
   294               stream.close
   295               finished = true
   296             case bad => System.err.println(name + ": ignoring bad message " + bad)
   297           }
   298           //}}}
   299         }
   300         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   301       }
   302       system_result(name + " terminated")
   303     }
   304   }
   305 
   306 
   307   /* message output */
   308 
   309   private def message_actor(stream: InputStream): (Thread, Actor) =
   310   {
   311     class EOF extends Exception
   312     class Protocol_Error(msg: String) extends Exception(msg)
   313 
   314     val name = "message_output"
   315     Simple_Thread.actor(name) {
   316       val default_buffer = new Array[Byte](65536)
   317       var c = -1
   318 
   319       def read_chunk(): XML.Body =
   320       {
   321         //{{{
   322         // chunk size
   323         var n = 0
   324         c = stream.read
   325         if (c == -1) throw new EOF
   326         while (48 <= c && c <= 57) {
   327           n = 10 * n + (c - 48)
   328           c = stream.read
   329         }
   330         if (c != 10) throw new Protocol_Error("bad message chunk header")
   331 
   332         // chunk content
   333         val buf =
   334           if (n <= default_buffer.size) default_buffer
   335           else new Array[Byte](n)
   336 
   337         var i = 0
   338         var m = 0
   339         do {
   340           m = stream.read(buf, i, n - i)
   341           i += m
   342         } while (m > 0 && n > i)
   343 
   344         if (i != n) throw new Protocol_Error("bad message chunk content")
   345 
   346         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   347         //}}}
   348       }
   349 
   350       do {
   351         try {
   352           val header = read_chunk()
   353           val body = read_chunk()
   354           header match {
   355             case List(XML.Elem(Markup(name, props), Nil))
   356                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   357               put_result(Kind.message_markup(name(0)), props, body)
   358             case _ => throw new Protocol_Error("bad header: " + header.toString)
   359           }
   360         }
   361         catch {
   362           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   363           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   364           case _: EOF =>
   365         }
   366       } while (c != -1)
   367       stream.close
   368 
   369       system_result(name + " terminated")
   370     }
   371   }
   372 
   373 
   374   /** main methods **/
   375 
   376   def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
   377 
   378   def input_bytes(name: String, args: Array[Byte]*): Unit =
   379     command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   380 
   381   def input(name: String, args: String*): Unit =
   382     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   383 
   384   def close(): Unit = { close(command_input); close(standard_input) }
   385 }