src/Pure/System/isabelle_process.scala
author wenzelm
Thu Sep 23 14:39:29 2010 +0200 (2010-09-23 ago)
changeset 39625 fb0c851e4f9d
parent 39623 6aae022fde9b
child 39626 a5d0bcfb95a3
permissions -rw-r--r--
tuned prover message categorization;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 import scala.collection.mutable
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* results */
    22 
    23   object Kind
    24   {
    25     val message_markup = Map(
    26       ('A' : Int) -> Markup.INIT,
    27       ('B' : Int) -> Markup.STATUS,
    28       ('C' : Int) -> Markup.REPORT,
    29       ('D' : Int) -> Markup.WRITELN,
    30       ('E' : Int) -> Markup.TRACING,
    31       ('F' : Int) -> Markup.WARNING,
    32       ('G' : Int) -> Markup.ERROR)
    33   }
    34 
    35   class Result(val message: XML.Elem)
    36   {
    37     def kind = message.markup.name
    38     def properties = message.markup.properties
    39     def body = message.body
    40 
    41     def is_init = kind == Markup.INIT
    42     def is_exit = kind == Markup.EXIT
    43     def is_stdout = kind == Markup.STDOUT
    44     def is_system = kind == Markup.SYSTEM
    45     def is_status = kind == Markup.STATUS
    46     def is_report = kind == Markup.REPORT
    47     def is_ready = Isar_Document.is_ready(message)
    48     def is_syslog = is_init || is_exit || is_system || is_ready
    49 
    50     override def toString: String =
    51     {
    52       val res =
    53         if (is_status || is_report) message.body.map(_.toString).mkString
    54         else Pretty.string_of(message.body)
    55       if (properties.isEmpty)
    56         kind.toString + " [[" + res + "]]"
    57       else
    58         kind.toString + " " +
    59           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    60     }
    61   }
    62 }
    63 
    64 
    65 class Isabelle_Process(system: Isabelle_System, timeout: Int, receiver: Actor, args: String*)
    66 {
    67   import Isabelle_Process._
    68 
    69 
    70   /* demo constructor */
    71 
    72   def this(args: String*) =
    73     this(new Isabelle_System, 10000,
    74       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    75 
    76 
    77   /* system log */
    78 
    79   private val system_results = new mutable.ListBuffer[String]
    80 
    81   private def system_result(text: String)
    82   {
    83     synchronized { system_results += text }
    84     receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
    85   }
    86 
    87   def syslog(): List[String] = synchronized { system_results.toList }
    88 
    89 
    90   /* results */
    91 
    92   private val xml_cache = new XML.Cache(131071)
    93 
    94   private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
    95   {
    96     if (kind == Markup.INIT) rm_fifos()
    97     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
    98     xml_cache.cache_tree(msg)((message: XML.Tree) =>
    99       receiver ! new Result(message.asInstanceOf[XML.Elem]))
   100   }
   101 
   102   private def put_result(kind: String, text: String)
   103   {
   104     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   105   }
   106 
   107 
   108   /* input actors */
   109 
   110   private case class Input_Text(text: String)
   111   private case class Input_Chunks(chunks: List[Array[Byte]])
   112 
   113   private case object Close
   114   private def close(p: (Thread, Actor))
   115   {
   116     if (p != null && p._1.isAlive) {
   117       p._2 ! Close
   118       p._1.join
   119     }
   120   }
   121 
   122   @volatile private var standard_input: (Thread, Actor) = null
   123   @volatile private var command_input: (Thread, Actor) = null
   124 
   125 
   126   /** process manager **/
   127 
   128   private val in_fifo = system.mk_fifo()
   129   private val out_fifo = system.mk_fifo()
   130   private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   131 
   132   private val process =
   133     try {
   134       val cmdline =
   135         List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   136       new system.Managed_Process(true, cmdline: _*)
   137     }
   138     catch { case e: IOException => rm_fifos(); throw(e) }
   139 
   140   val process_result =
   141     Simple_Thread.future("process_result") { process.join }
   142 
   143   private def terminate_process()
   144   {
   145     try { process.terminate }
   146     catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
   147   }
   148 
   149   private val process_manager = Simple_Thread.fork("process_manager")
   150   {
   151     val (startup_failed, startup_output) =
   152     {
   153       val expired = System.currentTimeMillis() + timeout
   154       val result = new StringBuilder(100)
   155 
   156       var finished: Option[Boolean] = None
   157       while (finished.isEmpty && System.currentTimeMillis() <= expired) {
   158         while (finished.isEmpty && process.stdout.ready) {
   159           val c = process.stdout.read
   160           if (c == 2) finished = Some(true)
   161           else result += c.toChar
   162         }
   163         if (process_result.is_finished) finished = Some(false)
   164         else Thread.sleep(10)
   165       }
   166       (finished.isEmpty || !finished.get, result.toString.trim)
   167     }
   168     system_result(startup_output)
   169 
   170     if (startup_failed) {
   171       put_result(Markup.EXIT, "127")
   172       process.stdin.close
   173       Thread.sleep(300)
   174       terminate_process()
   175       process_result.join
   176     }
   177     else {
   178       // rendezvous
   179       val command_stream = system.fifo_output_stream(in_fifo)
   180       val message_stream = system.fifo_input_stream(out_fifo)
   181 
   182       standard_input = stdin_actor()
   183       val stdout = stdout_actor()
   184       command_input = input_actor(command_stream)
   185       val message = message_actor(message_stream)
   186 
   187       val rc = process_result.join
   188       system_result("process terminated")
   189       for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
   190       system_result("process_manager terminated")
   191       put_result(Markup.EXIT, rc.toString)
   192     }
   193     rm_fifos()
   194   }
   195 
   196 
   197   /* management methods */
   198 
   199   def join() { process_manager.join() }
   200 
   201   def interrupt()
   202   {
   203     try { process.interrupt }
   204     catch { case e: IOException => system_result("Failed to interrupt Isabelle: " + e.getMessage) }
   205   }
   206 
   207   def terminate()
   208   {
   209     close()
   210     system_result("Terminating Isabelle process")
   211     terminate_process()
   212   }
   213 
   214 
   215 
   216   /** stream actors **/
   217 
   218   /* raw stdin */
   219 
   220   private def stdin_actor(): (Thread, Actor) =
   221   {
   222     val name = "standard_input"
   223     Simple_Thread.actor(name) {
   224       var finished = false
   225       while (!finished) {
   226         try {
   227           //{{{
   228           receive {
   229             case Input_Text(text) =>
   230               process.stdin.write(text)
   231               process.stdin.flush
   232             case Close =>
   233               process.stdin.close
   234               finished = true
   235             case bad => System.err.println(name + ": ignoring bad message " + bad)
   236           }
   237           //}}}
   238         }
   239         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   240       }
   241       system_result(name + " terminated")
   242     }
   243   }
   244 
   245 
   246   /* raw stdout */
   247 
   248   private def stdout_actor(): (Thread, Actor) =
   249   {
   250     val name = "standard_output"
   251     Simple_Thread.actor(name) {
   252       var result = new StringBuilder(100)
   253 
   254       var finished = false
   255       while (!finished) {
   256         try {
   257           //{{{
   258           var c = -1
   259           var done = false
   260           while (!done && (result.length == 0 || process.stdout.ready)) {
   261             c = process.stdout.read
   262             if (c >= 0) result.append(c.asInstanceOf[Char])
   263             else done = true
   264           }
   265           if (result.length > 0) {
   266             put_result(Markup.STDOUT, result.toString)
   267             result.length = 0
   268           }
   269           else {
   270             process.stdout.close
   271             finished = true
   272           }
   273           //}}}
   274         }
   275         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   276       }
   277       system_result(name + " terminated")
   278     }
   279   }
   280 
   281 
   282   /* command input */
   283 
   284   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   285   {
   286     val name = "command_input"
   287     Simple_Thread.actor(name) {
   288       val stream = new BufferedOutputStream(raw_stream)
   289       var finished = false
   290       while (!finished) {
   291         try {
   292           //{{{
   293           receive {
   294             case Input_Chunks(chunks) =>
   295               stream.write(Standard_System.string_bytes(
   296                   chunks.map(_.length).mkString("", ",", "\n")));
   297               chunks.foreach(stream.write(_));
   298               stream.flush
   299             case Close =>
   300               stream.close
   301               finished = true
   302             case bad => System.err.println(name + ": ignoring bad message " + bad)
   303           }
   304           //}}}
   305         }
   306         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   307       }
   308       system_result(name + " terminated")
   309     }
   310   }
   311 
   312 
   313   /* message output */
   314 
   315   private def message_actor(stream: InputStream): (Thread, Actor) =
   316   {
   317     class EOF extends Exception
   318     class Protocol_Error(msg: String) extends Exception(msg)
   319 
   320     val name = "message_output"
   321     Simple_Thread.actor(name) {
   322       val default_buffer = new Array[Byte](65536)
   323       var c = -1
   324 
   325       def read_chunk(): XML.Body =
   326       {
   327         //{{{
   328         // chunk size
   329         var n = 0
   330         c = stream.read
   331         if (c == -1) throw new EOF
   332         while (48 <= c && c <= 57) {
   333           n = 10 * n + (c - 48)
   334           c = stream.read
   335         }
   336         if (c != 10) throw new Protocol_Error("bad message chunk header")
   337 
   338         // chunk content
   339         val buf =
   340           if (n <= default_buffer.size) default_buffer
   341           else new Array[Byte](n)
   342 
   343         var i = 0
   344         var m = 0
   345         do {
   346           m = stream.read(buf, i, n - i)
   347           i += m
   348         } while (m > 0 && n > i)
   349 
   350         if (i != n) throw new Protocol_Error("bad message chunk content")
   351 
   352         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   353         //}}}
   354       }
   355 
   356       do {
   357         try {
   358           val header = read_chunk()
   359           val body = read_chunk()
   360           header match {
   361             case List(XML.Elem(Markup(name, props), Nil))
   362                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   363               put_result(Kind.message_markup(name(0)), props, body)
   364             case _ => throw new Protocol_Error("bad header: " + header.toString)
   365           }
   366         }
   367         catch {
   368           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   369           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   370           case _: EOF =>
   371         }
   372       } while (c != -1)
   373       stream.close
   374 
   375       system_result(name + " terminated")
   376     }
   377   }
   378 
   379 
   380   /** main methods **/
   381 
   382   def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
   383 
   384   def input_bytes(name: String, args: Array[Byte]*): Unit =
   385     command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   386 
   387   def input(name: String, args: String*): Unit =
   388     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   389 
   390   def close(): Unit = { close(command_input); close(standard_input) }
   391 }