refined Isabelle_Process startup: emit \002 before rendezvous on fifos, more robust treatment of startup failure with timeout, do not quit() after main loop;
authorwenzelm
Sun Sep 19 22:40:22 2010 +0200 (2010-09-19 ago)
changeset 39528c01d89d18ff0
parent 39527 f03a9c57760a
child 39529 4e466a5f67f3
refined Isabelle_Process startup: emit \002 before rendezvous on fifos, more robust treatment of startup failure with timeout, do not quit() after main loop;
tuned;
src/Pure/System/isabelle_process.ML
src/Pure/System/isabelle_process.scala
src/Pure/System/session.scala
     1.1 --- a/src/Pure/System/isabelle_process.ML	Sun Sep 19 22:20:48 2010 +0200
     1.2 +++ b/src/Pure/System/isabelle_process.ML	Sun Sep 19 22:40:22 2010 +0200
     1.3 @@ -3,6 +3,16 @@
     1.4  
     1.5  Isabelle process wrapper, based on private fifos for maximum
     1.6  robustness and performance.
     1.7 +
     1.8 +Startup phases:
     1.9 +  . raw Posix process startup with uncontrolled output on stdout/stderr
    1.10 +  . stdout \002: ML running
    1.11 +  .. stdin/stdout/stderr freely available (raw ML loop)
    1.12 +  .. protocol thread initialization
    1.13 +  ... switch to in_fifo/out_fifo channels (rendezvous via open)
    1.14 +  ... out_fifo INIT(pid): channels ready
    1.15 +  ... out_fifo STATUS(keywords)
    1.16 +  ... out_fifo READY: main loop ready
    1.17  *)
    1.18  
    1.19  signature ISABELLE_PROCESS =
    1.20 @@ -166,17 +176,21 @@
    1.21  
    1.22  (* init *)
    1.23  
    1.24 -fun init in_fifo out_fifo =
    1.25 +fun init in_fifo out_fifo = ignore (Simple_Thread.fork false (fn () =>
    1.26    let
    1.27 +    val _ = OS.Process.sleep (Time.fromMilliseconds 500);  (*yield to raw ML toplevel*)
    1.28 +    val _ = Output.std_output Symbol.STX;
    1.29 +
    1.30 +    val _ = quick_and_dirty := true;  (* FIXME !? *)
    1.31 +    val _ = Context.set_thread_data NONE;
    1.32      val _ = Unsynchronized.change print_mode
    1.33        (fold (update op =) [isabelle_processN, Keyword.keyword_statusN, Pretty.symbolicN]);
    1.34 +
    1.35      val (in_stream, out_stream) = setup_channels in_fifo out_fifo;
    1.36      val _ = init_message out_stream;
    1.37 -    val _ = quick_and_dirty := true;  (* FIXME !? *)
    1.38      val _ = Keyword.status ();
    1.39      val _ = Output.status (Markup.markup Markup.ready "");
    1.40 -    val _ = Context.set_thread_data NONE;
    1.41 -    val _ = Simple_Thread.fork false (fn () => (loop in_stream; quit ()));
    1.42 -  in () end;
    1.43 +  in loop in_stream end));
    1.44  
    1.45  end;
    1.46 +
     2.1 --- a/src/Pure/System/isabelle_process.scala	Sun Sep 19 22:20:48 2010 +0200
     2.2 +++ b/src/Pure/System/isabelle_process.scala	Sun Sep 19 22:40:22 2010 +0200
     2.3 @@ -60,7 +60,7 @@
     2.4  }
     2.5  
     2.6  
     2.7 -class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
     2.8 +class Isabelle_Process(system: Isabelle_System, timeout: Int, receiver: Actor, args: String*)
     2.9  {
    2.10    import Isabelle_Process._
    2.11  
    2.12 @@ -68,14 +68,81 @@
    2.13    /* demo constructor */
    2.14  
    2.15    def this(args: String*) =
    2.16 -    this(new Isabelle_System,
    2.17 +    this(new Isabelle_System, 10000,
    2.18        actor { loop { react { case res => Console.println(res) } } }, args: _*)
    2.19  
    2.20  
    2.21 -  /* process information */
    2.22 +  /* input actors */
    2.23 +
    2.24 +  private case class Input_Text(text: String)
    2.25 +  private case class Input_Chunks(chunks: List[Array[Byte]])
    2.26 +
    2.27 +  private case object Close
    2.28 +  private def close(a: Actor) { if (a != null) a ! Close }
    2.29 +
    2.30 +  @volatile private var standard_input: Actor = null
    2.31 +  @volatile private var command_input: Actor = null
    2.32 +
    2.33 +
    2.34 +  /* process manager */
    2.35 +
    2.36 +  private val in_fifo = system.mk_fifo()
    2.37 +  private val out_fifo = system.mk_fifo()
    2.38 +  private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
    2.39 +
    2.40 +  private val proc =
    2.41 +    try {
    2.42 +      val cmdline =
    2.43 +        List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
    2.44 +      system.execute(true, cmdline: _*)
    2.45 +    }
    2.46 +    catch { case e: IOException => rm_fifos(); throw(e) }
    2.47 +
    2.48 +  private val stdout_reader =
    2.49 +    new BufferedReader(new InputStreamReader(proc.getInputStream, Standard_System.charset))
    2.50 +
    2.51 +  private val stdin_writer =
    2.52 +    new BufferedWriter(new OutputStreamWriter(proc.getOutputStream, Standard_System.charset))
    2.53  
    2.54 -  @volatile private var proc: Option[Process] = None
    2.55 -  @volatile private var pid: Option[String] = None
    2.56 +  Simple_Thread.actor("process_manager") {
    2.57 +    val (startup_failed, startup_output) =
    2.58 +    {
    2.59 +      val expired = System.currentTimeMillis() + timeout
    2.60 +      val result = new StringBuilder(100)
    2.61 +
    2.62 +      var finished = false
    2.63 +      while (!finished && System.currentTimeMillis() <= expired) {
    2.64 +        while (!finished && stdout_reader.ready) {
    2.65 +          val c = stdout_reader.read
    2.66 +          if (c == 2) finished = true
    2.67 +          else result += c.toChar
    2.68 +        }
    2.69 +        Thread.sleep(10)
    2.70 +      }
    2.71 +      (!finished, result.toString)
    2.72 +    }
    2.73 +    if (startup_failed) {
    2.74 +      put_result(Markup.STDOUT, startup_output)
    2.75 +      put_result(Markup.EXIT, "127")
    2.76 +      stdin_writer.close
    2.77 +      Thread.sleep(300)  // FIXME !?
    2.78 +      proc.destroy  // FIXME reliable!?
    2.79 +    }
    2.80 +    else {
    2.81 +      put_result(Markup.SYSTEM, startup_output)
    2.82 +
    2.83 +      standard_input = stdin_actor()
    2.84 +      stdout_actor()
    2.85 +      command_input = input_actor()
    2.86 +      message_actor()
    2.87 +
    2.88 +      val rc = proc.waitFor()
    2.89 +      Thread.sleep(300)  // FIXME !?
    2.90 +      system_result("Isabelle process terminated")
    2.91 +      put_result(Markup.EXIT, rc.toString)
    2.92 +    }
    2.93 +    rm_fifos()
    2.94 +  }
    2.95  
    2.96  
    2.97    /* results */
    2.98 @@ -110,34 +177,33 @@
    2.99  
   2.100    /* signals */
   2.101  
   2.102 +  @volatile private var pid: Option[String] = None
   2.103 +
   2.104    def interrupt()
   2.105    {
   2.106 -    if (proc.isEmpty) system_result("Cannot interrupt Isabelle: no process")
   2.107 -    else
   2.108 -      pid match {
   2.109 -        case None => system_result("Cannot interrupt Isabelle: unknowd pid")
   2.110 -        case Some(i) =>
   2.111 -          try {
   2.112 -            if (system.execute(true, "kill", "-INT", i).waitFor == 0)
   2.113 -              system_result("Interrupt Isabelle")
   2.114 -            else
   2.115 -              system_result("Cannot interrupt Isabelle: kill command failed")
   2.116 -          }
   2.117 -          catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   2.118 -      }
   2.119 +    pid match {
   2.120 +      case None => system_result("Cannot interrupt Isabelle: unknowd pid")
   2.121 +      case Some(i) =>
   2.122 +        try {
   2.123 +          if (system.execute(true, "kill", "-INT", i).waitFor == 0)
   2.124 +            system_result("Interrupt Isabelle")
   2.125 +          else
   2.126 +            system_result("Cannot interrupt Isabelle: kill command failed")
   2.127 +        }
   2.128 +        catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
   2.129 +    }
   2.130    }
   2.131  
   2.132    def kill()
   2.133    {
   2.134 -    proc match {
   2.135 -      case None => system_result("Cannot kill Isabelle: no process")
   2.136 -      case Some(p) =>
   2.137 -        close()
   2.138 -        Thread.sleep(500)  // FIXME !?
   2.139 -        system_result("Kill Isabelle")
   2.140 -        p.destroy
   2.141 -        proc = None
   2.142 -        pid = None
   2.143 +    val running =
   2.144 +      try { proc.exitValue; false }
   2.145 +      catch { case e: java.lang.IllegalThreadStateException => true }
   2.146 +    if (running) {
   2.147 +      close()
   2.148 +      Thread.sleep(500)  // FIXME !?
   2.149 +      system_result("Kill Isabelle")
   2.150 +      proc.destroy
   2.151      }
   2.152    }
   2.153  
   2.154 @@ -145,26 +211,22 @@
   2.155  
   2.156    /** stream actors **/
   2.157  
   2.158 -  private case class Input_Text(text: String)
   2.159 -  private case class Input_Chunks(chunks: List[Array[Byte]])
   2.160 -  private case object Close
   2.161 -
   2.162 -
   2.163    /* raw stdin */
   2.164  
   2.165 -  private def stdin_actor(name: String, stream: OutputStream): Actor =
   2.166 +  private def stdin_actor(): Actor =
   2.167 +  {
   2.168 +    val name = "standard_input"
   2.169      Simple_Thread.actor(name) {
   2.170 -      val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
   2.171        var finished = false
   2.172        while (!finished) {
   2.173          try {
   2.174            //{{{
   2.175            receive {
   2.176              case Input_Text(text) =>
   2.177 -              writer.write(text)
   2.178 -              writer.flush
   2.179 +              stdin_writer.write(text)
   2.180 +              stdin_writer.flush
   2.181              case Close =>
   2.182 -              writer.close
   2.183 +              stdin_writer.close
   2.184                finished = true
   2.185              case bad => System.err.println(name + ": ignoring bad message " + bad)
   2.186            }
   2.187 @@ -174,13 +236,15 @@
   2.188        }
   2.189        system_result(name + " terminated")
   2.190      }
   2.191 +  }
   2.192  
   2.193  
   2.194    /* raw stdout */
   2.195  
   2.196 -  private def stdout_actor(name: String, stream: InputStream): Actor =
   2.197 +  private def stdout_actor(): Actor =
   2.198 +  {
   2.199 +    val name = "standard_output"
   2.200      Simple_Thread.actor(name) {
   2.201 -      val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
   2.202        var result = new StringBuilder(100)
   2.203  
   2.204        var finished = false
   2.205 @@ -189,8 +253,8 @@
   2.206            //{{{
   2.207            var c = -1
   2.208            var done = false
   2.209 -          while (!done && (result.length == 0 || reader.ready)) {
   2.210 -            c = reader.read
   2.211 +          while (!done && (result.length == 0 || stdout_reader.ready)) {
   2.212 +            c = stdout_reader.read
   2.213              if (c >= 0) result.append(c.asInstanceOf[Char])
   2.214              else done = true
   2.215            }
   2.216 @@ -199,9 +263,8 @@
   2.217              result.length = 0
   2.218            }
   2.219            else {
   2.220 -            reader.close
   2.221 +            stdout_reader.close
   2.222              finished = true
   2.223 -            close()
   2.224            }
   2.225            //}}}
   2.226          }
   2.227 @@ -209,13 +272,16 @@
   2.228        }
   2.229        system_result(name + " terminated")
   2.230      }
   2.231 +  }
   2.232  
   2.233  
   2.234    /* command input */
   2.235  
   2.236 -  private def input_actor(name: String, fifo: String): Actor =
   2.237 +  private def input_actor(): Actor =
   2.238 +  {
   2.239 +    val name = "command_input"
   2.240      Simple_Thread.actor(name) {
   2.241 -      val stream = new BufferedOutputStream(system.fifo_output_stream(fifo))  // FIXME potentially blocking forever
   2.242 +      val stream = new BufferedOutputStream(system.fifo_output_stream(in_fifo))
   2.243        var finished = false
   2.244        while (!finished) {
   2.245          try {
   2.246 @@ -237,17 +303,19 @@
   2.247        }
   2.248        system_result(name + " terminated")
   2.249      }
   2.250 +  }
   2.251  
   2.252  
   2.253    /* message output */
   2.254  
   2.255 -  private def message_actor(name: String, fifo: String): Actor =
   2.256 +  private def message_actor(): Actor =
   2.257    {
   2.258      class EOF extends Exception
   2.259      class Protocol_Error(msg: String) extends Exception(msg)
   2.260  
   2.261 +    val name = "message_output"
   2.262      Simple_Thread.actor(name) {
   2.263 -      val stream = system.fifo_input_stream(fifo)  // FIXME potentially blocking forever
   2.264 +      val stream = system.fifo_input_stream(out_fifo)
   2.265        val default_buffer = new Array[Byte](65536)
   2.266        var c = -1
   2.267  
   2.268 @@ -300,55 +368,12 @@
   2.269          }
   2.270        } while (c != -1)
   2.271        stream.close
   2.272 -      close()
   2.273  
   2.274        system_result(name + " terminated")
   2.275      }
   2.276    }
   2.277  
   2.278  
   2.279 -
   2.280 -  /** init **/
   2.281 -
   2.282 -  /* exec process */
   2.283 -
   2.284 -  private val in_fifo = system.mk_fifo()
   2.285 -  private val out_fifo = system.mk_fifo()
   2.286 -  private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   2.287 -
   2.288 -  try {
   2.289 -    val cmdline =
   2.290 -      List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   2.291 -    proc = Some(system.execute(true, cmdline: _*))
   2.292 -  }
   2.293 -  catch { case e: IOException => rm_fifos(); throw(e) }
   2.294 -
   2.295 -
   2.296 -  /* I/O actors */
   2.297 -
   2.298 -  private val command_input = input_actor("command_input", in_fifo)
   2.299 -  message_actor("message_output", out_fifo)
   2.300 -
   2.301 -  private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
   2.302 -  stdout_actor("standard_output", proc.get.getInputStream)
   2.303 -
   2.304 -
   2.305 -  /* exit process */
   2.306 -
   2.307 -  Simple_Thread.actor("process_exit") {
   2.308 -    proc match {
   2.309 -      case None =>
   2.310 -      case Some(p) =>
   2.311 -        val rc = p.waitFor()
   2.312 -        Thread.sleep(300)  // FIXME property!?
   2.313 -        system_result("Isabelle process terminated")
   2.314 -        put_result(Markup.EXIT, rc.toString)
   2.315 -    }
   2.316 -    rm_fifos()
   2.317 -  }
   2.318 -
   2.319 -
   2.320 -
   2.321    /** main methods **/
   2.322  
   2.323    def input_raw(text: String): Unit = standard_input ! Input_Text(text)
   2.324 @@ -359,5 +384,5 @@
   2.325    def input(name: String, args: String*): Unit =
   2.326      input_bytes(name, args.map(Standard_System.string_bytes): _*)
   2.327  
   2.328 -  def close(): Unit = { standard_input ! Close; command_input ! Close }
   2.329 +  def close(): Unit = { close(command_input); close(standard_input) }
   2.330  }
     3.1 --- a/src/Pure/System/session.scala	Sun Sep 19 22:20:48 2010 +0200
     3.2 +++ b/src/Pure/System/session.scala	Sun Sep 19 22:40:22 2010 +0200
     3.3 @@ -270,7 +270,7 @@
     3.4  
     3.5          case Started(timeout, args) =>
     3.6            if (prover == null) {
     3.7 -            prover = new Isabelle_Process(system, self, args:_*) with Isar_Document
     3.8 +            prover = new Isabelle_Process(system, timeout, self, args:_*) with Isar_Document
     3.9              val origin = sender
    3.10              val opt_err = prover_startup(timeout)
    3.11              if (opt_err.isDefined) prover = null