src/Pure/System/isabelle_process.scala
author wenzelm
Mon Sep 20 23:28:35 2010 +0200 (2010-09-20 ago)
changeset 39575 c77b9374f45c
parent 39573 a874ca3f5474
child 39585 00be8711082f
permissions -rw-r--r--
tuned;
     1 /*  Title:      Pure/System/isabelle_process.ML
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 import scala.collection.mutable
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* results */
    22 
    23   object Kind
    24   {
    25     val message_markup = Map(
    26       ('A' : Int) -> Markup.INIT,
    27       ('B' : Int) -> Markup.STATUS,
    28       ('C' : Int) -> Markup.REPORT,
    29       ('D' : Int) -> Markup.WRITELN,
    30       ('E' : Int) -> Markup.TRACING,
    31       ('F' : Int) -> Markup.WARNING,
    32       ('G' : Int) -> Markup.ERROR)
    33   }
    34 
    35   class Result(val message: XML.Elem)
    36   {
    37     def kind = message.markup.name
    38     def properties = message.markup.properties
    39     def body = message.body
    40 
    41     def is_init = kind == Markup.INIT
    42     def is_exit = kind == Markup.EXIT
    43     def is_stdout = kind == Markup.STDOUT
    44     def is_system = kind == Markup.SYSTEM
    45     def is_status = kind == Markup.STATUS
    46     def is_report = kind == Markup.REPORT
    47     def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
    48 
    49     override def toString: String =
    50     {
    51       val res =
    52         if (is_status || is_report) message.body.map(_.toString).mkString
    53         else Pretty.string_of(message.body)
    54       if (properties.isEmpty)
    55         kind.toString + " [[" + res + "]]"
    56       else
    57         kind.toString + " " +
    58           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    59     }
    60   }
    61 }
    62 
    63 
    64 class Isabelle_Process(system: Isabelle_System, timeout: Int, receiver: Actor, args: String*)
    65 {
    66   import Isabelle_Process._
    67 
    68 
    69   /* demo constructor */
    70 
    71   def this(args: String*) =
    72     this(new Isabelle_System, 10000,
    73       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    74 
    75 
    76   /* system log */
    77 
    78   private val system_results = new mutable.ListBuffer[String]
    79 
    80   private def system_result(text: String)
    81   {
    82     synchronized { system_results += text }
    83     receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
    84   }
    85 
    86   def syslog(): List[String] = synchronized { system_results.toList }
    87 
    88 
    89   /* results */
    90 
    91   private val xml_cache = new XML.Cache(131071)
    92 
    93   private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
    94   {
    95     if (pid.isEmpty && kind == Markup.INIT) {
    96       rm_fifos()
    97       props.find(_._1 == Markup.PID).map(_._1) match {
    98         case None => system_result("Bad Isabelle process initialization: missing pid")
    99         case p => pid = p
   100       }
   101     }
   102 
   103     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
   104     xml_cache.cache_tree(msg)((message: XML.Tree) =>
   105       receiver ! new Result(message.asInstanceOf[XML.Elem]))
   106   }
   107 
   108   private def put_result(kind: String, text: String)
   109   {
   110     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   111   }
   112 
   113 
   114   /* input actors */
   115 
   116   private case class Input_Text(text: String)
   117   private case class Input_Chunks(chunks: List[Array[Byte]])
   118 
   119   private case object Close
   120   private def close(a: Actor) { if (a != null) a ! Close }
   121 
   122   @volatile private var standard_input: Actor = null
   123   @volatile private var command_input: Actor = null
   124 
   125 
   126   /* process manager */
   127 
   128   private val in_fifo = system.mk_fifo()
   129   private val out_fifo = system.mk_fifo()
   130   private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   131 
   132   private val proc =
   133     try {
   134       val cmdline =
   135         List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   136       system.execute(true, cmdline: _*)
   137     }
   138     catch { case e: IOException => rm_fifos(); throw(e) }
   139 
   140   private val stdout_reader =
   141     new BufferedReader(new InputStreamReader(proc.getInputStream, Standard_System.charset))
   142 
   143   private val stdin_writer =
   144     new BufferedWriter(new OutputStreamWriter(proc.getOutputStream, Standard_System.charset))
   145 
   146   private val (process_manager, _) = Simple_Thread.actor("process_manager")
   147   {
   148     val (startup_failed, startup_output) =
   149     {
   150       val expired = System.currentTimeMillis() + timeout
   151       val result = new StringBuilder(100)
   152 
   153       var finished = false
   154       while (!finished && System.currentTimeMillis() <= expired) {
   155         while (!finished && stdout_reader.ready) {
   156           val c = stdout_reader.read
   157           if (c == 2) finished = true
   158           else result += c.toChar
   159         }
   160         Thread.sleep(10)
   161       }
   162       (!finished, result.toString)
   163     }
   164     system_result(startup_output)
   165 
   166     if (startup_failed) {
   167       put_result(Markup.EXIT, "127")
   168       stdin_writer.close
   169       Thread.sleep(300)  // FIXME !?
   170       proc.destroy  // FIXME unreliable
   171     }
   172     else {
   173       // rendezvous
   174       val command_stream = system.fifo_output_stream(in_fifo)
   175       val message_stream = system.fifo_input_stream(out_fifo)
   176 
   177       val stdin = stdin_actor(); standard_input = stdin._2
   178       val stdout = stdout_actor()
   179       val input = input_actor(command_stream); command_input = input._2
   180       val message = message_actor(message_stream)
   181 
   182       val rc = proc.waitFor()
   183       system_result("Isabelle process terminated")
   184       for ((thread, _) <- List(stdin, stdout, input, message)) thread.join
   185       system_result("process_manager terminated")
   186       put_result(Markup.EXIT, rc.toString)
   187     }
   188     rm_fifos()
   189   }
   190 
   191   def join() { process_manager.join() }
   192 
   193 
   194   /* signals */
   195 
   196   @volatile private var pid: Option[String] = None
   197 
   198   def interrupt()
   199   {
   200     pid match {
   201       case None => system_result("Cannot interrupt Isabelle: unknown pid")
   202       case Some(i) =>
   203         try {
   204           if (system.execute(true, "kill", "-INT", i).waitFor == 0)
   205             system_result("Interrupt Isabelle")
   206           else
   207             system_result("Cannot interrupt Isabelle: kill command failed")
   208         }
   209         catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   210     }
   211   }
   212 
   213   def kill()
   214   {
   215     val running =
   216       try { proc.exitValue; false }
   217       catch { case e: java.lang.IllegalThreadStateException => true }
   218     if (running) {
   219       close()
   220       Thread.sleep(500)  // FIXME !?
   221       system_result("Kill Isabelle")
   222       proc.destroy
   223     }
   224   }
   225 
   226 
   227 
   228   /** stream actors **/
   229 
   230   /* raw stdin */
   231 
   232   private def stdin_actor(): (Thread, Actor) =
   233   {
   234     val name = "standard_input"
   235     Simple_Thread.actor(name) {
   236       var finished = false
   237       while (!finished) {
   238         try {
   239           //{{{
   240           receive {
   241             case Input_Text(text) =>
   242               stdin_writer.write(text)
   243               stdin_writer.flush
   244             case Close =>
   245               stdin_writer.close
   246               finished = true
   247             case bad => System.err.println(name + ": ignoring bad message " + bad)
   248           }
   249           //}}}
   250         }
   251         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   252       }
   253       system_result(name + " terminated")
   254     }
   255   }
   256 
   257 
   258   /* raw stdout */
   259 
   260   private def stdout_actor(): (Thread, Actor) =
   261   {
   262     val name = "standard_output"
   263     Simple_Thread.actor(name) {
   264       var result = new StringBuilder(100)
   265 
   266       var finished = false
   267       while (!finished) {
   268         try {
   269           //{{{
   270           var c = -1
   271           var done = false
   272           while (!done && (result.length == 0 || stdout_reader.ready)) {
   273             c = stdout_reader.read
   274             if (c >= 0) result.append(c.asInstanceOf[Char])
   275             else done = true
   276           }
   277           if (result.length > 0) {
   278             put_result(Markup.STDOUT, result.toString)
   279             result.length = 0
   280           }
   281           else {
   282             stdout_reader.close
   283             finished = true
   284           }
   285           //}}}
   286         }
   287         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   288       }
   289       system_result(name + " terminated")
   290     }
   291   }
   292 
   293 
   294   /* command input */
   295 
   296   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   297   {
   298     val name = "command_input"
   299     Simple_Thread.actor(name) {
   300       val stream = new BufferedOutputStream(raw_stream)
   301       var finished = false
   302       while (!finished) {
   303         try {
   304           //{{{
   305           receive {
   306             case Input_Chunks(chunks) =>
   307               stream.write(Standard_System.string_bytes(
   308                   chunks.map(_.length).mkString("", ",", "\n")));
   309               chunks.foreach(stream.write(_));
   310               stream.flush
   311             case Close =>
   312               stream.close
   313               finished = true
   314             case bad => System.err.println(name + ": ignoring bad message " + bad)
   315           }
   316           //}}}
   317         }
   318         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   319       }
   320       system_result(name + " terminated")
   321     }
   322   }
   323 
   324 
   325   /* message output */
   326 
   327   private def message_actor(stream: InputStream): (Thread, Actor) =
   328   {
   329     class EOF extends Exception
   330     class Protocol_Error(msg: String) extends Exception(msg)
   331 
   332     val name = "message_output"
   333     Simple_Thread.actor(name) {
   334       val default_buffer = new Array[Byte](65536)
   335       var c = -1
   336 
   337       def read_chunk(): XML.Body =
   338       {
   339         //{{{
   340         // chunk size
   341         var n = 0
   342         c = stream.read
   343         if (c == -1) throw new EOF
   344         while (48 <= c && c <= 57) {
   345           n = 10 * n + (c - 48)
   346           c = stream.read
   347         }
   348         if (c != 10) throw new Protocol_Error("bad message chunk header")
   349 
   350         // chunk content
   351         val buf =
   352           if (n <= default_buffer.size) default_buffer
   353           else new Array[Byte](n)
   354 
   355         var i = 0
   356         var m = 0
   357         do {
   358           m = stream.read(buf, i, n - i)
   359           i += m
   360         } while (m > 0 && n > i)
   361 
   362         if (i != n) throw new Protocol_Error("bad message chunk content")
   363 
   364         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   365         //}}}
   366       }
   367 
   368       do {
   369         try {
   370           val header = read_chunk()
   371           val body = read_chunk()
   372           header match {
   373             case List(XML.Elem(Markup(name, props), Nil))
   374                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   375               put_result(Kind.message_markup(name(0)), props, body)
   376             case _ => throw new Protocol_Error("bad header: " + header.toString)
   377           }
   378         }
   379         catch {
   380           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   381           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   382           case _: EOF =>
   383         }
   384       } while (c != -1)
   385       stream.close
   386 
   387       system_result(name + " terminated")
   388     }
   389   }
   390 
   391 
   392   /** main methods **/
   393 
   394   def input_raw(text: String): Unit = standard_input ! Input_Text(text)
   395 
   396   def input_bytes(name: String, args: Array[Byte]*): Unit =
   397     command_input ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   398 
   399   def input(name: String, args: String*): Unit =
   400     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   401 
   402   def close(): Unit = { close(command_input); close(standard_input) }
   403 }