src/Pure/System/isabelle_process.scala
author wenzelm
Mon Aug 09 18:18:32 2010 +0200 (2010-08-09 ago)
changeset 38253 3d4e521014f7
parent 38236 d8c7be27e01d
child 38259 2b61c5e27399
permissions -rw-r--r--
Isabelle_Process: separate input fifo for commands (still using the old tty protocol);
some partial workarounds for Cygwin;
     1 /*  Title:      Pure/System/isabelle_process.ML
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind {
    23     // message markup
    24     val markup = Map(
    25       ('A' : Int) -> Markup.INIT,
    26       ('B' : Int) -> Markup.STATUS,
    27       ('C' : Int) -> Markup.REPORT,
    28       ('D' : Int) -> Markup.WRITELN,
    29       ('E' : Int) -> Markup.TRACING,
    30       ('F' : Int) -> Markup.WARNING,
    31       ('G' : Int) -> Markup.ERROR,
    32       ('H' : Int) -> Markup.DEBUG)
    33     def is_raw(kind: String) =
    34       kind == Markup.STDOUT
    35     def is_control(kind: String) =
    36       kind == Markup.SYSTEM ||
    37       kind == Markup.SIGNAL ||
    38       kind == Markup.EXIT
    39     def is_system(kind: String) =
    40       kind == Markup.SYSTEM ||
    41       kind == Markup.STDIN ||
    42       kind == Markup.SIGNAL ||
    43       kind == Markup.EXIT ||
    44       kind == Markup.STATUS
    45   }
    46 
    47   class Result(val message: XML.Elem)
    48   {
    49     def kind = message.markup.name
    50     def properties = message.markup.properties
    51     def body = message.body
    52 
    53     def is_raw = Kind.is_raw(kind)
    54     def is_control = Kind.is_control(kind)
    55     def is_system = Kind.is_system(kind)
    56     def is_status = kind == Markup.STATUS
    57     def is_report = kind == Markup.REPORT
    58     def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
    59 
    60     override def toString: String =
    61     {
    62       val res =
    63         if (is_status || is_report) message.body.map(_.toString).mkString
    64         else Pretty.string_of(message.body)
    65       if (properties.isEmpty)
    66         kind.toString + " [[" + res + "]]"
    67       else
    68         kind.toString + " " +
    69           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    70     }
    71 
    72     def cache(c: XML.Cache): Result = new Result(c.cache_tree(message).asInstanceOf[XML.Elem])
    73   }
    74 }
    75 
    76 
    77 class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
    78 {
    79   import Isabelle_Process._
    80 
    81 
    82   /* demo constructor */
    83 
    84   def this(args: String*) =
    85     this(new Isabelle_System,
    86       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    87 
    88 
    89   /* process information */
    90 
    91   @volatile private var proc: Process = null
    92   @volatile private var closing = false
    93   @volatile private var pid: String = null
    94 
    95 
    96   /* results */
    97 
    98   private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree])
    99   {
   100     if (kind == Markup.INIT) {
   101       for ((Markup.PID, p) <- props) pid = p
   102     }
   103     receiver ! new Result(XML.Elem(Markup(kind, props), body))
   104   }
   105 
   106   private def put_result(kind: String, text: String)
   107   {
   108     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   109   }
   110 
   111 
   112   /* signals */
   113 
   114   def interrupt() = synchronized {
   115     if (proc == null) error("Cannot interrupt Isabelle: no process")
   116     if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
   117     else {
   118       try {
   119         if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
   120           put_result(Markup.SIGNAL, "INT")
   121         else
   122           put_result(Markup.SYSTEM, "Cannot interrupt: kill command failed")
   123       }
   124       catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   125     }
   126   }
   127 
   128   def kill() = synchronized {
   129     if (proc == 0) error("Cannot kill Isabelle: no process")
   130     else {
   131       try_close()
   132       Thread.sleep(500)  // FIXME property!?
   133       put_result(Markup.SIGNAL, "KILL")
   134       proc.destroy
   135       proc = null
   136       pid = null
   137     }
   138   }
   139 
   140 
   141   /* output being piped into the process */
   142 
   143   private val output = new LinkedBlockingQueue[String]
   144 
   145   private def output_raw(text: String) = synchronized {
   146     if (proc == null) error("Cannot output to Isabelle: no process")
   147     if (closing) error("Cannot output to Isabelle: already closing")
   148     output.put(text)
   149   }
   150 
   151   def output_sync(text: String) =
   152     output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n")
   153 
   154 
   155   def command(text: String) =
   156     output_sync("Isabelle.command " + Isabelle_Syntax.encode_string(text))
   157 
   158   def command(props: List[(String, String)], text: String) =
   159     output_sync("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
   160       Isabelle_Syntax.encode_string(text))
   161 
   162   def ML_val(text: String) =
   163     output_sync("ML_val " + Isabelle_Syntax.encode_string(text))
   164 
   165   def ML_command(text: String) =
   166     output_sync("ML_command " + Isabelle_Syntax.encode_string(text))
   167 
   168   def close() = synchronized {    // FIXME watchdog/timeout
   169     output_raw("\u0000")
   170     closing = true
   171   }
   172 
   173   def try_close() = synchronized {
   174     if (proc != null && !closing) {
   175       try { close() }
   176       catch { case _: RuntimeException => }
   177     }
   178   }
   179 
   180 
   181   /* commands */
   182 
   183   private class Command_Thread(fifo: String) extends Thread("isabelle: commands")
   184   {
   185     override def run()
   186     {
   187       val stream = system.fifo_output_stream(fifo)
   188       val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
   189       var finished = false
   190       while (!finished) {
   191         try {
   192           //{{{
   193           val s = output.take
   194           if (s == "\u0000") {
   195             writer.close
   196             finished = true
   197           }
   198           else {
   199             put_result(Markup.STDIN, s)
   200             writer.write(s)
   201             writer.flush
   202           }
   203           //}}}
   204         }
   205         catch {
   206           case e: IOException => put_result(Markup.SYSTEM, "Command thread: " + e.getMessage)
   207         }
   208       }
   209       put_result(Markup.SYSTEM, "Command thread terminated")
   210     }
   211   }
   212 
   213 
   214   /* raw stdout */
   215 
   216   private class Stdout_Thread(stream: InputStream) extends Thread("isabelle: stdout")
   217   {
   218     override def run() =
   219     {
   220       val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
   221       var result = new StringBuilder(100)
   222 
   223       var finished = false
   224       while (!finished) {
   225         try {
   226           //{{{
   227           var c = -1
   228           var done = false
   229           while (!done && (result.length == 0 || reader.ready)) {
   230             c = reader.read
   231             if (c >= 0) result.append(c.asInstanceOf[Char])
   232             else done = true
   233           }
   234           if (result.length > 0) {
   235             put_result(Markup.STDOUT, result.toString)
   236             result.length = 0
   237           }
   238           else {
   239             reader.close
   240             finished = true
   241             try_close()
   242           }
   243           //}}}
   244         }
   245         catch {
   246           case e: IOException => put_result(Markup.SYSTEM, "Stdout thread: " + e.getMessage)
   247         }
   248       }
   249       put_result(Markup.SYSTEM, "Stdout thread terminated")
   250     }
   251   }
   252 
   253 
   254   /* messages */
   255 
   256   private class Message_Thread(fifo: String) extends Thread("isabelle: messages")
   257   {
   258     private class Protocol_Error(msg: String) extends Exception(msg)
   259     override def run()
   260     {
   261       val stream = system.fifo_input_stream(fifo)
   262       val default_buffer = new Array[Byte](65536)
   263       var c = -1
   264 
   265       def read_chunk(): List[XML.Tree] =
   266       {
   267         //{{{
   268         // chunk size
   269         var n = 0
   270         c = stream.read
   271         while (48 <= c && c <= 57) {
   272           n = 10 * n + (c - 48)
   273           c = stream.read
   274         }
   275         if (c != 10) throw new Protocol_Error("bad message chunk header")
   276 
   277         // chunk content
   278         val buf =
   279           if (n <= default_buffer.size) default_buffer
   280           else new Array[Byte](n)
   281 
   282         var i = 0
   283         var m = 0
   284         do {
   285           m = stream.read(buf, i, n - i)
   286           i += m
   287         } while (m > 0 && n > i)
   288 
   289         if (i != n) throw new Protocol_Error("bad message chunk content")
   290 
   291         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   292         //}}}
   293       }
   294 
   295       do {
   296         try {
   297           //{{{
   298           c = stream.read
   299           var non_sync = 0
   300           while (c >= 0 && c != 2) {
   301             non_sync += 1
   302             c = stream.read
   303           }
   304           if (non_sync > 0)
   305             throw new Protocol_Error("lost synchronization -- skipping " + non_sync + " bytes")
   306           if (c == 2) {
   307             val header = read_chunk()
   308             val body = read_chunk()
   309             header match {
   310               case List(XML.Elem(Markup(name, props), Nil))
   311                   if name.size == 1 && Kind.markup.isDefinedAt(name(0)) =>
   312                 put_result(Kind.markup(name(0)), props, body)
   313               case _ => throw new Protocol_Error("bad header: " + header.toString)
   314             }
   315           }
   316           //}}}
   317         }
   318         catch {
   319           case e: IOException =>
   320             put_result(Markup.SYSTEM, "Cannot read message:\n" + e.getMessage)
   321           case e: Protocol_Error =>
   322             put_result(Markup.SYSTEM, "Malformed message:\n" + e.getMessage)
   323         }
   324       } while (c != -1)
   325       stream.close
   326       try_close()
   327 
   328       put_result(Markup.SYSTEM, "Message thread terminated")
   329     }
   330   }
   331 
   332 
   333 
   334   /** main **/
   335 
   336   {
   337     /* private communication channels */
   338 
   339     val in_fifo = system.mk_fifo()
   340     val out_fifo = system.mk_fifo()
   341     def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   342 
   343     val command_thread = new Command_Thread(in_fifo)
   344     val message_thread = new Message_Thread(out_fifo)
   345 
   346     command_thread.start
   347     message_thread.start
   348 
   349 
   350     /* exec process */
   351 
   352     try {
   353       val cmdline =
   354         List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   355       proc = system.execute(true, cmdline: _*)
   356     }
   357     catch {
   358       case e: IOException =>
   359         rm_fifos()
   360         error("Failed to execute Isabelle process: " + e.getMessage)
   361     }
   362     new Stdout_Thread(proc.getInputStream).start
   363 
   364 
   365     /* exit */
   366 
   367     new Thread("isabelle: exit") {
   368       override def run()
   369       {
   370         val rc = proc.waitFor()
   371         Thread.sleep(300)  // FIXME property!?
   372         put_result(Markup.SYSTEM, "Exit thread terminated")
   373         put_result(Markup.EXIT, rc.toString)
   374         rm_fifos()
   375       }
   376     }.start
   377   }
   378 }