src/Pure/System/isabelle_process.scala
author haftmann
Tue Oct 20 16:13:01 2009 +0200 (2009-10-20)
changeset 33037 b22e44496dc2
parent 32474 0818e6b1c8a6
child 34100 ea24958c2af5
permissions -rw-r--r--
replaced old_style infixes eq_set, subset, union, inter and variants by generic versions
     1 /*  Title:      Pure/System/isabelle_process.ML
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind extends Enumeration {
    23     //{{{ values and codes
    24     // internal system notification
    25     val SYSTEM = Value("SYSTEM")
    26     // Posix channels/events
    27     val STDIN = Value("STDIN")
    28     val STDOUT = Value("STDOUT")
    29     val SIGNAL = Value("SIGNAL")
    30     val EXIT = Value("EXIT")
    31     // Isabelle messages
    32     val INIT = Value("INIT")
    33     val STATUS = Value("STATUS")
    34     val WRITELN = Value("WRITELN")
    35     val PRIORITY = Value("PRIORITY")
    36     val TRACING = Value("TRACING")
    37     val WARNING = Value("WARNING")
    38     val ERROR = Value("ERROR")
    39     val DEBUG = Value("DEBUG")
    40     // messages codes
    41     val code = Map(
    42       ('A' : Int) -> Kind.INIT,
    43       ('B' : Int) -> Kind.STATUS,
    44       ('C' : Int) -> Kind.WRITELN,
    45       ('D' : Int) -> Kind.PRIORITY,
    46       ('E' : Int) -> Kind.TRACING,
    47       ('F' : Int) -> Kind.WARNING,
    48       ('G' : Int) -> Kind.ERROR,
    49       ('H' : Int) -> Kind.DEBUG,
    50       ('0' : Int) -> Kind.SYSTEM,
    51       ('1' : Int) -> Kind.STDIN,
    52       ('2' : Int) -> Kind.STDOUT,
    53       ('3' : Int) -> Kind.SIGNAL,
    54       ('4' : Int) -> Kind.EXIT)
    55     // message markup
    56     val markup = Map(
    57       Kind.INIT -> Markup.INIT,
    58       Kind.STATUS -> Markup.STATUS,
    59       Kind.WRITELN -> Markup.WRITELN,
    60       Kind.PRIORITY -> Markup.PRIORITY,
    61       Kind.TRACING -> Markup.TRACING,
    62       Kind.WARNING -> Markup.WARNING,
    63       Kind.ERROR -> Markup.ERROR,
    64       Kind.DEBUG -> Markup.DEBUG,
    65       Kind.SYSTEM -> Markup.SYSTEM,
    66       Kind.STDIN -> Markup.STDIN,
    67       Kind.STDOUT -> Markup.STDOUT,
    68       Kind.SIGNAL -> Markup.SIGNAL,
    69       Kind.EXIT -> Markup.EXIT)
    70     //}}}
    71     def is_raw(kind: Value) =
    72       kind == STDOUT
    73     def is_control(kind: Value) =
    74       kind == SYSTEM ||
    75       kind == SIGNAL ||
    76       kind == EXIT
    77     def is_system(kind: Value) =
    78       kind == SYSTEM ||
    79       kind == STDIN ||
    80       kind == SIGNAL ||
    81       kind == EXIT ||
    82       kind == STATUS
    83   }
    84 
    85   class Result(val kind: Kind.Value, val props: List[(String, String)], val result: String) {
    86     override def toString = {
    87       val trees = YXML.parse_body_failsafe(result)
    88       val res =
    89         if (kind == Kind.STATUS) trees.map(_.toString).mkString
    90         else trees.flatMap(XML.content(_).mkString).mkString
    91       if (props.isEmpty)
    92         kind.toString + " [[" + res + "]]"
    93       else
    94         kind.toString + " " +
    95           (for ((x, y) <- props) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    96     }
    97     def is_raw = Kind.is_raw(kind)
    98     def is_control = Kind.is_control(kind)
    99     def is_system = Kind.is_system(kind)
   100   }
   101 
   102   def parse_message(isabelle_system: Isabelle_System, result: Result) =
   103   {
   104     XML.Elem(Markup.MESSAGE, (Markup.CLASS, Kind.markup(result.kind)) :: result.props,
   105       YXML.parse_body_failsafe(isabelle_system.symbols.decode(result.result)))
   106   }
   107 }
   108 
   109 
   110 class Isabelle_Process(isabelle_system: Isabelle_System, receiver: Actor, args: String*)
   111 {
   112   import Isabelle_Process._
   113 
   114 
   115   /* demo constructor */
   116 
   117   def this(args: String*) =
   118     this(new Isabelle_System,
   119       new Actor { def act = loop { react { case res => Console.println(res) } } }.start, args: _*)
   120 
   121 
   122   /* process information */
   123 
   124   @volatile private var proc: Process = null
   125   @volatile private var closing = false
   126   @volatile private var pid: String = null
   127   @volatile private var the_session: String = null
   128   def session = the_session
   129 
   130 
   131   /* results */
   132 
   133   private def put_result(kind: Kind.Value, props: List[(String, String)], result: String)
   134   {
   135     if (kind == Kind.INIT) {
   136       val map = Map(props: _*)
   137       if (map.isDefinedAt(Markup.PID)) pid = map(Markup.PID)
   138       if (map.isDefinedAt(Markup.SESSION)) the_session = map(Markup.SESSION)
   139     }
   140     receiver ! new Result(kind, props, result)
   141   }
   142 
   143 
   144   /* signals */
   145 
   146   def interrupt() = synchronized {
   147     if (proc == null) error("Cannot interrupt Isabelle: no process")
   148     if (pid == null) put_result(Kind.SYSTEM, Nil, "Cannot interrupt: unknown pid")
   149     else {
   150       try {
   151         if (isabelle_system.execute(true, "kill", "-INT", pid).waitFor == 0)
   152           put_result(Kind.SIGNAL, Nil, "INT")
   153         else
   154           put_result(Kind.SYSTEM, Nil, "Cannot interrupt: kill command failed")
   155       }
   156       catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   157     }
   158   }
   159 
   160   def kill() = synchronized {
   161     if (proc == 0) error("Cannot kill Isabelle: no process")
   162     else {
   163       try_close()
   164       Thread.sleep(500)
   165       put_result(Kind.SIGNAL, Nil, "KILL")
   166       proc.destroy
   167       proc = null
   168       pid = null
   169     }
   170   }
   171 
   172 
   173   /* output being piped into the process */
   174 
   175   private val output = new LinkedBlockingQueue[String]
   176 
   177   private def output_raw(text: String) = synchronized {
   178     if (proc == null) error("Cannot output to Isabelle: no process")
   179     if (closing) error("Cannot output to Isabelle: already closing")
   180     output.put(text)
   181   }
   182 
   183   def output_sync(text: String) =
   184     output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n")
   185 
   186 
   187   def command(text: String) =
   188     output_sync("Isabelle.command " + Isabelle_Syntax.encode_string(text))
   189 
   190   def command(props: List[(String, String)], text: String) =
   191     output_sync("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
   192       Isabelle_Syntax.encode_string(text))
   193 
   194   def ML(text: String) =
   195     output_sync("ML_val " + Isabelle_Syntax.encode_string(text))
   196 
   197   def close() = synchronized {    // FIXME watchdog/timeout
   198     output_raw("\u0000")
   199     closing = true
   200   }
   201 
   202   def try_close() = synchronized {
   203     if (proc != null && !closing) {
   204       try { close() }
   205       catch { case _: RuntimeException => }
   206     }
   207   }
   208 
   209 
   210   /* stdin */
   211 
   212   private class StdinThread(out_stream: OutputStream) extends Thread("isabelle: stdin") {
   213     override def run() = {
   214       val writer = new BufferedWriter(new OutputStreamWriter(out_stream, Isabelle_System.charset))
   215       var finished = false
   216       while (!finished) {
   217         try {
   218           //{{{
   219           val s = output.take
   220           if (s == "\u0000") {
   221             writer.close
   222             finished = true
   223           }
   224           else {
   225             put_result(Kind.STDIN, Nil, s)
   226             writer.write(s)
   227             writer.flush
   228           }
   229           //}}}
   230         }
   231         catch {
   232           case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdin thread: " + e.getMessage)
   233         }
   234       }
   235       put_result(Kind.SYSTEM, Nil, "Stdin thread terminated")
   236     }
   237   }
   238 
   239 
   240   /* stdout */
   241 
   242   private class StdoutThread(in_stream: InputStream) extends Thread("isabelle: stdout") {
   243     override def run() = {
   244       val reader = new BufferedReader(new InputStreamReader(in_stream, Isabelle_System.charset))
   245       var result = new StringBuilder(100)
   246 
   247       var finished = false
   248       while (!finished) {
   249         try {
   250           //{{{
   251           var c = -1
   252           var done = false
   253           while (!done && (result.length == 0 || reader.ready)) {
   254             c = reader.read
   255             if (c >= 0) result.append(c.asInstanceOf[Char])
   256             else done = true
   257           }
   258           if (result.length > 0) {
   259             put_result(Kind.STDOUT, Nil, result.toString)
   260             result.length = 0
   261           }
   262           else {
   263             reader.close
   264             finished = true
   265             try_close()
   266           }
   267           //}}}
   268         }
   269         catch {
   270           case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdout thread: " + e.getMessage)
   271         }
   272       }
   273       put_result(Kind.SYSTEM, Nil, "Stdout thread terminated")
   274     }
   275   }
   276 
   277 
   278   /* messages */
   279 
   280   private class MessageThread(fifo: String) extends Thread("isabelle: messages") {
   281     override def run() = {
   282       val reader = isabelle_system.fifo_reader(fifo)
   283       var kind: Kind.Value = null
   284       var props: List[(String, String)] = Nil
   285       var result = new StringBuilder
   286 
   287       var finished = false
   288       while (!finished) {
   289         try {
   290           if (kind == null) {
   291             //{{{ Char mode -- resync
   292             var c = -1
   293             do {
   294               c = reader.read
   295               if (c >= 0 && c != 2) result.append(c.asInstanceOf[Char])
   296             } while (c >= 0 && c != 2)
   297 
   298             if (result.length > 0) {
   299               put_result(Kind.SYSTEM, Nil, "Malformed message:\n" + result.toString)
   300               result.length = 0
   301             }
   302             if (c < 0) {
   303               reader.close
   304               finished = true
   305               try_close()
   306             }
   307             else {
   308               c = reader.read
   309               if (Kind.code.isDefinedAt(c)) kind = Kind.code(c)
   310               else kind = null
   311             }
   312             //}}}
   313           }
   314           else {
   315             //{{{ Line mode
   316             val line = reader.readLine
   317             if (line == null) {
   318               reader.close
   319               finished = true
   320               try_close()
   321             }
   322             else {
   323               val len = line.length
   324               // property
   325               if (line.endsWith("\u0002,")) {
   326                 val i = line.indexOf('=')
   327                 if (i > 0) {
   328                   val name = line.substring(0, i)
   329                   val value = line.substring(i + 1, len - 2)
   330                   props = (name, value) :: props
   331                 }
   332               }
   333               // last text line
   334               else if (line.endsWith("\u0002.")) {
   335                 result.append(line.substring(0, len - 2))
   336                 put_result(kind, props.reverse, result.toString)
   337                 kind = null
   338                 props = Nil
   339                 result.length = 0
   340               }
   341               // text line
   342               else {
   343                 result.append(line)
   344                 result.append('\n')
   345               }
   346             }
   347             //}}}
   348           }
   349         }
   350         catch {
   351           case e: IOException => put_result(Kind.SYSTEM, Nil, "Message thread: " + e.getMessage)
   352         }
   353       }
   354       put_result(Kind.SYSTEM, Nil, "Message thread terminated")
   355     }
   356   }
   357 
   358 
   359 
   360   /** main **/
   361 
   362   {
   363     /* isabelle version */
   364 
   365     {
   366       val (msg, rc) = isabelle_system.isabelle_tool("version")
   367       if (rc != 0) error("Version check failed -- bad Isabelle installation:\n" + msg)
   368       put_result(Kind.SYSTEM, Nil, msg)
   369     }
   370 
   371 
   372     /* messages */
   373 
   374     val message_fifo = isabelle_system.mk_fifo()
   375     def rm_fifo() = isabelle_system.rm_fifo(message_fifo)
   376 
   377     val message_thread = new MessageThread(message_fifo)
   378     message_thread.start
   379 
   380 
   381     /* exec process */
   382 
   383     try {
   384       val cmdline =
   385         List(isabelle_system.getenv_strict("ISABELLE_PROCESS"), "-W", message_fifo) ++ args
   386       proc = isabelle_system.execute(true, cmdline: _*)
   387     }
   388     catch {
   389       case e: IOException =>
   390         rm_fifo()
   391         error("Failed to execute Isabelle process: " + e.getMessage)
   392     }
   393 
   394 
   395     /* stdin/stdout */
   396 
   397     new StdinThread(proc.getOutputStream).start
   398     new StdoutThread(proc.getInputStream).start
   399 
   400 
   401     /* exit */
   402 
   403     new Thread("isabelle: exit") {
   404       override def run() = {
   405         val rc = proc.waitFor()
   406         Thread.sleep(300)
   407         put_result(Kind.SYSTEM, Nil, "Exit thread terminated")
   408         put_result(Kind.EXIT, Nil, Integer.toString(rc))
   409         rm_fifo()
   410       }
   411     }.start
   412 
   413   }
   414 }