native Isabelle_Process commands, based on efficient byte channel protocol for string lists;
authorwenzelm
Wed Aug 11 00:44:48 2010 +0200 (2010-08-11 ago)
changeset 3827071bb3c273dd1
parent 38269 cd6906d9199f
child 38271 36187e8443dd
native Isabelle_Process commands, based on efficient byte channel protocol for string lists;
misc clarification of proc/pid state, eliminated closing flag;
misc tuning and simplification;
src/Pure/System/isabelle_process.ML
src/Pure/System/isabelle_process.scala
     1.1 --- a/src/Pure/System/isabelle_process.ML	Wed Aug 11 00:42:40 2010 +0200
     1.2 +++ b/src/Pure/System/isabelle_process.ML	Wed Aug 11 00:44:48 2010 +0200
     1.3 @@ -11,6 +11,9 @@
     1.4  signature ISABELLE_PROCESS =
     1.5  sig
     1.6    val isabelle_processN: string
     1.7 +  val add_command: string -> (string list -> unit) -> unit
     1.8 +  val command: string -> string list -> unit
     1.9 +  val crashes: exn list Unsynchronized.ref
    1.10    val init: string -> string -> unit
    1.11  end;
    1.12  
    1.13 @@ -25,6 +28,28 @@
    1.14  val _ = Markup.add_mode isabelle_processN YXML.output_markup;
    1.15  
    1.16  
    1.17 +(* commands *)
    1.18 +
    1.19 +local
    1.20 +
    1.21 +val global_commands = Unsynchronized.ref (Symtab.empty: (string list -> unit) Symtab.table);
    1.22 +
    1.23 +in
    1.24 +
    1.25 +fun add_command name cmd = CRITICAL (fn () =>
    1.26 +  Unsynchronized.change global_commands (fn cmds =>
    1.27 +   (if not (Symtab.defined cmds name) then ()
    1.28 +    else warning ("Redefining Isabelle process command " ^ quote name);
    1.29 +    Symtab.update (name, cmd) cmds)));
    1.30 +
    1.31 +fun command name args =
    1.32 +  (case Symtab.lookup (! global_commands) name of
    1.33 +    NONE => error ("Undefined Isabelle process command " ^ quote name)
    1.34 +  | SOME cmd => cmd args);
    1.35 +
    1.36 +end;
    1.37 +
    1.38 +
    1.39  (* message markup *)
    1.40  
    1.41  local
    1.42 @@ -94,6 +119,53 @@
    1.43  end;
    1.44  
    1.45  
    1.46 +(* protocol loop *)
    1.47 +
    1.48 +val crashes = Unsynchronized.ref ([]: exn list);
    1.49 +
    1.50 +local
    1.51 +
    1.52 +fun recover crash =
    1.53 +  (CRITICAL (fn () => Unsynchronized.change crashes (cons crash));
    1.54 +    warning "Recovering from Isabelle process crash -- see also Isabelle_Process.crashes");
    1.55 +
    1.56 +fun read_chunk stream len =
    1.57 +  let
    1.58 +    val n =
    1.59 +      (case Int.fromString len of
    1.60 +        SOME n => n
    1.61 +      | NONE => error ("Isabelle process: malformed chunk header " ^ quote len));
    1.62 +    val chunk = TextIO.inputN (stream, n);
    1.63 +    val m = size chunk;
    1.64 +  in
    1.65 +    if m = n then chunk
    1.66 +    else error ("Isabelle process: bad chunk (" ^ string_of_int m ^ " vs. " ^ string_of_int n ^ ")")
    1.67 +  end;
    1.68 +
    1.69 +fun read_command stream =
    1.70 +  (case TextIO.inputLine stream of
    1.71 +    NONE => raise Runtime.TERMINATE
    1.72 +  | SOME line => map (read_chunk stream) (space_explode "," line));
    1.73 +
    1.74 +fun run_command name args =
    1.75 +  Runtime.debugging (command name) args
    1.76 +    handle exn =>
    1.77 +      error ("Isabelle process command failure: " ^ name ^ "\n" ^ ML_Compiler.exn_message exn);
    1.78 +
    1.79 +in
    1.80 +
    1.81 +fun loop stream =
    1.82 +  let val continue =
    1.83 +    (case read_command stream of
    1.84 +      [] => (Output.error_msg "Isabelle process: no input"; true)
    1.85 +    | name :: args => (run_command name args; true))
    1.86 +    handle Runtime.TERMINATE => false
    1.87 +      | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
    1.88 +  in if continue then loop stream else () end;
    1.89 +
    1.90 +end;
    1.91 +
    1.92 +
    1.93  (* init *)
    1.94  
    1.95  fun init in_fifo out_fifo =
    1.96 @@ -105,10 +177,8 @@
    1.97      val _ = quick_and_dirty := true;  (* FIXME !? *)
    1.98      val _ = Keyword.status ();
    1.99      val _ = Output.status (Markup.markup Markup.ready "");
   1.100 -    val _ =
   1.101 -      Simple_Thread.fork false (fn () =>
   1.102 -        (Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true};
   1.103 -          quit ()));
   1.104 +    val _ = Context.set_thread_data NONE;
   1.105 +    val _ = Simple_Thread.fork false (fn () => (loop in_stream; quit ()));
   1.106    in () end;
   1.107  
   1.108  end;
     2.1 --- a/src/Pure/System/isabelle_process.scala	Wed Aug 11 00:42:40 2010 +0200
     2.2 +++ b/src/Pure/System/isabelle_process.scala	Wed Aug 11 00:44:48 2010 +0200
     2.3 @@ -9,7 +9,7 @@
     2.4  
     2.5  import java.util.concurrent.LinkedBlockingQueue
     2.6  import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
     2.7 -  InputStream, OutputStream, IOException}
     2.8 +  InputStream, OutputStream, BufferedOutputStream, IOException}
     2.9  
    2.10  import scala.actors.Actor
    2.11  import Actor._
    2.12 @@ -89,9 +89,8 @@
    2.13  
    2.14    /* process information */
    2.15  
    2.16 -  @volatile private var proc: Process = null
    2.17 -  @volatile private var closing = false
    2.18 -  @volatile private var pid: String = null
    2.19 +  @volatile private var proc: Option[Process] = None
    2.20 +  @volatile private var pid: Option[String] = None
    2.21  
    2.22  
    2.23    /* results */
    2.24 @@ -99,7 +98,7 @@
    2.25    private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree])
    2.26    {
    2.27      if (kind == Markup.INIT) {
    2.28 -      for ((Markup.PID, p) <- props) pid = p
    2.29 +      for ((Markup.PID, p) <- props) pid = Some(p)
    2.30      }
    2.31      receiver ! new Result(XML.Elem(Markup(kind, props), body))
    2.32    }
    2.33 @@ -112,29 +111,34 @@
    2.34  
    2.35    /* signals */
    2.36  
    2.37 -  def interrupt() = synchronized {  // FIXME avoid synchronized
    2.38 -    if (proc == null) error("Cannot interrupt Isabelle: no process")
    2.39 -    if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
    2.40 -    else {
    2.41 -      try {
    2.42 -        if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
    2.43 -          put_result(Markup.SIGNAL, "INT")
    2.44 -        else
    2.45 -          put_result(Markup.SYSTEM, "Cannot interrupt: kill command failed")
    2.46 +  def interrupt()
    2.47 +  {
    2.48 +    if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process")
    2.49 +    else
    2.50 +      pid match {
    2.51 +        case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknowd pid")
    2.52 +        case Some(i) =>
    2.53 +          try {
    2.54 +            if (system.execute(true, "kill", "-INT", i).waitFor == 0)
    2.55 +              put_result(Markup.SIGNAL, "INT")
    2.56 +            else
    2.57 +              put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed")
    2.58 +          }
    2.59 +          catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
    2.60        }
    2.61 -      catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
    2.62 -    }
    2.63    }
    2.64  
    2.65 -  def kill() = synchronized {  // FIXME avoid synchronized
    2.66 -    if (proc == 0) error("Cannot kill Isabelle: no process")
    2.67 -    else {
    2.68 -      try_close()
    2.69 -      Thread.sleep(500)  // FIXME property!?
    2.70 -      put_result(Markup.SIGNAL, "KILL")
    2.71 -      proc.destroy
    2.72 -      proc = null
    2.73 -      pid = null
    2.74 +  def kill()
    2.75 +  {
    2.76 +    proc match {
    2.77 +      case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process")
    2.78 +      case Some(p) =>
    2.79 +        close()
    2.80 +        Thread.sleep(500)  // FIXME !?
    2.81 +        put_result(Markup.SIGNAL, "KILL")
    2.82 +        p.destroy
    2.83 +        proc = None
    2.84 +        pid = None
    2.85      }
    2.86    }
    2.87  
    2.88 @@ -142,12 +146,14 @@
    2.89  
    2.90    /** stream actors **/
    2.91  
    2.92 -  /* input */
    2.93 -
    2.94 -  case class Input(cmd: String)
    2.95 +  case class Input_Text(text: String)
    2.96 +  case class Input_Chunks(chunks: List[Array[Byte]])
    2.97    case object Close
    2.98  
    2.99 -  private def input_actor(name: String, kind: String, stream: => OutputStream): Actor =
   2.100 +
   2.101 +  /* raw stdin */
   2.102 +
   2.103 +  private def stdin_actor(name: String, stream: OutputStream): Actor =
   2.104      Library.thread_actor(name) {
   2.105        val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
   2.106        var finished = false
   2.107 @@ -155,8 +161,8 @@
   2.108          try {
   2.109            //{{{
   2.110            receive {
   2.111 -            case Input(text) =>
   2.112 -              put_result(kind, text)
   2.113 +            case Input_Text(text) =>
   2.114 +              // FIXME echo input?!
   2.115                writer.write(text)
   2.116                writer.flush
   2.117              case Close =>
   2.118 @@ -174,9 +180,9 @@
   2.119      }
   2.120  
   2.121  
   2.122 -  /* raw output */
   2.123 +  /* raw stdout */
   2.124  
   2.125 -  private def output_actor(name: String, kind: String, stream: => InputStream): Actor =
   2.126 +  private def stdout_actor(name: String, stream: InputStream): Actor =
   2.127      Library.thread_actor(name) {
   2.128        val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
   2.129        var result = new StringBuilder(100)
   2.130 @@ -193,13 +199,43 @@
   2.131              else done = true
   2.132            }
   2.133            if (result.length > 0) {
   2.134 -            put_result(kind, result.toString)
   2.135 +            put_result(Markup.STDOUT, result.toString)
   2.136              result.length = 0
   2.137            }
   2.138            else {
   2.139              reader.close
   2.140              finished = true
   2.141 -            try_close()
   2.142 +            close()
   2.143 +          }
   2.144 +          //}}}
   2.145 +        }
   2.146 +        catch {
   2.147 +          case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   2.148 +        }
   2.149 +      }
   2.150 +      put_result(Markup.SYSTEM, name + " terminated")
   2.151 +    }
   2.152 +
   2.153 +
   2.154 +  /* command input */
   2.155 +
   2.156 +  private def input_actor(name: String, raw_stream: OutputStream): Actor =
   2.157 +    Library.thread_actor(name) {
   2.158 +      val stream = new BufferedOutputStream(raw_stream)
   2.159 +      var finished = false
   2.160 +      while (!finished) {
   2.161 +        try {
   2.162 +          //{{{
   2.163 +          receive {
   2.164 +            case Input_Chunks(chunks) =>
   2.165 +              stream.write(Standard_System.string_bytes(
   2.166 +                  chunks.map(_.length).mkString("", ",", "\n")));
   2.167 +              chunks.foreach(stream.write(_));
   2.168 +              stream.flush
   2.169 +            case Close =>
   2.170 +              stream.close
   2.171 +              finished = true
   2.172 +            case bad => System.err.println(name + ": ignoring bad message " + bad)
   2.173            }
   2.174            //}}}
   2.175          }
   2.176 @@ -281,7 +317,7 @@
   2.177          }
   2.178        } while (c != -1)
   2.179        stream.close
   2.180 -      try_close()
   2.181 +      close()
   2.182  
   2.183        put_result(Markup.SYSTEM, name + " terminated")
   2.184      }
   2.185 @@ -299,7 +335,7 @@
   2.186    try {
   2.187      val cmdline =
   2.188        List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   2.189 -    proc = system.execute(true, cmdline: _*)
   2.190 +    proc = Some(system.execute(true, cmdline: _*))
   2.191    }
   2.192    catch {
   2.193      case e: IOException =>
   2.194 @@ -308,49 +344,37 @@
   2.195    }
   2.196  
   2.197  
   2.198 +  /* I/O actors */
   2.199 +
   2.200 +  private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
   2.201 +  stdout_actor("standard_output", proc.get.getInputStream)
   2.202 +
   2.203 +  private val command_input = input_actor("command_input", system.fifo_output_stream(in_fifo))
   2.204 +  message_actor("message_output", system.fifo_input_stream(out_fifo))
   2.205 +
   2.206 +
   2.207    /* exit process */
   2.208  
   2.209    Library.thread_actor("process_exit") {
   2.210 -    val rc = proc.waitFor()
   2.211 -    Thread.sleep(300)  // FIXME property!?
   2.212 -    put_result(Markup.SYSTEM, "process_exit terminated")
   2.213 -    put_result(Markup.EXIT, rc.toString)
   2.214 +    proc match {
   2.215 +      case None =>
   2.216 +      case Some(p) =>
   2.217 +        val rc = p.waitFor()
   2.218 +        Thread.sleep(300)  // FIXME property!?
   2.219 +        put_result(Markup.SYSTEM, "process_exit terminated")
   2.220 +        put_result(Markup.EXIT, rc.toString)
   2.221 +    }
   2.222      rm_fifos()
   2.223    }
   2.224  
   2.225  
   2.226 -  /* I/O actors */
   2.227 -
   2.228 -  private val standard_input =
   2.229 -    input_actor("standard_input", Markup.STDIN, proc.getOutputStream)
   2.230 -
   2.231 -  private val command_input =
   2.232 -    input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo))
   2.233 -
   2.234 -  output_actor("standard_output", Markup.STDOUT, proc.getInputStream)
   2.235 -  message_actor("message_output", system.fifo_input_stream(out_fifo))
   2.236 -
   2.237 -
   2.238  
   2.239    /** main methods **/
   2.240  
   2.241 -  def input_raw(text: String) = standard_input ! Input(text)
   2.242 -
   2.243 -  def input(text: String) = synchronized {  // FIXME avoid synchronized
   2.244 -    if (proc == null) error("Cannot output to Isabelle: no process")
   2.245 -    if (closing) error("Cannot output to Isabelle: already closing")
   2.246 -    command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n")
   2.247 -  }
   2.248 +  def input_raw(text: String): Unit = standard_input ! Input_Text(text)
   2.249  
   2.250 -  def close() = synchronized {    // FIXME avoid synchronized
   2.251 -    command_input ! Close
   2.252 -    closing = true
   2.253 -  }
   2.254 +  def input(name: String, args: String*): Unit =
   2.255 +    command_input ! Input_Chunks((name :: args.toList).map(Standard_System.string_bytes))
   2.256  
   2.257 -  def try_close() = synchronized {
   2.258 -    if (proc != null && !closing) {
   2.259 -      try { close() }
   2.260 -      catch { case _: RuntimeException => }
   2.261 -    }
   2.262 -  }
   2.263 +  def close(): Unit = command_input ! Close
   2.264  }