native Isabelle_Process commands, based on efficient byte channel protocol for string lists;
authorwenzelm
Wed, 11 Aug 2010 00:44:48 +0200
changeset 38270 71bb3c273dd1
parent 38269 cd6906d9199f
child 38271 36187e8443dd
native Isabelle_Process commands, based on efficient byte channel protocol for string lists; misc clarification of proc/pid state, eliminated closing flag; misc tuning and simplification;
src/Pure/System/isabelle_process.ML
src/Pure/System/isabelle_process.scala
--- a/src/Pure/System/isabelle_process.ML	Wed Aug 11 00:42:40 2010 +0200
+++ b/src/Pure/System/isabelle_process.ML	Wed Aug 11 00:44:48 2010 +0200
@@ -11,6 +11,9 @@
 signature ISABELLE_PROCESS =
 sig
   val isabelle_processN: string
+  val add_command: string -> (string list -> unit) -> unit
+  val command: string -> string list -> unit
+  val crashes: exn list Unsynchronized.ref
   val init: string -> string -> unit
 end;
 
@@ -25,6 +28,28 @@
 val _ = Markup.add_mode isabelle_processN YXML.output_markup;
 
 
+(* commands *)
+
+local
+
+val global_commands = Unsynchronized.ref (Symtab.empty: (string list -> unit) Symtab.table);
+
+in
+
+fun add_command name cmd = CRITICAL (fn () =>
+  Unsynchronized.change global_commands (fn cmds =>
+   (if not (Symtab.defined cmds name) then ()
+    else warning ("Redefining Isabelle process command " ^ quote name);
+    Symtab.update (name, cmd) cmds)));
+
+fun command name args =
+  (case Symtab.lookup (! global_commands) name of
+    NONE => error ("Undefined Isabelle process command " ^ quote name)
+  | SOME cmd => cmd args);
+
+end;
+
+
 (* message markup *)
 
 local
@@ -94,6 +119,53 @@
 end;
 
 
+(* protocol loop *)
+
+val crashes = Unsynchronized.ref ([]: exn list);
+
+local
+
+fun recover crash =
+  (CRITICAL (fn () => Unsynchronized.change crashes (cons crash));
+    warning "Recovering from Isabelle process crash -- see also Isabelle_Process.crashes");
+
+fun read_chunk stream len =
+  let
+    val n =
+      (case Int.fromString len of
+        SOME n => n
+      | NONE => error ("Isabelle process: malformed chunk header " ^ quote len));
+    val chunk = TextIO.inputN (stream, n);
+    val m = size chunk;
+  in
+    if m = n then chunk
+    else error ("Isabelle process: bad chunk (" ^ string_of_int m ^ " vs. " ^ string_of_int n ^ ")")
+  end;
+
+fun read_command stream =
+  (case TextIO.inputLine stream of
+    NONE => raise Runtime.TERMINATE
+  | SOME line => map (read_chunk stream) (space_explode "," line));
+
+fun run_command name args =
+  Runtime.debugging (command name) args
+    handle exn =>
+      error ("Isabelle process command failure: " ^ name ^ "\n" ^ ML_Compiler.exn_message exn);
+
+in
+
+fun loop stream =
+  let val continue =
+    (case read_command stream of
+      [] => (Output.error_msg "Isabelle process: no input"; true)
+    | name :: args => (run_command name args; true))
+    handle Runtime.TERMINATE => false
+      | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
+  in if continue then loop stream else () end;
+
+end;
+
+
 (* init *)
 
 fun init in_fifo out_fifo =
@@ -105,10 +177,8 @@
     val _ = quick_and_dirty := true;  (* FIXME !? *)
     val _ = Keyword.status ();
     val _ = Output.status (Markup.markup Markup.ready "");
-    val _ =
-      Simple_Thread.fork false (fn () =>
-        (Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true};
-          quit ()));
+    val _ = Context.set_thread_data NONE;
+    val _ = Simple_Thread.fork false (fn () => (loop in_stream; quit ()));
   in () end;
 
 end;
--- a/src/Pure/System/isabelle_process.scala	Wed Aug 11 00:42:40 2010 +0200
+++ b/src/Pure/System/isabelle_process.scala	Wed Aug 11 00:44:48 2010 +0200
@@ -9,7 +9,7 @@
 
 import java.util.concurrent.LinkedBlockingQueue
 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
-  InputStream, OutputStream, IOException}
+  InputStream, OutputStream, BufferedOutputStream, IOException}
 
 import scala.actors.Actor
 import Actor._
@@ -89,9 +89,8 @@
 
   /* process information */
 
-  @volatile private var proc: Process = null
-  @volatile private var closing = false
-  @volatile private var pid: String = null
+  @volatile private var proc: Option[Process] = None
+  @volatile private var pid: Option[String] = None
 
 
   /* results */
@@ -99,7 +98,7 @@
   private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree])
   {
     if (kind == Markup.INIT) {
-      for ((Markup.PID, p) <- props) pid = p
+      for ((Markup.PID, p) <- props) pid = Some(p)
     }
     receiver ! new Result(XML.Elem(Markup(kind, props), body))
   }
@@ -112,29 +111,34 @@
 
   /* signals */
 
