src/Pure/System/isabelle_process.scala
author wenzelm
Fri Aug 13 21:33:13 2010 +0200 (2010-08-13 ago)
changeset 38372 e753f71b6b34
parent 38270 71bb3c273dd1
child 38445 ba9ea6b9b75c
permissions -rw-r--r--
added Isabelle_Process.input_bytes, which avoids the somewhat slow Standard_System.string_bytes (just in case someone wants to stream raw data at 250MB/s);
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind {
    23     // message markup
    24     val markup = Map(
    25       ('A' : Int) -> Markup.INIT,
    26       ('B' : Int) -> Markup.STATUS,
    27       ('C' : Int) -> Markup.REPORT,
    28       ('D' : Int) -> Markup.WRITELN,
    29       ('E' : Int) -> Markup.TRACING,
    30       ('F' : Int) -> Markup.WARNING,
    31       ('G' : Int) -> Markup.ERROR,
    32       ('H' : Int) -> Markup.DEBUG)
    33     def is_raw(kind: String) =
    34       kind == Markup.STDOUT
    35     def is_control(kind: String) =
    36       kind == Markup.SYSTEM ||
    37       kind == Markup.SIGNAL ||
    38       kind == Markup.EXIT
    39     def is_system(kind: String) =
    40       kind == Markup.SYSTEM ||
    41       kind == Markup.INPUT ||
    42       kind == Markup.STDIN ||
    43       kind == Markup.SIGNAL ||
    44       kind == Markup.EXIT ||
    45       kind == Markup.STATUS
    46   }
    47 
    48   class Result(val message: XML.Elem)
    49   {
    50     def kind = message.markup.name
    51     def properties = message.markup.properties
    52     def body = message.body
    53 
    54     def is_raw = Kind.is_raw(kind)
    55     def is_control = Kind.is_control(kind)
    56     def is_system = Kind.is_system(kind)
    57     def is_status = kind == Markup.STATUS
    58     def is_report = kind == Markup.REPORT
    59     def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
    60 
    61     override def toString: String =
    62     {
    63       val res =
    64         if (is_status || is_report) message.body.map(_.toString).mkString
    65         else Pretty.string_of(message.body)
    66       if (properties.isEmpty)
    67         kind.toString + " [[" + res + "]]"
    68       else
    69         kind.toString + " " +
    70           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    71     }
    72 
    73     def cache(c: XML.Cache): Result = new Result(c.cache_tree(message).asInstanceOf[XML.Elem])
    74   }
    75 }
    76 
    77 
    78 class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
    79 {
    80   import Isabelle_Process._
    81 
    82 
    83   /* demo constructor */
    84 
    85   def this(args: String*) =
    86     this(new Isabelle_System,
    87       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    88 
    89 
    90   /* process information */
    91 
  // The running external process; set when the process is started, reset by kill().
  @volatile private var proc: Option[Process] = None
  // Process id taken from the PID property of the INIT message; reset by kill().
  @volatile private var pid: Option[String] = None
    94 
    95 
    96   /* results */
    97 
    98   private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree])
    99   {
   100     if (kind == Markup.INIT) {
   101       for ((Markup.PID, p) <- props) pid = Some(p)
   102     }
   103     receiver ! new Result(XML.Elem(Markup(kind, props), body))
   104   }
   105 
   106   private def put_result(kind: String, text: String)
   107   {
   108     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   109   }
   110 
   111 
   112   /* signals */
   113 
   114   def interrupt()
   115   {
   116     if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process")
   117     else
   118       pid match {
   119         case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknowd pid")
   120         case Some(i) =>
   121           try {
   122             if (system.execute(true, "kill", "-INT", i).waitFor == 0)
   123               put_result(Markup.SIGNAL, "INT")
   124             else
   125               put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed")
   126           }
   127           catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   128       }
   129   }
   130 
   131   def kill()
   132   {
   133     proc match {
   134       case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process")
   135       case Some(p) =>
   136         close()
   137         Thread.sleep(500)  // FIXME !?
   138         put_result(Markup.SIGNAL, "KILL")
   139         p.destroy
   140         proc = None
   141         pid = None
   142     }
   143   }
   144 
   145 
   146 
   147   /** stream actors **/
   148 
  // Messages understood by the input actors below.
  // Text to be written to the raw stdin of the process.
  case class Input_Text(text: String)
  // Byte chunks to be written to the command input stream.
  case class Input_Chunks(chunks: List[Array[Byte]])
  // Request to close the underlying stream and stop the actor.
  case object Close
   152 
   153 
   154   /* raw stdin */
   155 
   156   private def stdin_actor(name: String, stream: OutputStream): Actor =
   157     Library.thread_actor(name) {
   158       val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
   159       var finished = false
   160       while (!finished) {
   161         try {
   162           //{{{
   163           receive {
   164             case Input_Text(text) =>
   165               // FIXME echo input?!
   166               writer.write(text)
   167               writer.flush
   168             case Close =>
   169               writer.close
   170               finished = true
   171             case bad => System.err.println(name + ": ignoring bad message " + bad)
   172           }
   173           //}}}
   174         }
   175         catch {
   176           case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   177         }
   178       }
   179       put_result(Markup.SYSTEM, name + " terminated")
   180     }
   181 
   182 
   183   /* raw stdout */
   184 
   185   private def stdout_actor(name: String, stream: InputStream): Actor =
   186     Library.thread_actor(name) {
   187       val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
   188       var result = new StringBuilder(100)
   189 
   190       var finished = false
   191       while (!finished) {
   192         try {
   193           //{{{
   194           var c = -1
   195           var done = false
   196           while (!done && (result.length == 0 || reader.ready)) {
   197             c = reader.read
   198             if (c >= 0) result.append(c.asInstanceOf[Char])
   199             else done = true
   200           }
   201           if (result.length > 0) {
   202             put_result(Markup.STDOUT, result.toString)
   203             result.length = 0
   204           }
   205           else {
   206             reader.close
   207             finished = true
   208             close()
   209           }
   210           //}}}
   211         }
   212         catch {
   213           case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   214         }
   215       }
   216       put_result(Markup.SYSTEM, name + " terminated")
   217     }
   218 
   219 
   220   /* command input */
   221 
   222   private def input_actor(name: String, raw_stream: OutputStream): Actor =
   223     Library.thread_actor(name) {
   224       val stream = new BufferedOutputStream(raw_stream)
   225       var finished = false
   226       while (!finished) {
   227         try {
   228           //{{{
   229           receive {
   230             case Input_Chunks(chunks) =>
   231               stream.write(Standard_System.string_bytes(
   232                   chunks.map(_.length).mkString("", ",", "\n")));
   233               chunks.foreach(stream.write(_));
   234               stream.flush
   235             case Close =>
   236               stream.close
   237               finished = true
   238             case bad => System.err.println(name + ": ignoring bad message " + bad)
   239           }
   240           //}}}
   241         }
   242         catch {
   243           case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   244         }
   245       }
   246       put_result(Markup.SYSTEM, name + " terminated")
   247     }
   248 
   249 
   250   /* message output */
   251 
   252   private class Protocol_Error(msg: String) extends Exception(msg)
   253 
   254   private def message_actor(name: String, stream: InputStream): Actor =
   255     Library.thread_actor(name) {
   256       val default_buffer = new Array[Byte](65536)
   257       var c = -1
   258 
   259       def read_chunk(): List[XML.Tree] =
   260       {
   261         //{{{
   262         // chunk size
   263         var n = 0
   264         c = stream.read
   265         while (48 <= c && c <= 57) {
   266           n = 10 * n + (c - 48)
   267           c = stream.read
   268         }
   269         if (c != 10) throw new Protocol_Error("bad message chunk header")
   270 
   271         // chunk content
   272         val buf =
   273           if (n <= default_buffer.size) default_buffer
   274           else new Array[Byte](n)
   275 
   276         var i = 0
   277         var m = 0
   278         do {
   279           m = stream.read(buf, i, n - i)
   280           i += m
   281         } while (m > 0 && n > i)
   282 
   283         if (i != n) throw new Protocol_Error("bad message chunk content")
   284 
   285         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   286         //}}}
   287       }
   288 
   289       do {
   290         try {
   291           //{{{
   292           c = stream.read
   293           var non_sync = 0
   294           while (c >= 0 && c != 2) {
   295             non_sync += 1
   296             c = stream.read
   297           }
   298           if (non_sync > 0)
   299             throw new Protocol_Error("lost synchronization -- skipping " + non_sync + " bytes")
   300           if (c == 2) {
   301             val header = read_chunk()
   302             val body = read_chunk()
   303             header match {
   304               case List(XML.Elem(Markup(name, props), Nil))
   305                   if name.size == 1 && Kind.markup.isDefinedAt(name(0)) =>
   306                 put_result(Kind.markup(name(0)), props, body)
   307               case _ => throw new Protocol_Error("bad header: " + header.toString)
   308             }
   309           }
   310           //}}}
   311         }
   312         catch {
   313           case e: IOException =>
   314             put_result(Markup.SYSTEM, "Cannot read message:\n" + e.getMessage)
   315           case e: Protocol_Error =>
   316             put_result(Markup.SYSTEM, "Malformed message:\n" + e.getMessage)
   317         }
   318       } while (c != -1)
   319       stream.close
   320       close()
   321 
   322       put_result(Markup.SYSTEM, name + " terminated")
   323     }
   324 
   325 
   326 
   327   /** init **/
   328 
   329   /* exec process */
   330 
   331   private val in_fifo = system.mk_fifo()
   332   private val out_fifo = system.mk_fifo()
   333   private def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   334 
   335   try {
   336     val cmdline =
   337       List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   338     proc = Some(system.execute(true, cmdline: _*))
   339   }
   340   catch {
   341     case e: IOException =>
   342       rm_fifos()
   343       error("Failed to execute Isabelle process: " + e.getMessage)
   344   }
   345 
   346 
   347   /* I/O actors */
   348 
  // Raw channels: stdin/stdout of the process itself.
  private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
  stdout_actor("standard_output", proc.get.getInputStream)

  // Protocol channels: commands go out via in_fifo, messages come back via out_fifo.
  private val command_input = input_actor("command_input", system.fifo_output_stream(in_fifo))
  message_actor("message_output", system.fifo_input_stream(out_fifo))
   354 
   355 
   356   /* exit process */
   357 
   358   Library.thread_actor("process_exit") {
   359     proc match {
   360       case None =>
   361       case Some(p) =>
   362         val rc = p.waitFor()
   363         Thread.sleep(300)  // FIXME property!?
   364         put_result(Markup.SYSTEM, "process_exit terminated")
   365         put_result(Markup.EXIT, rc.toString)
   366     }
   367     rm_fifos()
   368   }
   369 
   370 
   371 
   372   /** main methods **/
   373 
   374   def input_raw(text: String): Unit = standard_input ! Input_Text(text)
   375 
   376   def input_bytes(name: String, args: Array[Byte]*): Unit =
   377     command_input ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   378 
   379   def input(name: String, args: String*): Unit =
   380     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   381 
   382   def close(): Unit = command_input ! Close
   383 }