src/Pure/System/isabelle_process.scala
author wenzelm
Wed Jun 02 11:09:26 2010 +0200 (2010-06-02 ago)
changeset 37251 72c7e636067b
parent 37132 10ef4da1c314
child 37689 628eabe2213a
permissions -rw-r--r--
normalize and postprocess proof body in a separate future, taking care of platforms without multithreading (greately improves parallelization in general without the overhead of promised proofs, cf. usedir -q 0);
     1 /*  Title:      Pure/System/isabelle_process.ML
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind extends Enumeration {
    23     //{{{ values and codes
    24     // internal system notification
    25     val SYSTEM = Value("SYSTEM")
    26     // Posix channels/events
    27     val STDIN = Value("STDIN")
    28     val STDOUT = Value("STDOUT")
    29     val SIGNAL = Value("SIGNAL")
    30     val EXIT = Value("EXIT")
    31     // Isabelle messages
    32     val INIT = Value("INIT")
    33     val STATUS = Value("STATUS")
    34     val WRITELN = Value("WRITELN")
    35     val TRACING = Value("TRACING")
    36     val WARNING = Value("WARNING")
    37     val ERROR = Value("ERROR")
    38     val DEBUG = Value("DEBUG")
    39     // messages codes
    40     val code = Map(
    41       ('A' : Int) -> Kind.INIT,
    42       ('B' : Int) -> Kind.STATUS,
    43       ('C' : Int) -> Kind.WRITELN,
    44       ('D' : Int) -> Kind.TRACING,
    45       ('E' : Int) -> Kind.WARNING,
    46       ('F' : Int) -> Kind.ERROR,
    47       ('G' : Int) -> Kind.DEBUG,
    48       ('0' : Int) -> Kind.SYSTEM,
    49       ('1' : Int) -> Kind.STDIN,
    50       ('2' : Int) -> Kind.STDOUT,
    51       ('3' : Int) -> Kind.SIGNAL,
    52       ('4' : Int) -> Kind.EXIT)
    53     // message markup
    54     val markup = Map(
    55       Kind.INIT -> Markup.INIT,
    56       Kind.STATUS -> Markup.STATUS,
    57       Kind.WRITELN -> Markup.WRITELN,
    58       Kind.TRACING -> Markup.TRACING,
    59       Kind.WARNING -> Markup.WARNING,
    60       Kind.ERROR -> Markup.ERROR,
    61       Kind.DEBUG -> Markup.DEBUG,
    62       Kind.SYSTEM -> Markup.SYSTEM,
    63       Kind.STDIN -> Markup.STDIN,
    64       Kind.STDOUT -> Markup.STDOUT,
    65       Kind.SIGNAL -> Markup.SIGNAL,
    66       Kind.EXIT -> Markup.EXIT)
    67     //}}}
    68     def is_raw(kind: Value) =
    69       kind == STDOUT
    70     def is_control(kind: Value) =
    71       kind == SYSTEM ||
    72       kind == SIGNAL ||
    73       kind == EXIT
    74     def is_system(kind: Value) =
    75       kind == SYSTEM ||
    76       kind == STDIN ||
    77       kind == SIGNAL ||
    78       kind == EXIT ||
    79       kind == STATUS
    80   }
    81 
    82   class Result(val kind: Kind.Value, val props: List[(String, String)], val body: List[XML.Tree])
    83   {
    84     def message = XML.Elem(Kind.markup(kind), props, body)
    85 
    86     override def toString: String =
    87     {
    88       val res =
    89         if (kind == Kind.STATUS) body.map(_.toString).mkString
    90         else Pretty.string_of(body)
    91       if (props.isEmpty)
    92         kind.toString + " [[" + res + "]]"
    93       else
    94         kind.toString + " " +
    95           (for ((x, y) <- props) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    96     }
    97     def is_raw = Kind.is_raw(kind)
    98     def is_control = Kind.is_control(kind)
    99     def is_system = Kind.is_system(kind)
   100 
   101     def is_ready = kind == Kind.STATUS && body == List(XML.Elem(Markup.READY, Nil, Nil))
   102 
   103     def cache(c: XML.Cache): Result =
   104       new Result(kind, c.cache_props(props), c.cache_trees(body))
   105   }
   106 }
   107 
   108 
   109 class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
   110 {
   111   import Isabelle_Process._
   112 
   113 
   114   /* demo constructor */
   115 
   116   def this(args: String*) =
   117     this(new Isabelle_System,
   118       actor { loop { react { case res => Console.println(res) } } }, args: _*)
   119 
   120 
   121   /* process information */
   122 
   123   @volatile private var proc: Process = null
   124   @volatile private var closing = false
   125   @volatile private var pid: String = null
   126 
   127 
   128   /* results */
   129 
   130   private def put_result(kind: Kind.Value, props: List[(String, String)], body: List[XML.Tree])
   131   {
   132     if (kind == Kind.INIT) {
   133       for ((Markup.PID, p) <- props) pid = p
   134     }
   135     receiver ! new Result(kind, props, body)
   136   }
   137 
   138   private def put_result(kind: Kind.Value, text: String)
   139   {
   140     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   141   }
   142 
   143 
   144   /* signals */
   145 
   146   def interrupt() = synchronized {
   147     if (proc == null) error("Cannot interrupt Isabelle: no process")
   148     if (pid == null) put_result(Kind.SYSTEM, "Cannot interrupt: unknown pid")
   149     else {
   150       try {
   151         if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
   152           put_result(Kind.SIGNAL, "INT")
   153         else
   154           put_result(Kind.SYSTEM, "Cannot interrupt: kill command failed")
   155       }
   156       catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   157     }
   158   }
   159 
   160   def kill() = synchronized {
   161     if (proc == 0) error("Cannot kill Isabelle: no process")
   162     else {
   163       try_close()
   164       Thread.sleep(500)  // FIXME property!?
   165       put_result(Kind.SIGNAL, "KILL")
   166       proc.destroy
   167       proc = null
   168       pid = null
   169     }
   170   }
   171 
   172 
   173   /* output being piped into the process */
   174 
   175   private val output = new LinkedBlockingQueue[String]
   176 
   177   private def output_raw(text: String) = synchronized {
   178     if (proc == null) error("Cannot output to Isabelle: no process")
   179     if (closing) error("Cannot output to Isabelle: already closing")
   180     output.put(text)
   181   }
   182 
   183   def output_sync(text: String) =
   184     output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n")
   185 
   186 
   187   def command(text: String) =
   188     output_sync("Isabelle.command " + Isabelle_Syntax.encode_string(text))
   189 
   190   def command(props: List[(String, String)], text: String) =
   191     output_sync("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
   192       Isabelle_Syntax.encode_string(text))
   193 
   194   def ML(text: String) =
   195     output_sync("ML_val " + Isabelle_Syntax.encode_string(text))
   196 
   197   def close() = synchronized {    // FIXME watchdog/timeout
   198     output_raw("\u0000")
   199     closing = true
   200   }
   201 
   202   def try_close() = synchronized {
   203     if (proc != null && !closing) {
   204       try { close() }
   205       catch { case _: RuntimeException => }
   206     }
   207   }
   208 
   209 
   210   /* stdin */
   211 
   212   private class Stdin_Thread(out_stream: OutputStream) extends Thread("isabelle: stdin") {
   213     override def run() = {
   214       val writer = new BufferedWriter(new OutputStreamWriter(out_stream, Standard_System.charset))
   215       var finished = false
   216       while (!finished) {
   217         try {
   218           //{{{
   219           val s = output.take
   220           if (s == "\u0000") {
   221             writer.close
   222             finished = true
   223           }
   224           else {
   225             put_result(Kind.STDIN, s)
   226             writer.write(s)
   227             writer.flush
   228           }
   229           //}}}
   230         }
   231         catch {
   232           case e: IOException => put_result(Kind.SYSTEM, "Stdin thread: " + e.getMessage)
   233         }
   234       }
   235       put_result(Kind.SYSTEM, "Stdin thread terminated")
   236     }
   237   }
   238 
   239 
   240   /* stdout */
   241 
   242   private class Stdout_Thread(in_stream: InputStream) extends Thread("isabelle: stdout") {
   243     override def run() = {
   244       val reader = new BufferedReader(new InputStreamReader(in_stream, Standard_System.charset))
   245       var result = new StringBuilder(100)
   246 
   247       var finished = false
   248       while (!finished) {
   249         try {
   250           //{{{
   251           var c = -1
   252           var done = false
   253           while (!done && (result.length == 0 || reader.ready)) {
   254             c = reader.read
   255             if (c >= 0) result.append(c.asInstanceOf[Char])
   256             else done = true
   257           }
   258           if (result.length > 0) {
   259             put_result(Kind.STDOUT, result.toString)
   260             result.length = 0
   261           }
   262           else {
   263             reader.close
   264             finished = true
   265             try_close()
   266           }
   267           //}}}
   268         }
   269         catch {
   270           case e: IOException => put_result(Kind.SYSTEM, "Stdout thread: " + e.getMessage)
   271         }
   272       }
   273       put_result(Kind.SYSTEM, "Stdout thread terminated")
   274     }
   275   }
   276 
   277 
   278   /* messages */
   279 
   280   private class Message_Thread(fifo: String) extends Thread("isabelle: messages")
   281   {
   282     private class Protocol_Error(msg: String) extends Exception(msg)
   283     override def run()
   284     {
   285       val stream = system.fifo_stream(fifo)
   286       val default_buffer = new Array[Byte](65536)
   287       var c = -1
   288 
   289       def read_chunk(): List[XML.Tree] =
   290       {
   291         //{{{
   292         // chunk size
   293         var n = 0
   294         c = stream.read
   295         while (48 <= c && c <= 57) {
   296           n = 10 * n + (c - 48)
   297           c = stream.read
   298         }
   299         if (c != 10) throw new Protocol_Error("bad message chunk header")
   300 
   301         // chunk content
   302         val buf =
   303           if (n <= default_buffer.size) default_buffer
   304           else new Array[Byte](n)
   305 
   306         var i = 0
   307         var m = 0
   308         do {
   309           m = stream.read(buf, i, n - i)
   310           i += m
   311         } while (m > 0 && n > i)
   312 
   313         if (i != n) throw new Protocol_Error("bad message chunk content")
   314 
   315         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   316         //}}}
   317       }
   318 
   319       do {
   320         try {
   321           //{{{
   322           c = stream.read
   323           var non_sync = 0
   324           while (c >= 0 && c != 2) {
   325             non_sync += 1
   326             c = stream.read
   327           }
   328           if (non_sync > 0)
   329             throw new Protocol_Error("lost synchronization -- skipping " + non_sync + " bytes")
   330           if (c == 2) {
   331             val header = read_chunk()
   332             val body = read_chunk()
   333             header match {
   334               case List(XML.Elem(name, props, Nil))
   335                   if name.size == 1 && Kind.code.isDefinedAt(name(0)) =>
   336                 put_result(Kind.code(name(0)), props, body)
   337               case _ => throw new Protocol_Error("bad header: " + header.toString)
   338             }
   339           }
   340           //}}}
   341         }
   342         catch {
   343           case e: IOException =>
   344             put_result(Kind.SYSTEM, "Cannot read message:\n" + e.getMessage)
   345           case e: Protocol_Error =>
   346             put_result(Kind.SYSTEM, "Malformed message:\n" + e.getMessage)
   347         }
   348       } while (c != -1)
   349       stream.close
   350       try_close()
   351 
   352       put_result(Kind.SYSTEM, "Message thread terminated")
   353     }
   354   }
   355 
   356 
   357 
   358   /** main **/
   359 
   360   {
   361     /* messages */
   362 
   363     val message_fifo = system.mk_fifo()
   364     def rm_fifo() = system.rm_fifo(message_fifo)
   365 
   366     val message_thread = new Message_Thread(message_fifo)
   367     message_thread.start
   368 
   369 
   370     /* exec process */
   371 
   372     try {
   373       val cmdline = List(system.getenv_strict("ISABELLE_PROCESS"), "-W", message_fifo) ++ args
   374       proc = system.execute(true, cmdline: _*)
   375     }
   376     catch {
   377       case e: IOException =>
   378         rm_fifo()
   379         error("Failed to execute Isabelle process: " + e.getMessage)
   380     }
   381 
   382 
   383     /* stdin/stdout */
   384 
   385     new Stdin_Thread(proc.getOutputStream).start
   386     new Stdout_Thread(proc.getInputStream).start
   387 
   388 
   389     /* exit */
   390 
   391     new Thread("isabelle: exit") {
   392       override def run() = {
   393         val rc = proc.waitFor()
   394         Thread.sleep(300)  // FIXME property!?
   395         put_result(Kind.SYSTEM, "Exit thread terminated")
   396         put_result(Kind.EXIT, rc.toString)
   397         rm_fifo()
   398       }
   399     }.start
   400   }
   401 }