distinguish proper Isabelle_Process INPUT vs. raw STDIN, tuned corresponding method names;
authorwenzelm
Tue, 10 Aug 2010 12:29:11 +0200
changeset 38259 2b61c5e27399
parent 38258 dd7dcb9b2637
child 38260 d4a1c7a19be3
distinguish proper Isabelle_Process INPUT vs. raw STDIN, tuned corresponding method names; asynchronous Isabelle_Process.init -- raw ML toplevel stays active; simplified Isabelle_Process using actors; misc tuning;
src/Pure/General/markup.scala
src/Pure/Isar/isar_document.scala
src/Pure/System/isabelle_process.ML
src/Pure/System/isabelle_process.scala
--- a/src/Pure/General/markup.scala	Tue Aug 10 12:09:53 2010 +0200
+++ b/src/Pure/General/markup.scala	Tue Aug 10 12:29:11 2010 +0200
@@ -203,6 +203,7 @@
   val ERROR = "error"
   val DEBUG = "debug"
   val SYSTEM = "system"
+  val INPUT = "input"
   val STDIN = "stdin"
   val STDOUT = "stdout"
   val SIGNAL = "signal"
--- a/src/Pure/Isar/isar_document.scala	Tue Aug 10 12:09:53 2010 +0200
+++ b/src/Pure/Isar/isar_document.scala	Tue Aug 10 12:29:11 2010 +0200
@@ -38,7 +38,7 @@
   /* commands */
 
   def define_command(id: Document.Command_ID, text: String) {
-    output_sync("Isar.define_command " + Isabelle_Syntax.encode_string(id) + " " +
+    input("Isar.define_command " + Isabelle_Syntax.encode_string(id) + " " +
       Isabelle_Syntax.encode_string(text))
   }
 
@@ -80,6 +80,6 @@
     Isabelle_Syntax.append_string(new_id, buf)
     buf.append(" ")
     Isabelle_Syntax.append_list(append_node_edit, edits, buf)
-    output_sync(buf.toString)
+    input(buf.toString)
   }
 }
--- a/src/Pure/System/isabelle_process.ML	Tue Aug 10 12:09:53 2010 +0200
+++ b/src/Pure/System/isabelle_process.ML	Tue Aug 10 12:29:11 2010 +0200
@@ -105,6 +105,10 @@
     val _ = quick_and_dirty := true;  (* FIXME !? *)
     val _ = Keyword.status ();
     val _ = Output.status (Markup.markup Markup.ready "");
-  in Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true} end;
+    val _ =
+      Simple_Thread.fork false (fn () =>
+        (Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true};
+          quit ()));
+  in () end;
 
 end;
--- a/src/Pure/System/isabelle_process.scala	Tue Aug 10 12:09:53 2010 +0200
+++ b/src/Pure/System/isabelle_process.scala	Tue Aug 10 12:29:11 2010 +0200
@@ -38,6 +38,7 @@
       kind == Markup.EXIT
     def is_system(kind: String) =
       kind == Markup.SYSTEM ||
+      kind == Markup.INPUT ||
       kind == Markup.STDIN ||
       kind == Markup.SIGNAL ||
       kind == Markup.EXIT ||
@@ -111,7 +112,7 @@
 
   /* signals */
 
