src/Pure/System/isabelle_process.scala
author wenzelm
Fri Sep 17 15:51:11 2010 +0200 (2010-09-17 ago)
changeset 39439 1c294d150ded
parent 38636 b7647ca7de5a
child 39513 fce2202892c4
permissions -rw-r--r--
eliminated markup "location" in favour of more explicit "no_report", which is actually deleted from messages;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind {
    23     // message markup
    24     val markup = Map(
    25       ('A' : Int) -> Markup.INIT,
    26       ('B' : Int) -> Markup.STATUS,
    27       ('C' : Int) -> Markup.REPORT,
    28       ('D' : Int) -> Markup.WRITELN,
    29       ('E' : Int) -> Markup.TRACING,
    30       ('F' : Int) -> Markup.WARNING,
    31       ('G' : Int) -> Markup.ERROR,
    32       ('H' : Int) -> Markup.DEBUG)
    33     def is_raw(kind: String) =
    34       kind == Markup.STDOUT
    35     def is_control(kind: String) =
    36       kind == Markup.SYSTEM ||
    37       kind == Markup.SIGNAL ||
    38       kind == Markup.EXIT
    39     def is_system(kind: String) =
    40       kind == Markup.SYSTEM ||
    41       kind == Markup.INPUT ||
    42       kind == Markup.STDIN ||
    43       kind == Markup.SIGNAL ||
    44       kind == Markup.EXIT ||
    45       kind == Markup.STATUS
    46   }
    47 
    48   class Result(val message: XML.Elem)
    49   {
    50     def kind = message.markup.name
    51     def properties = message.markup.properties
    52     def body = message.body
    53 
    54     def is_raw = Kind.is_raw(kind)
    55     def is_control = Kind.is_control(kind)
    56     def is_system = Kind.is_system(kind)
    57     def is_status = kind == Markup.STATUS
    58     def is_report = kind == Markup.REPORT
    59     def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
    60 
    61     override def toString: String =
    62     {
    63       val res =
    64         if (is_status || is_report) message.body.map(_.toString).mkString
    65         else Pretty.string_of(message.body)
    66       if (properties.isEmpty)
    67         kind.toString + " [[" + res + "]]"
    68       else
    69         kind.toString + " " +
    70           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    71     }
    72   }
    73 }
    74 
    75 
    76 class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
    77 {
    78   import Isabelle_Process._
    79 
    80 
    81   /* demo constructor */
    82 
    83   def this(args: String*) =
    84     this(new Isabelle_System,
    85       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    86 
    87 
    88   /* process information */
    89 
    90   @volatile private var proc: Option[Process] = None  // external process; assigned during initialization, reset by kill()
    91   @volatile private var pid: Option[String] = None  // set by put_result on the first INIT message; target of "kill -INT" in interrupt()
    92 
    93 
    94   /* results */
    95 
    96   private val xml_cache = new XML.Cache(131071)  // put_result passes every message through cache_tree before it reaches the receiver
    97 
    98   private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
    99   {
   100     if (pid.isEmpty && kind == Markup.INIT)
   101       pid = props.find(_._1 == Markup.PID).map(_._1)
   102 
   103     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
   104     xml_cache.cache_tree(msg)((message: XML.Tree) =>
   105       receiver ! new Result(message.asInstanceOf[XML.Elem]))
   106   }
   107 
   108   private def put_result(kind: String, text: String)
   109   {
   110     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   111   }
   112 
   113 
   114   /* signals */
   115 
   116   def interrupt()
   117   {
   118     if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process")
   119     else
   120       pid match {
   121         case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknowd pid")
   122         case Some(i) =>
   123           try {
   124             if (system.execute(true, "kill", "-INT", i).waitFor == 0)
   125               put_result(Markup.SIGNAL, "INT")
   126             else
   127               put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed")
   128           }
   129           catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   130       }
   131   }
   132 
   133   def kill()
   134   {
   135     proc match {
   136       case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process")
   137       case Some(p) =>
   138         close()
   139         Thread.sleep(500)  // FIXME !?
   140         put_result(Markup.SIGNAL, "KILL")
   141         p.destroy
   142         proc = None
   143         pid = None
   144     }
   145   }
   146 
   147 
   148 
   149   /** stream actors **/
   150 
   151   case class Input_Text(text: String)  // text that stdin_actor writes to its stream and flushes
   152   case class Input_Chunks(chunks: List[Array[Byte]])  // chunks that input_actor writes after a line listing their lengths
   153   case object Close  // makes stdin_actor / input_actor close its stream and terminate
   154 
   155 
   156   /* raw stdin */
   157 
   158   private def stdin_actor(name: String, stream: OutputStream): Actor =
   159     Simple_Thread.actor(name) {
   160       val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
   161       var finished = false
   162       while (!finished) {
   163         try {
   164           //{{{
   165           receive {
   166             case Input_Text(text) =>
   167               // FIXME echo input?!
   168               writer.write(text)
   169               writer.flush
   170             case Close =>
   171               writer.close
   172               finished = true
   173             case bad => System.err.println(name + ": ignoring bad message " + bad)
   174           }
   175           //}}}
   176         }
   177         catch {
   178           case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   179         }
   180       }
   181       put_result(Markup.SYSTEM, name + " terminated")
   182     }
   183 
   184 
   185   /* raw stdout */
   186 
   187   private def stdout_actor(name: String, stream: InputStream): Actor =
   188     Simple_Thread.actor(name) {
   189       val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
   190       var result = new StringBuilder(100)
   191 
   192       var finished = false
   193       while (!finished) {
   194         try {
   195           //{{{
   196           var c = -1
   197           var done = false
   198           while (!done && (result.length == 0 || reader.ready)) {
   199             c = reader.read
   200             if (c >= 0) result.append(c.asInstanceOf[Char])
   201             else done = true
   202           }
   203           if (result.length > 0) {
   204             put_result(Markup.STDOUT, result.toString)
   205             result.length = 0
   206           }
   207           else {
   208             reader.close
   209             finished = true
   210             close()
   211           }
   212           //}}}
   213         }
   214         catch {
   215           case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   216         }
   217       }
   218       put_result(Markup.SYSTEM, name + " terminated")
   219     }
   220 
   221 
   222   /* command input */
   223 
   224   private def input_actor(name: String, raw_stream: OutputStream): Actor =
   225     Simple_Thread.actor(name) {
   226       val stream = new BufferedOutputStream(raw_stream)
   227       var finished = false
   228       while (!finished) {
   229         try {
   230           //{{{
   231           receive {
   232             case Input_Chunks(chunks) =>
   233               stream.write(Standard_System.string_bytes(
   234                   chunks.map(_.length).mkString("", ",", "\n")));
   235               chunks.foreach(stream.write(_));
   236               stream.flush
   237             case Close =>
   238               stream.close
   239               finished = true
   240             case bad => System.err.println(name + ": ignoring bad message " + bad)
   241           }
   242           //}}}
   243         }
   244         catch {
   245           case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   246         }
   247       }
   248       put_result(Markup.SYSTEM, name + " terminated")
   249     }
   250 
   251 
   252   /* message output */
   253 
   254   private class Protocol_Error(msg: String) extends Exception(msg)
   255 
   256   private def message_actor(name: String, stream: InputStream): Actor =
   257     Simple_Thread.actor(name) {
   258       val default_buffer = new Array[Byte](65536)
   259       var c = -1
   260 
   261       def read_chunk(): XML.Body =
   262       {
   263         //{{{
   264         // chunk size
   265         var n = 0
   266         c = stream.read
   267         while (48 <= c && c <= 57) {
   268           n = 10 * n + (c - 48)
   269           c = stream.read
   270         }
   271         if (c != 10) throw new Protocol_Error("bad message chunk header")
   272 
   273         // chunk content
   274         val buf =
   275           if (n <= default_buffer.size) default_buffer
   276           else new Array[Byte](n)
   277 
   278         var i = 0
   279         var m = 0
   280         do {
   281           m = stream.read(buf, i, n - i)
   282           i += m
   283         } while (m > 0 && n > i)
   284 
   285         if (i != n) throw new Protocol_Error("bad message chunk content")
   286 
   287         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   288         //}}}
   289       }
   290 
   291       do {
   292         try {
   293           val header = read_chunk()
   294           val body = read_chunk()
   295           header match {
   296             case List(XML.Elem(Markup(name, props), Nil))
   297                 if name.size == 1 && Kind.markup.isDefinedAt(name(0)) =>
   298               put_result(Kind.markup(name(0)), props, body)
   299             case _ => throw new Protocol_Error("bad header: " + header.toString)
   300           }
   301         }
   302         catch {
   303           case e: IOException =>
   304             put_result(Markup.SYSTEM, "Cannot read message:\n" + e.getMessage)
   305           case e: Protocol_Error =>
   306             put_result(Markup.SYSTEM, "Malformed message:\n" + e.getMessage)
   307         }
   308       } while (c != -1)
   309       stream.close
   310       close()
   311 
   312       put_result(Markup.SYSTEM, name + " terminated")
   313     }
   314 
   315 
   316 
   317   /** init **/
   318 
   319   /* exec process */
   320 
   321   private val in_fifo = system.mk_fifo()
   322   private val out_fifo = system.mk_fifo()
   323   private def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   324 
   325   try {
   326     val cmdline =
   327       List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   328     proc = Some(system.execute(true, cmdline: _*))
   329   }
   330   catch {
   331     case e: IOException =>
   332       rm_fifos()
   333       error("Failed to execute Isabelle process: " + e.getMessage)
   334   }
   335 
   336 
   337   /* I/O actors */
   338 
   339   private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)  // target of input_raw; writes to the process's stdin
   340   stdout_actor("standard_output", proc.get.getInputStream)  // started for its effect only: forwards process output as STDOUT messages
   341 
   342   private val command_input = input_actor("command_input", system.fifo_output_stream(in_fifo))  // target of input_bytes and close(); writes to in_fifo
   343   message_actor("message_output", system.fifo_input_stream(out_fifo))  // started for its effect only: decodes messages read from out_fifo
   344 
   345 
   346   /* exit process */
   347 
   348   Simple_Thread.actor("process_exit") {
   349     proc match {
   350       case None =>
   351       case Some(p) =>
   352         val rc = p.waitFor()
   353         Thread.sleep(300)  // FIXME property!?
   354         put_result(Markup.SYSTEM, "process_exit terminated")
   355         put_result(Markup.EXIT, rc.toString)
   356     }
   357     rm_fifos()
   358   }
   359 
   360 
   361 
   362   /** main methods **/
   363 
   364   def input_raw(text: String): Unit = standard_input ! Input_Text(text)
   365 
   366   def input_bytes(name: String, args: Array[Byte]*): Unit =
   367     command_input ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   368 
   369   def input(name: String, args: String*): Unit =
   370     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   371 
   372   def close(): Unit = command_input ! Close
   373 }