refined Isabelle_Process startup: emit \002 before rendezvous on fifos, more robust treatment of startup failure with timeout, do not quit() after main loop;
authorwenzelm
Sun, 19 Sep 2010 22:40:22 +0200
changeset 39528 c01d89d18ff0
parent 39527 f03a9c57760a
child 39529 4e466a5f67f3
refined Isabelle_Process startup: emit \002 before rendezvous on fifos, more robust treatment of startup failure with timeout, do not quit() after main loop; tuned;
src/Pure/System/isabelle_process.ML
src/Pure/System/isabelle_process.scala
src/Pure/System/session.scala
--- a/src/Pure/System/isabelle_process.ML	Sun Sep 19 22:20:48 2010 +0200
+++ b/src/Pure/System/isabelle_process.ML	Sun Sep 19 22:40:22 2010 +0200
@@ -3,6 +3,16 @@
 
 Isabelle process wrapper, based on private fifos for maximum
 robustness and performance.
+
+Startup phases:
+  . raw Posix process startup with uncontrolled output on stdout/stderr
+  . stdout \002: ML running
+  .. stdin/stdout/stderr freely available (raw ML loop)
+  .. protocol thread initialization
+  ... switch to in_fifo/out_fifo channels (rendezvous via open)
+  ... out_fifo INIT(pid): channels ready
+  ... out_fifo STATUS(keywords)
+  ... out_fifo READY: main loop ready
 *)
 
 signature ISABELLE_PROCESS =
@@ -166,17 +176,21 @@
 
 (* init *)
 
-fun init in_fifo out_fifo =
+fun init in_fifo out_fifo = ignore (Simple_Thread.fork false (fn () =>
   let
+    val _ = OS.Process.sleep (Time.fromMilliseconds 500);  (*yield to raw ML toplevel*)
+    val _ = Output.std_output Symbol.STX;
+
+    val _ = quick_and_dirty := true;  (* FIXME !? *)
+    val _ = Context.set_thread_data NONE;
     val _ = Unsynchronized.change print_mode
       (fold (update op =) [isabelle_processN, Keyword.keyword_statusN, Pretty.symbolicN]);
+
     val (in_stream, out_stream) = setup_channels in_fifo out_fifo;
     val _ = init_message out_stream;
-    val _ = quick_and_dirty := true;  (* FIXME !? *)
     val _ = Keyword.status ();
     val _ = Output.status (Markup.markup Markup.ready "");
-    val _ = Context.set_thread_data NONE;
-    val _ = Simple_Thread.fork false (fn () => (loop in_stream; quit ()));
-  in () end;
+  in loop in_stream end));
 
 end;
+
--- a/src/Pure/System/isabelle_process.scala	Sun Sep 19 22:20:48 2010 +0200
+++ b/src/Pure/System/isabelle_process.scala	Sun Sep 19 22:40:22 2010 +0200
@@ -60,7 +60,7 @@
 }
 
 
