src/Pure/System/isabelle_process.scala
changeset 38259 2b61c5e27399
parent 38253 3d4e521014f7
child 38262 bb2df73fab2c
     1.1 --- a/src/Pure/System/isabelle_process.scala	Tue Aug 10 12:09:53 2010 +0200
     1.2 +++ b/src/Pure/System/isabelle_process.scala	Tue Aug 10 12:29:11 2010 +0200
     1.3 @@ -38,6 +38,7 @@
     1.4        kind == Markup.EXIT
     1.5      def is_system(kind: String) =
     1.6        kind == Markup.SYSTEM ||
     1.7 +      kind == Markup.INPUT ||
     1.8        kind == Markup.STDIN ||
     1.9        kind == Markup.SIGNAL ||
    1.10        kind == Markup.EXIT ||
    1.11 @@ -111,7 +112,7 @@
    1.12  
    1.13    /* signals */
    1.14  
    1.15 -  def interrupt() = synchronized {
    1.16 +  def interrupt() = synchronized {  // FIXME avoid synchronized
    1.17      if (proc == null) error("Cannot interrupt Isabelle: no process")
    1.18      if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
    1.19      else {
    1.20 @@ -125,7 +126,7 @@
    1.21      }
    1.22    }
    1.23  
    1.24 -  def kill() = synchronized {
    1.25 +  def kill() = synchronized {  // FIXME avoid synchronized
    1.26      if (proc == 0) error("Cannot kill Isabelle: no process")
    1.27      else {
    1.28        try_close()
    1.29 @@ -138,85 +139,45 @@
    1.30    }
    1.31  
    1.32  
    1.33 -  /* output being piped into the process */
    1.34 -
    1.35 -  private val output = new LinkedBlockingQueue[String]
    1.36 -
    1.37 -  private def output_raw(text: String) = synchronized {
    1.38 -    if (proc == null) error("Cannot output to Isabelle: no process")
    1.39 -    if (closing) error("Cannot output to Isabelle: already closing")
    1.40 -    output.put(text)
    1.41 -  }
    1.42  
    1.43 -  def output_sync(text: String) =
    1.44 -    output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n")
    1.45 -
    1.46 -
    1.47 -  def command(text: String) =
    1.48 -    output_sync("Isabelle.command " + Isabelle_Syntax.encode_string(text))
    1.49 +  /** stream actors **/
    1.50  
    1.51 -  def command(props: List[(String, String)], text: String) =
    1.52 -    output_sync("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
    1.53 -      Isabelle_Syntax.encode_string(text))
    1.54 -
    1.55 -  def ML_val(text: String) =
    1.56 -    output_sync("ML_val " + Isabelle_Syntax.encode_string(text))
    1.57 +  /* input */
    1.58  
    1.59 -  def ML_command(text: String) =
    1.60 -    output_sync("ML_command " + Isabelle_Syntax.encode_string(text))
    1.61 -
    1.62 -  def close() = synchronized {    // FIXME watchdog/timeout
    1.63 -    output_raw("\u0000")
    1.64 -    closing = true
    1.65 -  }
    1.66 +  case class Input(cmd: String)
    1.67 +  case object Close
    1.68  
    1.69 -  def try_close() = synchronized {
    1.70 -    if (proc != null && !closing) {
    1.71 -      try { close() }
    1.72 -      catch { case _: RuntimeException => }
    1.73 -    }
    1.74 -  }
    1.75 -
    1.76 -
    1.77 -  /* commands */
    1.78 -
    1.79 -  private class Command_Thread(fifo: String) extends Thread("isabelle: commands")
    1.80 -  {
    1.81 -    override def run()
    1.82 -    {
    1.83 -      val stream = system.fifo_output_stream(fifo)
    1.84 +  private def input_actor(name: String, kind: String, stream: => OutputStream): Actor =
    1.85 +    Library.thread_actor(name) {
    1.86        val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
    1.87        var finished = false
    1.88        while (!finished) {
    1.89          try {
    1.90            //{{{
    1.91 -          val s = output.take
    1.92 -          if (s == "\u0000") {
    1.93 -            writer.close
    1.94 -            finished = true
    1.95 -          }
    1.96 -          else {
    1.97 -            put_result(Markup.STDIN, s)
    1.98 -            writer.write(s)
    1.99 -            writer.flush
   1.100 +          receive {
   1.101 +            case Input(text) =>
   1.102 +              put_result(kind, text)
   1.103 +              writer.write(text)
   1.104 +              writer.flush
   1.105 +            case Close =>
   1.106 +              writer.close
   1.107 +              finished = true
   1.108 +            case bad => System.err.println(name + ": ignoring bad message " + bad)
   1.109            }
   1.110            //}}}
   1.111          }
   1.112          catch {
   1.113 -          case e: IOException => put_result(Markup.SYSTEM, "Command thread: " + e.getMessage)
   1.114 +          case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   1.115          }
   1.116        }
   1.117 -      put_result(Markup.SYSTEM, "Command thread terminated")
   1.118 +      put_result(Markup.SYSTEM, name + " terminated")
   1.119      }
   1.120 -  }
   1.121  
   1.122  
   1.123 -  /* raw stdout */
   1.124 +  /* raw output */
   1.125  
   1.126 -  private class Stdout_Thread(stream: InputStream) extends Thread("isabelle: stdout")
   1.127 -  {
   1.128 -    override def run() =
   1.129 -    {
   1.130 +  private def output_actor(name: String, kind: String, stream: => InputStream): Actor =
   1.131 +    Library.thread_actor(name) {
   1.132        val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
   1.133        var result = new StringBuilder(100)
   1.134  
   1.135 @@ -232,7 +193,7 @@
   1.136              else done = true
   1.137            }
   1.138            if (result.length > 0) {
   1.139 -            put_result(Markup.STDOUT, result.toString)
   1.140 +            put_result(kind, result.toString)
   1.141              result.length = 0
   1.142            }
   1.143            else {
   1.144 @@ -243,22 +204,19 @@
   1.145            //}}}
   1.146          }
   1.147          catch {
   1.148 -          case e: IOException => put_result(Markup.SYSTEM, "Stdout thread: " + e.getMessage)
   1.149 +          case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   1.150          }
   1.151        }
   1.152 -      put_result(Markup.SYSTEM, "Stdout thread terminated")
   1.153 +      put_result(Markup.SYSTEM, name + " terminated")
   1.154      }
   1.155 -  }
   1.156  
   1.157  
   1.158 -  /* messages */
   1.159 +  /* message output */
   1.160  
   1.161 -  private class Message_Thread(fifo: String) extends Thread("isabelle: messages")
   1.162 -  {
   1.163 -    private class Protocol_Error(msg: String) extends Exception(msg)
   1.164 -    override def run()
   1.165 -    {
   1.166 -      val stream = system.fifo_input_stream(fifo)
   1.167 +  private class Protocol_Error(msg: String) extends Exception(msg)
   1.168 +
   1.169 +  private def message_actor(name: String, stream: InputStream): Actor =
   1.170 +    Library.thread_actor(name) {
   1.171        val default_buffer = new Array[Byte](65536)
   1.172        var c = -1
   1.173  
   1.174 @@ -325,54 +283,83 @@
   1.175        stream.close
   1.176        try_close()
   1.177  
   1.178 -      put_result(Markup.SYSTEM, "Message thread terminated")
   1.179 +      put_result(Markup.SYSTEM, name + " terminated")
   1.180      }
   1.181 -  }
   1.182  
   1.183  
   1.184  
   1.185 -  /** main **/
   1.186 +  /** init **/
   1.187 +
   1.188 +  /* exec process */
   1.189  
   1.190 -  {
   1.191 -    /* private communication channels */
   1.192 +  private val in_fifo = system.mk_fifo()
   1.193 +  private val out_fifo = system.mk_fifo()
   1.194 +  private def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   1.195  
   1.196 -    val in_fifo = system.mk_fifo()
   1.197 -    val out_fifo = system.mk_fifo()
   1.198 -    def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   1.199 +  try {
   1.200 +    val cmdline =
   1.201 +      List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   1.202 +    proc = system.execute(true, cmdline: _*)
   1.203 +  }
   1.204 +  catch {
   1.205 +    case e: IOException =>
   1.206 +      rm_fifos()
   1.207 +      error("Failed to execute Isabelle process: " + e.getMessage)
   1.208 +  }
   1.209  
   1.210 -    val command_thread = new Command_Thread(in_fifo)
   1.211 -    val message_thread = new Message_Thread(out_fifo)
   1.212 +
   1.213 +  /* exit process */
   1.214  
   1.215 -    command_thread.start
   1.216 -    message_thread.start
   1.217 +  Library.thread_actor("process_exit") {
   1.218 +    val rc = proc.waitFor()
   1.219 +    Thread.sleep(300)  // FIXME property!?
   1.220 +    put_result(Markup.SYSTEM, "process_exit terminated")
   1.221 +    put_result(Markup.EXIT, rc.toString)
   1.222 +    rm_fifos()
   1.223 +  }
   1.224  
   1.225  
   1.226 -    /* exec process */
   1.227 +  /* I/O actors */
   1.228 +
   1.229 +  private val standard_input =
   1.230 +    input_actor("standard_input", Markup.STDIN, proc.getOutputStream)
   1.231  
   1.232 -    try {
   1.233 -      val cmdline =
   1.234 -        List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   1.235 -      proc = system.execute(true, cmdline: _*)
   1.236 -    }
   1.237 -    catch {
   1.238 -      case e: IOException =>
   1.239 -        rm_fifos()
   1.240 -        error("Failed to execute Isabelle process: " + e.getMessage)
   1.241 -    }
   1.242 -    new Stdout_Thread(proc.getInputStream).start
   1.243 +  private val command_input =
   1.244 +    input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo))
   1.245 +
   1.246 +  output_actor("standard_output", Markup.STDOUT, proc.getInputStream)
   1.247 +  message_actor("message_output", system.fifo_input_stream(out_fifo))
   1.248 +
   1.249  
   1.250  
   1.251 -    /* exit */
   1.252 +  /** main methods **/
   1.253 +
   1.254 +  def input_raw(text: String) = standard_input ! Input(text)
   1.255 +
   1.256 +  def input(text: String) = synchronized {  // FIXME avoid synchronized
   1.257 +    if (proc == null) error("Cannot output to Isabelle: no process")
   1.258 +    if (closing) error("Cannot output to Isabelle: already closing")
   1.259 +    command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n")
   1.260 +  }
   1.261 +
   1.262 +  def command(text: String) = input("Isabelle.command " + Isabelle_Syntax.encode_string(text))
   1.263  
   1.264 -    new Thread("isabelle: exit") {
   1.265 -      override def run()
   1.266 -      {
   1.267 -        val rc = proc.waitFor()
   1.268 -        Thread.sleep(300)  // FIXME property!?
   1.269 -        put_result(Markup.SYSTEM, "Exit thread terminated")
   1.270 -        put_result(Markup.EXIT, rc.toString)
   1.271 -        rm_fifos()
   1.272 -      }
   1.273 -    }.start
   1.274 +  def command(props: List[(String, String)], text: String) =
   1.275 +    input("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
   1.276 +      Isabelle_Syntax.encode_string(text))
   1.277 +
   1.278 +  def ML_val(text: String) = input("ML_val " + Isabelle_Syntax.encode_string(text))
   1.279 +  def ML_command(text: String) = input("ML_command " + Isabelle_Syntax.encode_string(text))
   1.280 +
   1.281 +  def close() = synchronized {    // FIXME avoid synchronized
   1.282 +    command_input ! Close
   1.283 +    closing = true
   1.284 +  }
   1.285 +
   1.286 +  def try_close() = synchronized {
   1.287 +    if (proc != null && !closing) {
   1.288 +      try { close() }
   1.289 +      catch { case _: RuntimeException => }
   1.290 +    }
   1.291    }
   1.292  }