-  def interrupt() = synchronized {  // FIXME avoid synchronized
-    if (proc == null) error("Cannot interrupt Isabelle: no process")
-    if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
-    else {
-      try {
-        if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
-          put_result(Markup.SIGNAL, "INT")
-        else
-          put_result(Markup.SYSTEM, "Cannot interrupt: kill command failed")
+  def interrupt()
+  {
+    if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process")
+    else
+      pid match {
+        case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknowd pid")
+        case Some(i) =>
+          try {
+            if (system.execute(true, "kill", "-INT", i).waitFor == 0)
+              put_result(Markup.SIGNAL, "INT")
+            else
+              put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed")
+          }
+          catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
       }
-      catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
-    }
   }
 
-  def kill() = synchronized {  // FIXME avoid synchronized
-    if (proc == 0) error("Cannot kill Isabelle: no process")
-    else {
-      try_close()
-      Thread.sleep(500)  // FIXME property!?
-      put_result(Markup.SIGNAL, "KILL")
-      proc.destroy
-      proc = null
-      pid = null
+  def kill()
+  {
+    proc match {
+      case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process")
+      case Some(p) =>
+        close()
+        Thread.sleep(500)  // FIXME !?
+        put_result(Markup.SIGNAL, "KILL")
+        p.destroy
+        proc = None
+        pid = None
     }
   }
 
@@ -142,12 +146,14 @@
 
   /** stream actors **/
 
-  /* input */
-
-  case class Input(cmd: String)
+  case class Input_Text(text: String)
+  case class Input_Chunks(chunks: List[Array[Byte]])
   case object Close
 
-  private def input_actor(name: String, kind: String, stream: => OutputStream): Actor =
+
+  /* raw stdin */
+
+  private def stdin_actor(name: String, stream: OutputStream): Actor =
     Library.thread_actor(name) {
       val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
       var finished = false
@@ -155,8 +161,8 @@
         try {
           //{{{
           receive {
-            case Input(text) =>
-              put_result(kind, text)
+            case Input_Text(text) =>
+              // FIXME echo input?!
               writer.write(text)
               writer.flush
             case Close =>
@@ -174,9 +180,9 @@
     }
 
 
-  /* raw output */
+  /* raw stdout */
 
-  private def output_actor(name: String, kind: String, stream: => InputStream): Actor =
+  private def stdout_actor(name: String, stream: InputStream): Actor =
     Library.thread_actor(name) {
       val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
       var result = new StringBuilder(100)
@@ -193,13 +199,43 @@
             else done = true
           }
           if (result.length > 0) {
-            put_result(kind, result.toString)
+            put_result(Markup.STDOUT, result.toString)
             result.length = 0
           }
           else {
             reader.close
             finished = true
-            try_close()
+            close()
+          }
+          //}}}
+        }
+        catch {
+          case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
+        }
+      }
+      put_result(Markup.SYSTEM, name + " terminated")
+    }
+
+
+  /* command input */
+
+  private def input_actor(name: String, raw_stream: OutputStream): Actor =
+    Library.thread_actor(name) {
+      val stream = new BufferedOutputStream(raw_stream)
+      var finished = false
+      while (!finished) {
+        try {
+          //{{{
+          receive {
+            case Input_Chunks(chunks) =>
+              stream.write(Standard_System.string_bytes(
+                  chunks.map(_.length).mkString("", ",", "\n")));
+              chunks.foreach(stream.write(_));
+              stream.flush
+            case Close =>
+              stream.close
+              finished = true
+            case bad => System.err.println(name + ": ignoring bad message " + bad)
           }
           //}}}
         }
@@ -281,7 +317,7 @@
         }
       } while (c != -1)
       stream.close
-      try_close()
+      close()
 
       put_result(Markup.SYSTEM, name + " terminated")
     }
@@ -299,7 +335,7 @@
   try {
     val cmdline =
       List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
-    proc = system.execute(true, cmdline: _*)
+    proc = Some(system.execute(true, cmdline: _*))
   }
   catch {
     case e: IOException =>
@@ -308,49 +344,37 @@
   }
 
 
+  /* I/O actors */
+
+  private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
+  stdout_actor("standard_output", proc.get.getInputStream)
+
+  private val command_input = input_actor("command_input", system.fifo_output_stream(in_fifo))
+  message_actor("message_output", system.fifo_input_stream(out_fifo))
+
+
   /* exit process */
 
   Library.thread_actor("process_exit") {
-    val rc = proc.waitFor()
-    Thread.sleep(300)  // FIXME property!?
-    put_result(Markup.SYSTEM, "process_exit terminated")
-    put_result(Markup.EXIT, rc.toString)
+    proc match {
+      case None =>
+      case Some(p) =>
+        val rc = p.waitFor()
+        Thread.sleep(300)  // FIXME property!?
+        put_result(Markup.SYSTEM, "process_exit terminated")
+        put_result(Markup.EXIT, rc.toString)
+    }
     rm_fifos()
   }
 
 
-  /* I/O actors */
-
-  private val standard_input =
-    input_actor("standard_input", Markup.STDIN, proc.getOutputStream)
-
-  private val command_input =
-    input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo))
-
-  output_actor("standard_output", Markup.STDOUT, proc.getInputStream)
-  message_actor("message_output", system.fifo_input_stream(out_fifo))
-
-
 
   /** main methods **/
 
-  def input_raw(text: String) = standard_input ! Input(text)
-
-  def input(text: String) = synchronized {  // FIXME avoid synchronized
-    if (proc == null) error("Cannot output to Isabelle: no process")
-    if (closing) error("Cannot output to Isabelle: already closing")
-    command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n")
-  }
+  def input_raw(text: String): Unit = standard_input ! Input_Text(text)
 
-  def close() = synchronized {    // FIXME avoid synchronized
-    command_input ! Close
-    closing = true
-  }
+  def input(name: String, args: String*): Unit =
+    command_input ! Input_Chunks((name :: args.toList).map(Standard_System.string_bytes))
 
-  def try_close() = synchronized {
-    if (proc != null && !closing) {
-      try { close() }
-      catch { case _: RuntimeException => }
-    }
-  }
+  def close(): Unit = command_input ! Close
 }