-class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
+class Isabelle_Process(system: Isabelle_System, timeout: Int, receiver: Actor, args: String*)
 {
   import Isabelle_Process._
 
@@ -68,14 +68,81 @@
   /* demo constructor */
 
   def this(args: String*) =
-    this(new Isabelle_System,
+    this(new Isabelle_System, 10000,
       actor { loop { react { case res => Console.println(res) } } }, args: _*)
 
 
-  /* process information */
+  /* input actors */
+
+  private case class Input_Text(text: String)
+  private case class Input_Chunks(chunks: List[Array[Byte]])
+
+  private case object Close
+  private def close(a: Actor) { if (a != null) a ! Close }
+
+  @volatile private var standard_input: Actor = null
+  @volatile private var command_input: Actor = null
+
+
+  /* process manager */
+
+  private val in_fifo = system.mk_fifo()
+  private val out_fifo = system.mk_fifo()
+  private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
+
+  private val proc =
+    try {
+      val cmdline =
+        List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
+      system.execute(true, cmdline: _*)
+    }
+    catch { case e: IOException => rm_fifos(); throw(e) }
+
+  private val stdout_reader =
+    new BufferedReader(new InputStreamReader(proc.getInputStream, Standard_System.charset))
+
+  private val stdin_writer =
+    new BufferedWriter(new OutputStreamWriter(proc.getOutputStream, Standard_System.charset))
 
-  @volatile private var proc: Option[Process] = None
-  @volatile private var pid: Option[String] = None
+  Simple_Thread.actor("process_manager") {
+    val (startup_failed, startup_output) =
+    {
+      val expired = System.currentTimeMillis() + timeout
+      val result = new StringBuilder(100)
+
+      var finished = false
+      while (!finished && System.currentTimeMillis() <= expired) {
+        while (!finished && stdout_reader.ready) {
+          val c = stdout_reader.read
+          if (c == 2) finished = true
+          else result += c.toChar
+        }
+        Thread.sleep(10)
+      }
+      (!finished, result.toString)
+    }
+    if (startup_failed) {
+      put_result(Markup.STDOUT, startup_output)
+      put_result(Markup.EXIT, "127")
+      stdin_writer.close
+      Thread.sleep(300)  // FIXME !?
+      proc.destroy  // FIXME reliable!?
+    }
+    else {
+      put_result(Markup.SYSTEM, startup_output)
+
+      standard_input = stdin_actor()
+      stdout_actor()
+      command_input = input_actor()
+      message_actor()
+
+      val rc = proc.waitFor()
+      Thread.sleep(300)  // FIXME !?
+      system_result("Isabelle process terminated")
+      put_result(Markup.EXIT, rc.toString)
+    }
+    rm_fifos()
+  }
 
 
   /* results */
@@ -110,34 +177,33 @@
 
   /* signals */
 
+  @volatile private var pid: Option[String] = None
+
   def interrupt()
   {
-    if (proc.isEmpty) system_result("Cannot interrupt Isabelle: no process")
-    else
-      pid match {
-        case None => system_result("Cannot interrupt Isabelle: unknowd pid")
-        case Some(i) =>
-          try {
-            if (system.execute(true, "kill", "-INT", i).waitFor == 0)
-              system_result("Interrupt Isabelle")
-            else
-              system_result("Cannot interrupt Isabelle: kill command failed")
-          }
-          catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
-      }
+    pid match {
+      case None => system_result("Cannot interrupt Isabelle: unknowd pid")
+      case Some(i) =>
+        try {
+          if (system.execute(true, "kill", "-INT", i).waitFor == 0)
+            system_result("Interrupt Isabelle")
+          else
+            system_result("Cannot interrupt Isabelle: kill command failed")
+        }
+        catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
+    }
   }
 
   def kill()
   {
-    proc match {
-      case None => system_result("Cannot kill Isabelle: no process")
-      case Some(p) =>
-        close()
-        Thread.sleep(500)  // FIXME !?
-        system_result("Kill Isabelle")
-        p.destroy
-        proc = None
-        pid = None
+    val running =
+      try { proc.exitValue; false }
+      catch { case e: java.lang.IllegalThreadStateException => true }
+    if (running) {
+      close()
+      Thread.sleep(500)  // FIXME !?
+      system_result("Kill Isabelle")
+      proc.destroy
     }
   }
 
@@ -145,26 +211,22 @@
 
   /** stream actors **/
 
-  private case class Input_Text(text: String)
-  private case class Input_Chunks(chunks: List[Array[Byte]])
-  private case object Close
-
-
   /* raw stdin */
 
-  private def stdin_actor(name: String, stream: OutputStream): Actor =
+  private def stdin_actor(): Actor =
+  {
+    val name = "standard_input"
     Simple_Thread.actor(name) {
-      val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
       var finished = false
       while (!finished) {
         try {
           //{{{
           receive {
             case Input_Text(text) =>
-              writer.write(text)
-              writer.flush
+              stdin_writer.write(text)
+              stdin_writer.flush
             case Close =>
-              writer.close
+              stdin_writer.close
               finished = true
             case bad => System.err.println(name + ": ignoring bad message " + bad)
           }
@@ -174,13 +236,15 @@
       }
       system_result(name + " terminated")
     }
+  }
 
 
   /* raw stdout */
 
-  private def stdout_actor(name: String, stream: InputStream): Actor =
+  private def stdout_actor(): Actor =
+  {
+    val name = "standard_output"
     Simple_Thread.actor(name) {
-      val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
       var result = new StringBuilder(100)
 
       var finished = false
@@ -189,8 +253,8 @@
           //{{{
           var c = -1
           var done = false
-          while (!done && (result.length == 0 || reader.ready)) {
-            c = reader.read
+          while (!done && (result.length == 0 || stdout_reader.ready)) {
+            c = stdout_reader.read
             if (c >= 0) result.append(c.asInstanceOf[Char])
             else done = true
           }
@@ -199,9 +263,8 @@
             result.length = 0
           }
           else {
-            reader.close
+            stdout_reader.close
             finished = true
-            close()
           }
           //}}}
         }
@@ -209,13 +272,16 @@
       }
       system_result(name + " terminated")
     }
+  }
 
 
   /* command input */
 
-  private def input_actor(name: String, fifo: String): Actor =
+  private def input_actor(): Actor =
+  {
+    val name = "command_input"
     Simple_Thread.actor(name) {
-      val stream = new BufferedOutputStream(system.fifo_output_stream(fifo))  // FIXME potentially blocking forever
+      val stream = new BufferedOutputStream(system.fifo_output_stream(in_fifo))
       var finished = false
       while (!finished) {
         try {
@@ -237,17 +303,19 @@
       }
       system_result(name + " terminated")
     }
+  }
 
 
   /* message output */
 
-  private def message_actor(name: String, fifo: String): Actor =
+  private def message_actor(): Actor =
   {
     class EOF extends Exception
     class Protocol_Error(msg: String) extends Exception(msg)
 
+    val name = "message_output"
     Simple_Thread.actor(name) {
-      val stream = system.fifo_input_stream(fifo)  // FIXME potentially blocking forever
+      val stream = system.fifo_input_stream(out_fifo)
       val default_buffer = new Array[Byte](65536)
       var c = -1
 
@@ -300,55 +368,12 @@
         }
       } while (c != -1)
       stream.close
-      close()
 
       system_result(name + " terminated")
     }
   }
 
 
-
-  /** init **/
-
-  /* exec process */
-
-  private val in_fifo = system.mk_fifo()
-  private val out_fifo = system.mk_fifo()
-  private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
-
-  try {
-    val cmdline =
-      List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
-    proc = Some(system.execute(true, cmdline: _*))
-  }
-  catch { case e: IOException => rm_fifos(); throw(e) }
-
-
-  /* I/O actors */
-
-  private val command_input = input_actor("command_input", in_fifo)
-  message_actor("message_output", out_fifo)
-
-  private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
-  stdout_actor("standard_output", proc.get.getInputStream)
-
-
-  /* exit process */
-
-  Simple_Thread.actor("process_exit") {
-    proc match {
-      case None =>
-      case Some(p) =>
-        val rc = p.waitFor()
-        Thread.sleep(300)  // FIXME property!?
-        system_result("Isabelle process terminated")
-        put_result(Markup.EXIT, rc.toString)
-    }
-    rm_fifos()
-  }
-
-
-
   /** main methods **/
 
   def input_raw(text: String): Unit = standard_input ! Input_Text(text)
@@ -359,5 +384,5 @@
   def input(name: String, args: String*): Unit =
     input_bytes(name, args.map(Standard_System.string_bytes): _*)
 
-  def close(): Unit = { standard_input ! Close; command_input ! Close }
+  def close(): Unit = { close(command_input); close(standard_input) }
 }
--- a/src/Pure/System/session.scala	Sun Sep 19 22:20:48 2010 +0200
+++ b/src/Pure/System/session.scala	Sun Sep 19 22:40:22 2010 +0200
@@ -270,7 +270,7 @@
 
         case Started(timeout, args) =>
           if (prover == null) {
-            prover = new Isabelle_Process(system, self, args:_*) with Isar_Document
+            prover = new Isabelle_Process(system, timeout, self, args:_*) with Isar_Document
             val origin = sender
             val opt_err = prover_startup(timeout)
             if (opt_err.isDefined) prover = null