--- a/src/Pure/System/isabelle_process.ML Wed Aug 11 00:42:40 2010 +0200
+++ b/src/Pure/System/isabelle_process.ML Wed Aug 11 00:44:48 2010 +0200
@@ -11,6 +11,9 @@
signature ISABELLE_PROCESS =
sig
val isabelle_processN: string
+ val add_command: string -> (string list -> unit) -> unit
+ val command: string -> string list -> unit
+ val crashes: exn list Unsynchronized.ref
val init: string -> string -> unit
end;
@@ -25,6 +28,28 @@
val _ = Markup.add_mode isabelle_processN YXML.output_markup;
+(* commands *)
+
+local
+
+val global_commands = Unsynchronized.ref (Symtab.empty: (string list -> unit) Symtab.table);
+
+in
+
+fun add_command name cmd = CRITICAL (fn () =>
+ Unsynchronized.change global_commands (fn cmds =>
+ (if not (Symtab.defined cmds name) then ()
+ else warning ("Redefining Isabelle process command " ^ quote name);
+ Symtab.update (name, cmd) cmds)));
+
+fun command name args =
+ (case Symtab.lookup (! global_commands) name of
+ NONE => error ("Undefined Isabelle process command " ^ quote name)
+ | SOME cmd => cmd args);
+
+end;
+
+
(* message markup *)
local
@@ -94,6 +119,53 @@
end;
+(* protocol loop *)
+
+val crashes = Unsynchronized.ref ([]: exn list);
+
+local
+
+fun recover crash =
+ (CRITICAL (fn () => Unsynchronized.change crashes (cons crash));
+ warning "Recovering from Isabelle process crash -- see also Isabelle_Process.crashes");
+
+fun read_chunk stream len =
+ let
+ val n =
+ (case Int.fromString len of
+ SOME n => n
+ | NONE => error ("Isabelle process: malformed chunk header " ^ quote len));
+ val chunk = TextIO.inputN (stream, n);
+ val m = size chunk;
+ in
+ if m = n then chunk
+ else error ("Isabelle process: bad chunk (" ^ string_of_int m ^ " vs. " ^ string_of_int n ^ ")")
+ end;
+
+fun read_command stream =
+ (case TextIO.inputLine stream of
+ NONE => raise Runtime.TERMINATE
+ | SOME line => map (read_chunk stream) (space_explode "," line));
+
+fun run_command name args =
+ Runtime.debugging (command name) args
+ handle exn =>
+ error ("Isabelle process command failure: " ^ name ^ "\n" ^ ML_Compiler.exn_message exn);
+
+in
+
+fun loop stream =
+ let val continue =
+ (case read_command stream of
+ [] => (Output.error_msg "Isabelle process: no input"; true)
+ | name :: args => (run_command name args; true))
+ handle Runtime.TERMINATE => false
+ | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
+ in if continue then loop stream else () end;
+
+end;
+
+
(* init *)
fun init in_fifo out_fifo =
@@ -105,10 +177,8 @@
val _ = quick_and_dirty := true; (* FIXME !? *)
val _ = Keyword.status ();
val _ = Output.status (Markup.markup Markup.ready "");
- val _ =
- Simple_Thread.fork false (fn () =>
- (Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true};
- quit ()));
+ val _ = Context.set_thread_data NONE;
+ val _ = Simple_Thread.fork false (fn () => (loop in_stream; quit ()));
in () end;
end;
--- a/src/Pure/System/isabelle_process.scala Wed Aug 11 00:42:40 2010 +0200
+++ b/src/Pure/System/isabelle_process.scala Wed Aug 11 00:44:48 2010 +0200
@@ -9,7 +9,7 @@
import java.util.concurrent.LinkedBlockingQueue
import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
- InputStream, OutputStream, IOException}
+ InputStream, OutputStream, BufferedOutputStream, IOException}
import scala.actors.Actor
import Actor._
@@ -89,9 +89,8 @@
/* process information */
- @volatile private var proc: Process = null
- @volatile private var closing = false
- @volatile private var pid: String = null
+ @volatile private var proc: Option[Process] = None
+ @volatile private var pid: Option[String] = None
/* results */
@@ -99,7 +98,7 @@
private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree])
{
if (kind == Markup.INIT) {
- for ((Markup.PID, p) <- props) pid = p
+ for ((Markup.PID, p) <- props) pid = Some(p)
}
receiver ! new Result(XML.Elem(Markup(kind, props), body))
}
@@ -112,29 +111,34 @@
/* signals */
- def interrupt() = synchronized { // FIXME avoid synchronized
- if (proc == null) error("Cannot interrupt Isabelle: no process")
- if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
- else {
- try {
- if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
- put_result(Markup.SIGNAL, "INT")
- else
- put_result(Markup.SYSTEM, "Cannot interrupt: kill command failed")
+ def interrupt()
+ {
+ if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process")
+ else
+ pid match {
+ case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknowd pid")
+ case Some(i) =>
+ try {
+ if (system.execute(true, "kill", "-INT", i).waitFor == 0)
+ put_result(Markup.SIGNAL, "INT")
+ else
+ put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed")
+ }
+ catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
}
- catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
- }
}
- def kill() = synchronized { // FIXME avoid synchronized
- if (proc == 0) error("Cannot kill Isabelle: no process")
- else {
- try_close()
- Thread.sleep(500) // FIXME property!?
- put_result(Markup.SIGNAL, "KILL")
- proc.destroy
- proc = null
- pid = null
+ def kill()
+ {
+ proc match {
+ case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process")
+ case Some(p) =>
+ close()
+ Thread.sleep(500) // FIXME !?
+ put_result(Markup.SIGNAL, "KILL")
+ p.destroy
+ proc = None
+ pid = None
}
}
@@ -142,12 +146,14 @@
/** stream actors **/
- /* input */
-
- case class Input(cmd: String)
+ case class Input_Text(text: String)
+ case class Input_Chunks(chunks: List[Array[Byte]])
case object Close
- private def input_actor(name: String, kind: String, stream: => OutputStream): Actor =
+
+ /* raw stdin */
+
+ private def stdin_actor(name: String, stream: OutputStream): Actor =
Library.thread_actor(name) {
val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
var finished = false
@@ -155,8 +161,8 @@
try {
//{{{
receive {
- case Input(text) =>
- put_result(kind, text)
+ case Input_Text(text) =>
+ // FIXME echo input?!
writer.write(text)
writer.flush
case Close =>
@@ -174,9 +180,9 @@
}
- /* raw output */
+ /* raw stdout */
- private def output_actor(name: String, kind: String, stream: => InputStream): Actor =
+ private def stdout_actor(name: String, stream: InputStream): Actor =
Library.thread_actor(name) {
val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
var result = new StringBuilder(100)
@@ -193,13 +199,43 @@
else done = true
}
if (result.length > 0) {
- put_result(kind, result.toString)
+ put_result(Markup.STDOUT, result.toString)
result.length = 0
}
else {
reader.close
finished = true
- try_close()
+ close()
+ }
+ //}}}
+ }
+ catch {
+ case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
+ }
+ }
+ put_result(Markup.SYSTEM, name + " terminated")
+ }
+
+
+ /* command input */
+
+ private def input_actor(name: String, raw_stream: OutputStream): Actor =
+ Library.thread_actor(name) {
+ val stream = new BufferedOutputStream(raw_stream)
+ var finished = false
+ while (!finished) {
+ try {
+ //{{{
+ receive {
+ case Input_Chunks(chunks) =>
+ stream.write(Standard_System.string_bytes(
+ chunks.map(_.length).mkString("", ",", "\n")));
+ chunks.foreach(stream.write(_));
+ stream.flush
+ case Close =>
+ stream.close
+ finished = true
+ case bad => System.err.println(name + ": ignoring bad message " + bad)
}
//}}}
}
@@ -281,7 +317,7 @@
}
} while (c != -1)
stream.close
- try_close()
+ close()
put_result(Markup.SYSTEM, name + " terminated")
}
@@ -299,7 +335,7 @@
try {
val cmdline =
List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
- proc = system.execute(true, cmdline: _*)
+ proc = Some(system.execute(true, cmdline: _*))
}
catch {
case e: IOException =>
@@ -308,49 +344,37 @@
}
+ /* I/O actors */
+
+ private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
+ stdout_actor("standard_output", proc.get.getInputStream)
+
+ private val command_input = input_actor("command_input", system.fifo_output_stream(in_fifo))
+ message_actor("message_output", system.fifo_input_stream(out_fifo))
+
+
/* exit process */
Library.thread_actor("process_exit") {
- val rc = proc.waitFor()
- Thread.sleep(300) // FIXME property!?
- put_result(Markup.SYSTEM, "process_exit terminated")
- put_result(Markup.EXIT, rc.toString)
+ proc match {
+ case None =>
+ case Some(p) =>
+ val rc = p.waitFor()
+ Thread.sleep(300) // FIXME property!?
+ put_result(Markup.SYSTEM, "process_exit terminated")
+ put_result(Markup.EXIT, rc.toString)
+ }
rm_fifos()
}
- /* I/O actors */
-
- private val standard_input =
- input_actor("standard_input", Markup.STDIN, proc.getOutputStream)
-
- private val command_input =
- input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo))
-
- output_actor("standard_output", Markup.STDOUT, proc.getInputStream)
- message_actor("message_output", system.fifo_input_stream(out_fifo))
-
-
/** main methods **/
- def input_raw(text: String) = standard_input ! Input(text)
-
- def input(text: String) = synchronized { // FIXME avoid synchronized
- if (proc == null) error("Cannot output to Isabelle: no process")
- if (closing) error("Cannot output to Isabelle: already closing")
- command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n")
- }
+ def input_raw(text: String): Unit = standard_input ! Input_Text(text)
- def close() = synchronized { // FIXME avoid synchronized
- command_input ! Close
- closing = true
- }
+ def input(name: String, args: String*): Unit =
+ command_input ! Input_Chunks((name :: args.toList).map(Standard_System.string_bytes))
- def try_close() = synchronized {
- if (proc != null && !closing) {
- try { close() }
- catch { case _: RuntimeException => }
- }
- }
+ def close(): Unit = command_input ! Close
}