src/Pure/System/isabelle_process.scala
author wenzelm
Sun Sep 19 17:12:34 2010 +0200 (2010-09-19 ago)
changeset 39525 72e949a0425b
parent 39524 59ebce09ce6e
child 39526 f1296795a8dc
permissions -rw-r--r--
simplified Isabelle_Process message kinds;
misc tuning and simplification;
     1 /*  Title:      Pure/System/isabelle_process.ML
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind
    23   {
    24     val message_markup = Map(
    25       ('A' : Int) -> Markup.INIT,
    26       ('B' : Int) -> Markup.STATUS,
    27       ('C' : Int) -> Markup.REPORT,
    28       ('D' : Int) -> Markup.WRITELN,
    29       ('E' : Int) -> Markup.TRACING,
    30       ('F' : Int) -> Markup.WARNING,
    31       ('G' : Int) -> Markup.ERROR)
    32   }
    33 
    34   class Result(val message: XML.Elem)
    35   {
    36     def kind = message.markup.name
    37     def properties = message.markup.properties
    38     def body = message.body
    39 
    40     def is_init = kind == Markup.INIT
    41     def is_exit = kind == Markup.EXIT
    42     def is_stdout = kind == Markup.STDOUT
    43     def is_system = kind == Markup.SYSTEM
    44     def is_status = kind == Markup.STATUS
    45     def is_report = kind == Markup.REPORT
    46     def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
    47 
    48     override def toString: String =
    49     {
    50       val res =
    51         if (is_status || is_report) message.body.map(_.toString).mkString
    52         else Pretty.string_of(message.body)
    53       if (properties.isEmpty)
    54         kind.toString + " [[" + res + "]]"
    55       else
    56         kind.toString + " " +
    57           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    58     }
    59   }
    60 }
    61 
    62 
    63 class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
    64 {
    65   import Isabelle_Process._
    66 
    67 
    68   /* demo constructor */
    69 
    70   def this(args: String*) =
    71     this(new Isabelle_System,
    72       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    73 
    74 
    75   /* process information */
    76 
    77   @volatile private var proc: Option[Process] = None
    78   @volatile private var pid: Option[String] = None
    79 
    80 
    81   /* results */
    82 
    83   private def system_result(text: String)
    84   {
    85     receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
    86   }
    87 
    88 
    89   private val xml_cache = new XML.Cache(131071)
    90 
    91   private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
    92   {
    93     if (pid.isEmpty && kind == Markup.INIT) {
    94       props.find(_._1 == Markup.PID).map(_._1) match {
    95         case None => system_result("Bad Isabelle process initialization: missing pid")
    96         case p => pid = p
    97       }
    98     }
    99 
   100     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
   101     xml_cache.cache_tree(msg)((message: XML.Tree) =>
   102       receiver ! new Result(message.asInstanceOf[XML.Elem]))
   103   }
   104 
   105   private def put_result(kind: String, text: String)
   106   {
   107     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   108   }
   109 
   110 
   111   /* signals */
   112 
   113   def interrupt()
   114   {
   115     if (proc.isEmpty) system_result("Cannot interrupt Isabelle: no process")
   116     else
   117       pid match {
   118         case None => system_result("Cannot interrupt Isabelle: unknowd pid")
   119         case Some(i) =>
   120           try {
   121             if (system.execute(true, "kill", "-INT", i).waitFor == 0)
   122               system_result("Interrupt Isabelle")
   123             else
   124               system_result("Cannot interrupt Isabelle: kill command failed")
   125           }
   126           catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   127       }
   128   }
   129 
   130   def kill()
   131   {
   132     proc match {
   133       case None => system_result("Cannot kill Isabelle: no process")
   134       case Some(p) =>
   135         close()
   136         Thread.sleep(500)  // FIXME !?
   137         system_result("Kill Isabelle")
   138         p.destroy
   139         proc = None
   140         pid = None
   141     }
   142   }
   143 
   144 
   145 
   146   /** stream actors **/
   147 
   148   private case class Input_Text(text: String)
   149   private case class Input_Chunks(chunks: List[Array[Byte]])
   150   private case object Close
   151 
   152 
   153   /* raw stdin */
   154 
   155   private def stdin_actor(name: String, stream: OutputStream): Actor =
   156     Simple_Thread.actor(name) {
   157       val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
   158       var finished = false
   159       while (!finished) {
   160         try {
   161           //{{{
   162           receive {
   163             case Input_Text(text) =>
   164               writer.write(text)
   165               writer.flush
   166             case Close =>
   167               writer.close
   168               finished = true
   169             case bad => System.err.println(name + ": ignoring bad message " + bad)
   170           }
   171           //}}}
   172         }
   173         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   174       }
   175       system_result(name + " terminated")
   176     }
   177 
   178 
   179   /* raw stdout */
   180 
   181   private def stdout_actor(name: String, stream: InputStream): Actor =
   182     Simple_Thread.actor(name) {
   183       val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
   184       var result = new StringBuilder(100)
   185 
   186       var finished = false
   187       while (!finished) {
   188         try {
   189           //{{{
   190           var c = -1
   191           var done = false
   192           while (!done && (result.length == 0 || reader.ready)) {
   193             c = reader.read
   194             if (c >= 0) result.append(c.asInstanceOf[Char])
   195             else done = true
   196           }
   197           if (result.length > 0) {
   198             put_result(Markup.STDOUT, result.toString)
   199             result.length = 0
   200           }
   201           else {
   202             reader.close
   203             finished = true
   204             close()
   205           }
   206           //}}}
   207         }
   208         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   209       }
   210       system_result(name + " terminated")
   211     }
   212 
   213 
   214   /* command input */
   215 
   216   private def input_actor(name: String, fifo: String): Actor =
   217     Simple_Thread.actor(name) {
   218       val stream = new BufferedOutputStream(system.fifo_output_stream(fifo))  // FIXME potentially blocking forever
   219       var finished = false
   220       while (!finished) {
   221         try {
   222           //{{{
   223           receive {
   224             case Input_Chunks(chunks) =>
   225               stream.write(Standard_System.string_bytes(
   226                   chunks.map(_.length).mkString("", ",", "\n")));
   227               chunks.foreach(stream.write(_));
   228               stream.flush
   229             case Close =>
   230               stream.close
   231               finished = true
   232             case bad => System.err.println(name + ": ignoring bad message " + bad)
   233           }
   234           //}}}
   235         }
   236         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   237       }
   238       system_result(name + " terminated")
   239     }
   240 
   241 
   242   /* message output */
   243 
   244   private class Protocol_Error(msg: String) extends Exception(msg)
   245 
   246   private def message_actor(name: String, fifo: String): Actor =
   247     Simple_Thread.actor(name) {
   248       val stream = system.fifo_input_stream(fifo)  // FIXME potentially blocking forever
   249       val default_buffer = new Array[Byte](65536)
   250       var c = -1
   251 
   252       def read_chunk(): XML.Body =
   253       {
   254         //{{{
   255         // chunk size
   256         var n = 0
   257         c = stream.read
   258         while (48 <= c && c <= 57) {
   259           n = 10 * n + (c - 48)
   260           c = stream.read
   261         }
   262         if (c != 10) throw new Protocol_Error("bad message chunk header")
   263 
   264         // chunk content
   265         val buf =
   266           if (n <= default_buffer.size) default_buffer
   267           else new Array[Byte](n)
   268 
   269         var i = 0
   270         var m = 0
   271         do {
   272           m = stream.read(buf, i, n - i)
   273           i += m
   274         } while (m > 0 && n > i)
   275 
   276         if (i != n) throw new Protocol_Error("bad message chunk content")
   277 
   278         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   279         //}}}
   280       }
   281 
   282       do {
   283         try {
   284           val header = read_chunk()
   285           val body = read_chunk()
   286           header match {
   287             case List(XML.Elem(Markup(name, props), Nil))
   288                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   289               put_result(Kind.message_markup(name(0)), props, body)
   290             case _ => throw new Protocol_Error("bad header: " + header.toString)
   291           }
   292         }
   293         catch {
   294           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   295           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   296         }
   297       } while (c != -1)
   298       stream.close
   299       close()
   300 
   301       system_result(name + " terminated")
   302     }
   303 
   304 
   305 
   306   /** init **/
   307 
   308   /* exec process */
   309 
   310   private val in_fifo = system.mk_fifo()
   311   private val out_fifo = system.mk_fifo()
   312   private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   313 
   314   try {
   315     val cmdline =
   316       List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   317     proc = Some(system.execute(true, cmdline: _*))
   318   }
   319   catch { case e: IOException => rm_fifos(); throw(e) }
   320 
   321 
   322   /* I/O actors */
   323 
   324   private val command_input = input_actor("command_input", in_fifo)
   325   message_actor("message_output", out_fifo)
   326 
   327   private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
   328   stdout_actor("standard_output", proc.get.getInputStream)
   329 
   330 
   331   /* exit process */
   332 
   333   Simple_Thread.actor("process_exit") {
   334     proc match {
   335       case None =>
   336       case Some(p) =>
   337         val rc = p.waitFor()
   338         Thread.sleep(300)  // FIXME property!?
   339         system_result("Isabelle process terminated")
   340         put_result(Markup.EXIT, rc.toString)
   341     }
   342     rm_fifos()
   343   }
   344 
   345 
   346 
   347   /** main methods **/
   348 
   349   def input_raw(text: String): Unit = standard_input ! Input_Text(text)
   350 
   351   def input_bytes(name: String, args: Array[Byte]*): Unit =
   352     command_input ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   353 
   354   def input(name: String, args: String*): Unit =
   355     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   356 
   357   def close(): Unit = { standard_input ! Close; command_input ! Close }
   358 }