src/Pure/System/isabelle_process.scala
author wenzelm
Sat Aug 07 22:09:52 2010 +0200 (2010-08-07)
changeset 38230 ed147003de4b
parent 37712 7f25bf4b4bca
child 38231 968844caaff9
permissions -rw-r--r--
simplified type XML.Tree: embed Markup directly, avoid slightly odd triple;
XML.cache_tree: actually store XML.Text as well;
added Isabelle_Process.Result.properties;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind {
    23     // message markup
    24     val markup = Map(
    25       ('A' : Int) -> Markup.INIT,
    26       ('B' : Int) -> Markup.STATUS,
    27       ('C' : Int) -> Markup.WRITELN,
    28       ('D' : Int) -> Markup.TRACING,
    29       ('E' : Int) -> Markup.WARNING,
    30       ('F' : Int) -> Markup.ERROR,
    31       ('G' : Int) -> Markup.DEBUG)
    32     def is_raw(kind: String) =
    33       kind == Markup.STDOUT
    34     def is_control(kind: String) =
    35       kind == Markup.SYSTEM ||
    36       kind == Markup.SIGNAL ||
    37       kind == Markup.EXIT
    38     def is_system(kind: String) =
    39       kind == Markup.SYSTEM ||
    40       kind == Markup.STDIN ||
    41       kind == Markup.SIGNAL ||
    42       kind == Markup.EXIT ||
    43       kind == Markup.STATUS
    44   }
    45 
    46   class Result(val message: XML.Elem)
    47   {
    48     def kind = message.markup.name
    49     def properties = message.markup.properties
    50     def body = message.body
    51 
    52     def is_raw = Kind.is_raw(kind)
    53     def is_control = Kind.is_control(kind)
    54     def is_system = Kind.is_system(kind)
    55     def is_status = kind == Markup.STATUS
    56     def is_ready = is_status && body == List(XML.Elem(Markup(Markup.READY, Nil), Nil))
    57 
    58     override def toString: String =
    59     {
    60       val res =
    61         if (is_status) message.body.map(_.toString).mkString
    62         else Pretty.string_of(message.body)
    63       if (properties.isEmpty)
    64         kind.toString + " [[" + res + "]]"
    65       else
    66         kind.toString + " " +
    67           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    68     }
    69 
    70     def cache(c: XML.Cache): Result = new Result(c.cache_tree(message).asInstanceOf[XML.Elem])
    71   }
    72 }
    73 
    74 
    75 class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
    76 {
    77   import Isabelle_Process._
    78 
    79 
    80   /* demo constructor */
    81 
    82   def this(args: String*) =
    83     this(new Isabelle_System,
    84       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    85 
    86 
    87   /* process information */
    88 
    89   @volatile private var proc: Process = null
    90   @volatile private var closing = false
    91   @volatile private var pid: String = null
    92 
    93 
    94   /* results */
    95 
    96   private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree])
    97   {
    98     if (kind == Markup.INIT) {
    99       for ((Markup.PID, p) <- props) pid = p
   100     }
   101     receiver ! new Result(XML.Elem(Markup(kind, props), body))
   102   }
   103 
   104   private def put_result(kind: String, text: String)
   105   {
   106     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   107   }
   108 
   109 
   110   /* signals */
   111 
   112   def interrupt() = synchronized {
   113     if (proc == null) error("Cannot interrupt Isabelle: no process")
   114     if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
   115     else {
   116       try {
   117         if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
   118           put_result(Markup.SIGNAL, "INT")
   119         else
   120           put_result(Markup.SYSTEM, "Cannot interrupt: kill command failed")
   121       }
   122       catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   123     }
   124   }
   125 
   126   def kill() = synchronized {
   127     if (proc == 0) error("Cannot kill Isabelle: no process")
   128     else {
   129       try_close()
   130       Thread.sleep(500)  // FIXME property!?
   131       put_result(Markup.SIGNAL, "KILL")
   132       proc.destroy
   133       proc = null
   134       pid = null
   135     }
   136   }
   137 
   138 
   139   /* output being piped into the process */
   140 
   141   private val output = new LinkedBlockingQueue[String]
   142 
   143   private def output_raw(text: String) = synchronized {
   144     if (proc == null) error("Cannot output to Isabelle: no process")
   145     if (closing) error("Cannot output to Isabelle: already closing")
   146     output.put(text)
   147   }
   148 
   149   def output_sync(text: String) =
   150     output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n")
   151 
   152 
   153   def command(text: String) =
   154     output_sync("Isabelle.command " + Isabelle_Syntax.encode_string(text))
   155 
   156   def command(props: List[(String, String)], text: String) =
   157     output_sync("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
   158       Isabelle_Syntax.encode_string(text))
   159 
   160   def ML_val(text: String) =
   161     output_sync("ML_val " + Isabelle_Syntax.encode_string(text))
   162 
   163   def ML_command(text: String) =
   164     output_sync("ML_command " + Isabelle_Syntax.encode_string(text))
   165 
   166   def close() = synchronized {    // FIXME watchdog/timeout
   167     output_raw("\u0000")
   168     closing = true
   169   }
   170 
   171   def try_close() = synchronized {
   172     if (proc != null && !closing) {
   173       try { close() }
   174       catch { case _: RuntimeException => }
   175     }
   176   }
   177 
   178 
   179   /* stdin */
   180 
   181   private class Stdin_Thread(out_stream: OutputStream) extends Thread("isabelle: stdin") {
   182     override def run() = {
   183       val writer = new BufferedWriter(new OutputStreamWriter(out_stream, Standard_System.charset))
   184       var finished = false
   185       while (!finished) {
   186         try {
   187           //{{{
   188           val s = output.take
   189           if (s == "\u0000") {
   190             writer.close
   191             finished = true
   192           }
   193           else {
   194             put_result(Markup.STDIN, s)
   195             writer.write(s)
   196             writer.flush
   197           }
   198           //}}}
   199         }
   200         catch {
   201           case e: IOException => put_result(Markup.SYSTEM, "Stdin thread: " + e.getMessage)
   202         }
   203       }
   204       put_result(Markup.SYSTEM, "Stdin thread terminated")
   205     }
   206   }
   207 
   208 
   209   /* stdout */
   210 
   211   private class Stdout_Thread(in_stream: InputStream) extends Thread("isabelle: stdout") {
   212     override def run() = {
   213       val reader = new BufferedReader(new InputStreamReader(in_stream, Standard_System.charset))
   214       var result = new StringBuilder(100)
   215 
   216       var finished = false
   217       while (!finished) {
   218         try {
   219           //{{{
   220           var c = -1
   221           var done = false
   222           while (!done && (result.length == 0 || reader.ready)) {
   223             c = reader.read
   224             if (c >= 0) result.append(c.asInstanceOf[Char])
   225             else done = true
   226           }
   227           if (result.length > 0) {
   228             put_result(Markup.STDOUT, result.toString)
   229             result.length = 0
   230           }
   231           else {
   232             reader.close
   233             finished = true
   234             try_close()
   235           }
   236           //}}}
   237         }
   238         catch {
   239           case e: IOException => put_result(Markup.SYSTEM, "Stdout thread: " + e.getMessage)
   240         }
   241       }
   242       put_result(Markup.SYSTEM, "Stdout thread terminated")
   243     }
   244   }
   245 
   246 
   247   /* messages */
   248 
   249   private class Message_Thread(fifo: String) extends Thread("isabelle: messages")
   250   {
   251     private class Protocol_Error(msg: String) extends Exception(msg)
   252     override def run()
   253     {
   254       val stream = system.fifo_stream(fifo)
   255       val default_buffer = new Array[Byte](65536)
   256       var c = -1
   257 
   258       def read_chunk(): List[XML.Tree] =
   259       {
   260         //{{{
   261         // chunk size
   262         var n = 0
   263         c = stream.read
   264         while (48 <= c && c <= 57) {
   265           n = 10 * n + (c - 48)
   266           c = stream.read
   267         }
   268         if (c != 10) throw new Protocol_Error("bad message chunk header")
   269 
   270         // chunk content
   271         val buf =
   272           if (n <= default_buffer.size) default_buffer
   273           else new Array[Byte](n)
   274 
   275         var i = 0
   276         var m = 0
   277         do {
   278           m = stream.read(buf, i, n - i)
   279           i += m
   280         } while (m > 0 && n > i)
   281 
   282         if (i != n) throw new Protocol_Error("bad message chunk content")
   283 
   284         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   285         //}}}
   286       }
   287 
   288       do {
   289         try {
   290           //{{{
   291           c = stream.read
   292           var non_sync = 0
   293           while (c >= 0 && c != 2) {
   294             non_sync += 1
   295             c = stream.read
   296           }
   297           if (non_sync > 0)
   298             throw new Protocol_Error("lost synchronization -- skipping " + non_sync + " bytes")
   299           if (c == 2) {
   300             val header = read_chunk()
   301             val body = read_chunk()
   302             header match {
   303               case List(XML.Elem(Markup(name, props), Nil))
   304                   if name.size == 1 && Kind.markup.isDefinedAt(name(0)) =>
   305                 put_result(Kind.markup(name(0)), props, body)
   306               case _ => throw new Protocol_Error("bad header: " + header.toString)
   307             }
   308           }
   309           //}}}
   310         }
   311         catch {
   312           case e: IOException =>
   313             put_result(Markup.SYSTEM, "Cannot read message:\n" + e.getMessage)
   314           case e: Protocol_Error =>
   315             put_result(Markup.SYSTEM, "Malformed message:\n" + e.getMessage)
   316         }
   317       } while (c != -1)
   318       stream.close
   319       try_close()
   320 
   321       put_result(Markup.SYSTEM, "Message thread terminated")
   322     }
   323   }
   324 
   325 
   326 
   327   /** main **/
   328 
   329   {
   330     /* messages */
   331 
   332     val message_fifo = system.mk_fifo()
   333     def rm_fifo() = system.rm_fifo(message_fifo)
   334 
   335     val message_thread = new Message_Thread(message_fifo)
   336     message_thread.start
   337 
   338 
   339     /* exec process */
   340 
   341     try {
   342       val cmdline = List(system.getenv_strict("ISABELLE_PROCESS"), "-W", message_fifo) ++ args
   343       proc = system.execute(true, cmdline: _*)
   344     }
   345     catch {
   346       case e: IOException =>
   347         rm_fifo()
   348         error("Failed to execute Isabelle process: " + e.getMessage)
   349     }
   350 
   351 
   352     /* stdin/stdout */
   353 
   354     new Stdin_Thread(proc.getOutputStream).start
   355     new Stdout_Thread(proc.getInputStream).start
   356 
   357 
   358     /* exit */
   359 
   360     new Thread("isabelle: exit") {
   361       override def run() = {
   362         val rc = proc.waitFor()
   363         Thread.sleep(300)  // FIXME property!?
   364         put_result(Markup.SYSTEM, "Exit thread terminated")
   365         put_result(Markup.EXIT, rc.toString)
   366         rm_fifo()
   367       }
   368     }.start
   369   }
   370 }