-  def interrupt() = synchronized {
+  def interrupt() = synchronized {  // FIXME avoid synchronized
     if (proc == null) error("Cannot interrupt Isabelle: no process")
     if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
     else {
@@ -125,7 +126,7 @@
     }
   }
 
-  def kill() = synchronized {
+  def kill() = synchronized {  // FIXME avoid synchronized
     if (proc == 0) error("Cannot kill Isabelle: no process")
     else {
       try_close()
@@ -138,85 +139,45 @@
   }
 
 
-  /* output being piped into the process */
-
-  private val output = new LinkedBlockingQueue[String]
-
-  private def output_raw(text: String) = synchronized {
-    if (proc == null) error("Cannot output to Isabelle: no process")
-    if (closing) error("Cannot output to Isabelle: already closing")
-    output.put(text)
-  }
 
-  def output_sync(text: String) =
-    output_raw(" \\<^sync>\n; " + text + " \\<^sync>;\n")
-
-
-  def command(text: String) =
-    output_sync("Isabelle.command " + Isabelle_Syntax.encode_string(text))
+  /** stream actors **/
 
-  def command(props: List[(String, String)], text: String) =
-    output_sync("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
-      Isabelle_Syntax.encode_string(text))
-
-  def ML_val(text: String) =
-    output_sync("ML_val " + Isabelle_Syntax.encode_string(text))
+  /* input */
 
-  def ML_command(text: String) =
-    output_sync("ML_command " + Isabelle_Syntax.encode_string(text))
-
-  def close() = synchronized {    // FIXME watchdog/timeout
-    output_raw("\u0000")
-    closing = true
-  }
+  case class Input(cmd: String)
+  case object Close
 
-  def try_close() = synchronized {
-    if (proc != null && !closing) {
-      try { close() }
-      catch { case _: RuntimeException => }
-    }
-  }
-
-
-  /* commands */
-
-  private class Command_Thread(fifo: String) extends Thread("isabelle: commands")
-  {
-    override def run()
-    {
-      val stream = system.fifo_output_stream(fifo)
+  private def input_actor(name: String, kind: String, stream: => OutputStream): Actor =
+    Library.thread_actor(name) {
       val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
       var finished = false
       while (!finished) {
         try {
           //{{{
-          val s = output.take
-          if (s == "\u0000") {
-            writer.close
-            finished = true
-          }
-          else {
-            put_result(Markup.STDIN, s)
-            writer.write(s)
-            writer.flush
+          receive {
+            case Input(text) =>
+              put_result(kind, text)
+              writer.write(text)
+              writer.flush
+            case Close =>
+              writer.close
+              finished = true
+            case bad => System.err.println(name + ": ignoring bad message " + bad)
           }
           //}}}
         }
         catch {
-          case e: IOException => put_result(Markup.SYSTEM, "Command thread: " + e.getMessage)
+          case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
         }
       }
-      put_result(Markup.SYSTEM, "Command thread terminated")
+      put_result(Markup.SYSTEM, name + " terminated")
     }
-  }
 
 
-  /* raw stdout */
+  /* raw output */
 
-  private class Stdout_Thread(stream: InputStream) extends Thread("isabelle: stdout")
-  {
-    override def run() =
-    {
+  private def output_actor(name: String, kind: String, stream: => InputStream): Actor =
+    Library.thread_actor(name) {
       val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
       var result = new StringBuilder(100)
 
@@ -232,7 +193,7 @@
             else done = true
           }
           if (result.length > 0) {
-            put_result(Markup.STDOUT, result.toString)
+            put_result(kind, result.toString)
             result.length = 0
           }
           else {
@@ -243,22 +204,19 @@
           //}}}
         }
         catch {
-          case e: IOException => put_result(Markup.SYSTEM, "Stdout thread: " + e.getMessage)
+          case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
         }
       }
-      put_result(Markup.SYSTEM, "Stdout thread terminated")
+      put_result(Markup.SYSTEM, name + " terminated")
     }
-  }
 
 
-  /* messages */
+  /* message output */
 
-  private class Message_Thread(fifo: String) extends Thread("isabelle: messages")
-  {
-    private class Protocol_Error(msg: String) extends Exception(msg)
-    override def run()
-    {
-      val stream = system.fifo_input_stream(fifo)
+  private class Protocol_Error(msg: String) extends Exception(msg)
+
+  private def message_actor(name: String, stream: InputStream): Actor =
+    Library.thread_actor(name) {
       val default_buffer = new Array[Byte](65536)
       var c = -1
 
@@ -325,54 +283,83 @@
       stream.close
       try_close()
 
-      put_result(Markup.SYSTEM, "Message thread terminated")
+      put_result(Markup.SYSTEM, name + " terminated")
     }
-  }
 
 
 
-  /** main **/
+  /** init **/
+
+  /* exec process */
 
-  {
-    /* private communication channels */
+  private val in_fifo = system.mk_fifo()
+  private val out_fifo = system.mk_fifo()
+  private def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
 
-    val in_fifo = system.mk_fifo()
-    val out_fifo = system.mk_fifo()
-    def rm_fifos() = { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
+  try {
+    val cmdline =
+      List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
+    proc = system.execute(true, cmdline: _*)
+  }
+  catch {
+    case e: IOException =>
+      rm_fifos()
+      error("Failed to execute Isabelle process: " + e.getMessage)
+  }
 
-    val command_thread = new Command_Thread(in_fifo)
-    val message_thread = new Message_Thread(out_fifo)
+
+  /* exit process */
 
-    command_thread.start
-    message_thread.start
+  Library.thread_actor("process_exit") {
+    val rc = proc.waitFor()
+    Thread.sleep(300)  // FIXME property!?
+    put_result(Markup.SYSTEM, "process_exit terminated")
+    put_result(Markup.EXIT, rc.toString)
+    rm_fifos()
+  }
 
 
-    /* exec process */
+  /* I/O actors */
+
+  private val standard_input =
+    input_actor("standard_input", Markup.STDIN, proc.getOutputStream)
 
-    try {
-      val cmdline =
-        List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
-      proc = system.execute(true, cmdline: _*)
-    }
-    catch {
-      case e: IOException =>
-        rm_fifos()
-        error("Failed to execute Isabelle process: " + e.getMessage)
-    }
-    new Stdout_Thread(proc.getInputStream).start
+  private val command_input =
+    input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo))
+
+  output_actor("standard_output", Markup.STDOUT, proc.getInputStream)
+  message_actor("message_output", system.fifo_input_stream(out_fifo))
+
 
 
-    /* exit */
+  /** main methods **/
+
+  def input_raw(text: String) = standard_input ! Input(text)
+
+  def input(text: String) = synchronized {  // FIXME avoid synchronized
+    if (proc == null) error("Cannot output to Isabelle: no process")
+    if (closing) error("Cannot output to Isabelle: already closing")
+    command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n")
+  }
+
+  def command(text: String) = input("Isabelle.command " + Isabelle_Syntax.encode_string(text))
 
-    new Thread("isabelle: exit") {
-      override def run()
-      {
-        val rc = proc.waitFor()
-        Thread.sleep(300)  // FIXME property!?
-        put_result(Markup.SYSTEM, "Exit thread terminated")
-        put_result(Markup.EXIT, rc.toString)
-        rm_fifos()
-      }
-    }.start
+  def command(props: List[(String, String)], text: String) =
+    input("Isabelle.command " + Isabelle_Syntax.encode_properties(props) + " " +
+      Isabelle_Syntax.encode_string(text))
+
+  def ML_val(text: String) = input("ML_val " + Isabelle_Syntax.encode_string(text))
+  def ML_command(text: String) = input("ML_command " + Isabelle_Syntax.encode_string(text))
+
+  def close() = synchronized {    // FIXME avoid synchronized
+    command_input ! Close
+    closing = true
+  }
+
+  def try_close() = synchronized {
+    if (proc != null && !closing) {
+      try { close() }
+      catch { case _: RuntimeException => }
+    }
   }
 }