src/Pure/System/isabelle_process.scala
author wenzelm
Tue Aug 10 12:29:11 2010 +0200 (2010-08-10 ago)
changeset 38259 2b61c5e27399
parent 38253 3d4e521014f7
child 38262 bb2df73fab2c
permissions -rw-r--r--
distinguish proper Isabelle_Process INPUT vs. raw STDIN, tuned corresponding method names;
asynchronous Isabelle_Process.init -- raw ML toplevel stays active;
simplified Isabelle_Process using actors;
misc tuning;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind {
    23     // message markup
    24     val markup = Map(
    25       ('A' : Int) -> Markup.INIT,
    26       ('B' : Int) -> Markup.STATUS,
    27       ('C' : Int) -> Markup.REPORT,
    28       ('D' : Int) -> Markup.WRITELN,
    29       ('E' : Int) -> Markup.TRACING,
    30       ('F' : Int) -> Markup.WARNING,
    31       ('G' : Int) -> Markup.ERROR,
    32       ('H' : Int) -> Markup.DEBUG)
    33     def is_raw(kind: String) =
    34       kind == Markup.STDOUT
    35     def is_control(kind: String) =
    36       kind == Markup.SYSTEM ||
    37       kind == Markup.SIGNAL ||
    38       kind == Markup.EXIT
    39     def is_system(kind: String) =
    40       kind == Markup.SYSTEM ||
    41       kind == Markup.INPUT ||
    42       kind == Markup.STDIN ||
    43       kind == Markup.SIGNAL ||
    44       kind == Markup.EXIT ||
    45       kind == Markup.STATUS
    46   }
    47 
    48   class Result(val message: XML.Elem)
    49   {
    50     def kind = message.markup.name
    51     def properties = message.markup.properties
    52     def body = message.body
    53 
    54     def is_raw = Kind.is_raw(kind)
    55     def is_control = Kind.is_control(kind)
    56     def is_system = Kind.is_system(kind)
    57     def is_status = kind == Markup.STATUS
    58     def is_report = kind == Markup.REPORT
    59     def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
    60 
    61     override def toString: String =
    62     {
    63       val res =
    64         if (is_status || is_report) message.body.map(_.toString).mkString
    65         else Pretty.string_of(message.body)
    66       if (properties.isEmpty)
    67         kind.toString + " [[" + res + "]]"
    68       else
    69         kind.toString + " " +
    70           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    71     }
    72 
    73     def cache(c: XML.Cache): Result = new Result(c.cache_tree(message).asInstanceOf[XML.Elem])
    74   }
    75 }
    76 
    77 
    78 class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
    79 {
    80   import Isabelle_Process._
    81 
    82 
    83   /* demo constructor */
    84 
    85   def this(args: String*) =
    86     this(new Isabelle_System,
    87       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    88 
    89 
    90   /* process information */
    91 
  // Handle of the external process: assigned when the process is executed during
  // construction, reset to null by kill()
  @volatile private var proc: Process = null
  // Set by close(); input() refuses further commands once this is true
  @volatile private var closing = false
  // Process id as a string, taken from the PID property of an INIT message by put_result;
  // null while unknown and again after kill()
  @volatile private var pid: String = null
    95 
    96 
    97   /* results */
    98 
    99   private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree])
   100   {
   101     if (kind == Markup.INIT) {
   102       for ((Markup.PID, p) <- props) pid = p
   103     }
   104     receiver ! new Result(XML.Elem(Markup(kind, props), body))
   105   }
   106 
   107   private def put_result(kind: String, text: String)
   108   {
   109     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   110   }
   111 
   112 
   113   /* signals */
   114 
   115   def interrupt() = synchronized {  // FIXME avoid synchronized
   116     if (proc == null) error("Cannot interrupt Isabelle: no process")
   117     if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
   118     else {
   119       try {
   120         if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
   121           put_result(Markup.SIGNAL, "INT")
   122         else
   123           put_result(Markup.SYSTEM, "Cannot interrupt: kill command failed")
   124       }
   125       catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   126     }
   127   }
   128 
   129   def kill() = synchronized {  // FIXME avoid synchronized
   130     if (proc == 0) error("Cannot kill Isabelle: no process")
   131     else {
   132       try_close()
   133       Thread.sleep(500)  // FIXME property!?
   134       put_result(Markup.SIGNAL, "KILL")
   135       proc.destroy
   136       proc = null
   137       pid = null
   138     }
   139   }
   140 
   141 
   142 
   143   /** stream actors **/
   144 
   145   /* input */
   146 
  // Message understood by input actors: text to be written to the actor's stream
  case class Input(cmd: String)
  // Message understood by input actors: close the stream and leave the actor loop
  case object Close
   149 
   150   private def input_actor(name: String, kind: String, stream: => OutputStream): Actor =
   151     Library.thread_actor(name) {
   152       val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
   153       var finished = false
   154       while (!finished) {
   155         try {
   156           //{{{
   157           receive {
   158             case Input(text) =>
   159               put_result(kind, text)
   160               writer.write(text)
   161               writer.flush
   162             case Close =>
   163               writer.close
   164               finished = true
   165             case bad => System.err.println(name + ": ignoring bad message " + bad)
   166           }
   167           //}}}
   168         }
   169         catch {
   170           case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   171         }
   172       }
   173       put_result(Markup.SYSTEM, name + " terminated")
   174     }
   175 
   176 
   177   /* raw output */
   178 
   179   private def output_actor(name: String, kind: String, stream: => InputStream): Actor =
   180     Library.thread_actor(name) {
   181       val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
   182       var result = new StringBuilder(100)
   183 
   184       var finished = false
   185       while (!finished) {
   186         try {
   187           //{{{
   188           var c = -1
   189           var done = false
   190           while (!done && (result.length == 0 || reader.ready)) {
   191             c = reader.read
   192             if (c >= 0) result.append(c.asInstanceOf[Char])
   193             else done = true
   194           }
   195           if (result.length > 0) {
   196             put_result(kind, result.toString)
   197             result.length = 0
   198           }
   199           else {
   200             reader.close
   201             finished = true
   202             try_close()
   203           }
   204           //}}}
   205         }
   206         catch {
   207           case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   208         }
   209       }
   210       put_result(Markup.SYSTEM, name + " terminated")
   211     }
   212 
   213 
   214   /* message output */
   215 
  // Raised within message_actor when the incoming byte stream violates the message format;
  // caught there and reported as SYSTEM result "Malformed message"
  private class Protocol_Error(msg: String) extends Exception(msg)
   217 
  /**
   * Create a thread actor that reads protocol messages from the given stream and forwards
   * them as results via put_result.
   *
   * A message as read here consists of a sync byte (value 2) followed by two chunks, header
   * and body.  Each chunk is a decimal length in ASCII digits, a newline (byte 10), and then
   * that many bytes of content, which is decoded and parsed via YXML.
   *
   * IOException and Protocol_Error are reported as SYSTEM results and do not end the loop;
   * the loop ends once a read has returned -1.  Afterwards the stream is closed, try_close()
   * is invoked and a SYSTEM result "<name> terminated" is emitted.
   */
  private def message_actor(name: String, stream: InputStream): Actor =
    Library.thread_actor(name) {
      // buffer reused for every chunk of at most 65536 bytes
      val default_buffer = new Array[Byte](65536)
      // most recently read byte; shared between read_chunk and the main loop below,
      // -1 signals end of stream
      var c = -1

      // Read one chunk (length line plus content) and parse it into XML trees.
      // Throws Protocol_Error on a malformed length line or on truncated content.
      def read_chunk(): List[XML.Tree] =
      {
        //{{{
        // chunk size: decimal digits (bytes 48..57 are '0'..'9'), terminated by newline (10)
        var n = 0
        c = stream.read
        while (48 <= c && c <= 57) {
          n = 10 * n + (c - 48)
          c = stream.read
        }
        if (c != 10) throw new Protocol_Error("bad message chunk header")

        // chunk content: use the shared buffer when the chunk fits, otherwise a fresh array
        val buf =
          if (n <= default_buffer.size) default_buffer
          else new Array[Byte](n)

        // read until n bytes are present or the stream yields nothing more
        // NOTE(review): a read result of -1 is added to i as well, which makes i != n below
        var i = 0
        var m = 0
        do {
          m = stream.read(buf, i, n - i)
          i += m
        } while (m > 0 && n > i)

        if (i != n) throw new Protocol_Error("bad message chunk content")

        YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
        //}}}
      }

      do {
        try {
          //{{{
          // skip forward to the next sync byte (2), counting the bytes skipped on the way
          c = stream.read
          var non_sync = 0
          while (c >= 0 && c != 2) {
            non_sync += 1
            c = stream.read
          }
          // NOTE(review): this throws before the c == 2 branch, so the chunks that follow the
          // sync byte found after skipping are not read as a message in this round
          if (non_sync > 0)
            throw new Protocol_Error("lost synchronization -- skipping " + non_sync + " bytes")
          if (c == 2) {
            val header = read_chunk()
            val body = read_chunk()
            // the header must be a single element without body whose name is one character
            // that is a key of Kind.markup; note that the pattern variable `name` shadows the
            // actor name inside this case
            header match {
              case List(XML.Elem(Markup(name, props), Nil))
                  if name.size == 1 && Kind.markup.isDefinedAt(name(0)) =>
                put_result(Kind.markup(name(0)), props, body)
              case _ => throw new Protocol_Error("bad header: " + header.toString)
            }
          }
          //}}}
        }
        catch {
          // NOTE(review): an exception leaves c at its last value, so the loop goes on
          // unless that value is already -1
          case e: IOException =>
            put_result(Markup.SYSTEM, "Cannot read message:\n" + e.getMessage)
          case e: Protocol_Error =>
            put_result(Markup.SYSTEM, "Malformed message:\n" + e.getMessage)
        }
      } while (c != -1)
      stream.close
      try_close()

      put_result(Markup.SYSTEM, name + " terminated")
    }
   288 
   289 
   290 
   291   /** init **/
   292 
   293   /* exec process */
   294 
  // Two fifos obtained from system.mk_fifo(): both are handed to the process through its -W
  // option below; in_fifo is later opened for output by the command_input actor and out_fifo
  // for input by the message_output actor
  private val in_fifo = system.mk_fifo()
  private val out_fifo = system.mk_fifo()
  // remove both fifos again via system.rm_fifo
  private def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }

  // Start the external process: the executable is taken from the ISABELLE_PROCESS setting,
  // followed by "-W in_fifo:out_fifo" and the user-supplied args.  If execution fails with an
  // IOException, the fifos are removed before the error is raised.
  try {
    val cmdline =
      List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
    proc = system.execute(true, cmdline: _*)
  }
  catch {
    case e: IOException =>
      rm_fifos()
      error("Failed to execute Isabelle process: " + e.getMessage)
  }
   309 
   310 
   311   /* exit process */
   312 
   313   Library.thread_actor("process_exit") {
   314     val rc = proc.waitFor()
   315     Thread.sleep(300)  // FIXME property!?
   316     put_result(Markup.SYSTEM, "process_exit terminated")
   317     put_result(Markup.EXIT, rc.toString)
   318     rm_fifos()
   319   }
   320 
   321 
   322   /* I/O actors */
   323 
  // Writer for the raw standard input of the process; written text is reported as STDIN result.
  // The stream argument of input_actor is by-name, so it is evaluated inside the actor body.
  private val standard_input =
    input_actor("standard_input", Markup.STDIN, proc.getOutputStream)

  // Writer for the command channel through in_fifo; written text is reported as INPUT result.
  // As above, the fifo output stream is evaluated inside the actor body.
  private val command_input =
    input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo))

  // Readers are started for their effect only; the resulting actors are not kept.
  // output_actor takes its stream by-name, whereas message_actor takes it strictly, so
  // system.fifo_input_stream(out_fifo) is evaluated right here during construction.
  output_actor("standard_output", Markup.STDOUT, proc.getInputStream)
  message_actor("message_output", system.fifo_input_stream(out_fifo))
   332 
   333 
   334 
   335   /** main methods **/
   336 
   337   def input_raw(text: String) = standard_input ! Input(text)
   338 
   339   def input(text: String) = synchronized {  // FIXME avoid synchronized
   340     if (proc == null) error("Cannot output to Isabelle: no process")
   341     if (closing) error("Cannot output to Isabelle: already closing")
   342     command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n")
   343   }
   344 
   345   def command(text: String) = input("Isabelle.command " + Isabelle_Syntax.encode_string(text))
   346 
   347   def command(props: List[(String, String)], text: String) =
   348     input("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
   349       Isabelle_Syntax.encode_string(text))
   350 
   351   def ML_val(text: String) = input("ML_val " + Isabelle_Syntax.encode_string(text))
   352   def ML_command(text: String) = input("ML_command " + Isabelle_Syntax.encode_string(text))
   353 
  // Request regular shutdown of the command channel: the command_input actor is told to
  // close its stream, and the closing flag is set so that input() rejects further commands.
  // NOTE(review): there is no guard here, so every call sends another Close message.
  def close() = synchronized {    // FIXME avoid synchronized
    command_input ! Close
    closing = true
  }
   358 
   359   def try_close() = synchronized {
   360     if (proc != null && !closing) {
   361       try { close() }
   362       catch { case _: RuntimeException => }
   363     }
   364   }
   365 }