generic process wrapping in Prover;
authorwenzelm
Tue Aug 12 18:36:43 2014 +0200 (2014-08-12)
changeset 579162c2c24dbf0a4
parent 57915 448325de6e4f
child 57917 8ce97e5d545f
generic process wrapping in Prover;
clarified module arrangement;
src/Pure/PIDE/protocol.scala
src/Pure/PIDE/prover.scala
src/Pure/PIDE/resources.scala
src/Pure/System/isabelle_process.ML
src/Pure/System/isabelle_process.scala
     1.1 --- a/src/Pure/PIDE/protocol.scala	Tue Aug 12 17:28:07 2014 +0200
     1.2 +++ b/src/Pure/PIDE/protocol.scala	Tue Aug 12 18:36:43 2014 +0200
     1.3 @@ -347,8 +347,25 @@
     1.4  }
     1.5  
     1.6  
     1.7 -trait Protocol extends Prover
     1.8 +trait Protocol
     1.9  {
    1.10 +  /* text */
    1.11 +
    1.12 +  def encode(s: String): String
    1.13 +  def decode(s: String): String
    1.14 +
    1.15 +  object Encode
    1.16 +  {
    1.17 +    val string: XML.Encode.T[String] = (s => XML.Encode.string(encode(s)))
    1.18 +  }
    1.19 +
    1.20 +
    1.21 +  /* protocol commands */
    1.22 +
    1.23 +  def protocol_command_bytes(name: String, args: Bytes*): Unit
    1.24 +  def protocol_command(name: String, args: String*): Unit
    1.25 +
    1.26 +
    1.27    /* options */
    1.28  
    1.29    def options(opts: Options): Unit =
     2.1 --- a/src/Pure/PIDE/prover.scala	Tue Aug 12 17:28:07 2014 +0200
     2.2 +++ b/src/Pure/PIDE/prover.scala	Tue Aug 12 18:36:43 2014 +0200
     2.3 @@ -1,13 +1,13 @@
     2.4  /*  Title:      Pure/PIDE/prover.scala
     2.5      Author:     Makarius
     2.6  
     2.7 -General prover operations.
     2.8 +General prover operations and process wrapping.
     2.9  */
    2.10  
    2.11  package isabelle
    2.12  
    2.13  
    2.14 -import java.io.BufferedReader
    2.15 +import java.io.{InputStream, OutputStream, BufferedReader, BufferedOutputStream, IOException}
    2.16  
    2.17  
    2.18  object Prover
    2.19 @@ -83,44 +83,286 @@
    2.20  }
    2.21  
    2.22  
    2.23 -trait Prover
    2.24 +abstract class Prover(
    2.25 +  receiver: Prover.Message => Unit,
    2.26 +  system_process: Prover.System_Process) extends Protocol
    2.27  {
    2.28 -  /* text and tree data */
    2.29 +  /* output */
    2.30 +
    2.31 +  val xml_cache: XML.Cache = new XML.Cache()
    2.32 +
    2.33 +  private def system_output(text: String)
    2.34 +  {
    2.35 +    receiver(new Prover.Output(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text)))))
    2.36 +  }
    2.37 +
    2.38 +  private def protocol_output(props: Properties.T, bytes: Bytes)
    2.39 +  {
    2.40 +    receiver(new Prover.Protocol_Output(props, bytes))
    2.41 +  }
    2.42 +
    2.43 +  private def output(kind: String, props: Properties.T, body: XML.Body)
    2.44 +  {
    2.45 +    if (kind == Markup.INIT) system_process.channel.accepted()
    2.46  
    2.47 -  def encode(s: String): String
    2.48 -  def decode(s: String): String
    2.49 +    val main = XML.Elem(Markup(kind, props), Protocol.clean_message(body))
    2.50 +    val reports = Protocol.message_reports(props, body)
    2.51 +    for (msg <- main :: reports) receiver(new Prover.Output(xml_cache.elem(msg)))
    2.52 +  }
    2.53 +
    2.54 +  private def exit_message(rc: Int)
    2.55 +  {
    2.56 +    output(Markup.EXIT, Markup.Return_Code(rc), List(XML.Text("Return code: " + rc.toString)))
    2.57 +  }
    2.58 +
    2.59 +
    2.60  
    2.61 -  object Encode
    2.62 +  /** process manager **/
    2.63 +
    2.64 +  private val (_, process_result) =
    2.65 +    Simple_Thread.future("process_result") { system_process.join }
    2.66 +
    2.67 +  private def terminate_process()
    2.68    {
    2.69 -    val string: XML.Encode.T[String] = (s => XML.Encode.string(encode(s)))
    2.70 +    try { system_process.terminate }
    2.71 +    catch {
    2.72 +      case exn @ ERROR(_) => system_output("Failed to terminate prover process: " + exn.getMessage)
    2.73 +    }
    2.74    }
    2.75  
    2.76 -  def xml_cache: XML.Cache
    2.77 +  private val process_manager = Simple_Thread.fork("process_manager")
    2.78 +  {
    2.79 +    val (startup_failed, startup_errors) =
    2.80 +    {
    2.81 +      var finished: Option[Boolean] = None
    2.82 +      val result = new StringBuilder(100)
    2.83 +      while (finished.isEmpty && (system_process.stderr.ready || !process_result.is_finished)) {
    2.84 +        while (finished.isEmpty && system_process.stderr.ready) {
    2.85 +          try {
    2.86 +            val c = system_process.stderr.read
    2.87 +            if (c == 2) finished = Some(true)
    2.88 +            else result += c.toChar
    2.89 +          }
    2.90 +          catch { case _: IOException => finished = Some(false) }
    2.91 +        }
    2.92 +        Thread.sleep(10)
    2.93 +      }
    2.94 +      (finished.isEmpty || !finished.get, result.toString.trim)
    2.95 +    }
    2.96 +    if (startup_errors != "") system_output(startup_errors)
    2.97 +
    2.98 +    if (startup_failed) {
    2.99 +      terminate_process()
   2.100 +      process_result.join
   2.101 +      exit_message(127)
   2.102 +    }
   2.103 +    else {
   2.104 +      val (command_stream, message_stream) = system_process.channel.rendezvous()
   2.105 +
   2.106 +      command_input_init(command_stream)
   2.107 +      val stdout = physical_output(false)
   2.108 +      val stderr = physical_output(true)
   2.109 +      val message = message_output(message_stream)
   2.110 +
   2.111 +      val rc = process_result.join
   2.112 +      system_output("process terminated")
   2.113 +      command_input_close()
   2.114 +      for (thread <- List(stdout, stderr, message)) thread.join
   2.115 +      system_output("process_manager terminated")
   2.116 +      exit_message(rc)
   2.117 +    }
   2.118 +    system_process.channel.accepted()
   2.119 +  }
   2.120 +
   2.121 +
   2.122 +  /* management methods */
   2.123 +
   2.124 +  def join() { process_manager.join() }
   2.125 +
   2.126 +  def terminate()
   2.127 +  {
   2.128 +    command_input_close()
   2.129 +    system_output("Terminating prover process")
   2.130 +    terminate_process()
   2.131 +  }
   2.132 +
   2.133 +
   2.134 +
   2.135 +  /** process streams **/
   2.136 +
   2.137 +  /* command input */
   2.138 +
   2.139 +  private var command_input: Option[Consumer_Thread[List[Bytes]]] = None
   2.140 +
   2.141 +  private def command_input_close(): Unit = command_input.foreach(_.shutdown)
   2.142 +
   2.143 +  private def command_input_init(raw_stream: OutputStream)
   2.144 +  {
   2.145 +    val name = "command_input"
   2.146 +    val stream = new BufferedOutputStream(raw_stream)
   2.147 +    command_input =
   2.148 +      Some(
   2.149 +        Consumer_Thread.fork(name)(
   2.150 +          consume =
   2.151 +            {
   2.152 +              case chunks =>
   2.153 +                try {
   2.154 +                  Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
   2.155 +                  chunks.foreach(_.write(stream))
   2.156 +                  stream.flush
   2.157 +                  true
   2.158 +                }
   2.159 +                catch { case e: IOException => system_output(name + ": " + e.getMessage); false }
   2.160 +            },
   2.161 +          finish = { case () => stream.close; system_output(name + " terminated") }
   2.162 +        )
   2.163 +      )
   2.164 +  }
   2.165  
   2.166  
   2.167 -  /* process management */
   2.168 +  /* physical output */
   2.169 +
   2.170 +  private def physical_output(err: Boolean): Thread =
   2.171 +  {
   2.172 +    val (name, reader, markup) =
   2.173 +      if (err) ("standard_error", system_process.stderr, Markup.STDERR)
   2.174 +      else ("standard_output", system_process.stdout, Markup.STDOUT)
   2.175  
   2.176 -  def join(): Unit
   2.177 -  def terminate(): Unit
   2.178 -
   2.179 -  def protocol_command_bytes(name: String, args: Bytes*): Unit
   2.180 -  def protocol_command(name: String, args: String*): Unit
   2.181 +    Simple_Thread.fork(name) {
   2.182 +      try {
   2.183 +        var result = new StringBuilder(100)
   2.184 +        var finished = false
   2.185 +        while (!finished) {
   2.186 +          //{{{
   2.187 +          var c = -1
   2.188 +          var done = false
   2.189 +          while (!done && (result.length == 0 || reader.ready)) {
   2.190 +            c = reader.read
   2.191 +            if (c >= 0) result.append(c.asInstanceOf[Char])
   2.192 +            else done = true
   2.193 +          }
   2.194 +          if (result.length > 0) {
   2.195 +            output(markup, Nil, List(XML.Text(decode(result.toString))))
   2.196 +            result.length = 0
   2.197 +          }
   2.198 +          else {
   2.199 +            reader.close
   2.200 +            finished = true
   2.201 +          }
   2.202 +          //}}}
   2.203 +        }
   2.204 +      }
   2.205 +      catch { case e: IOException => system_output(name + ": " + e.getMessage) }
   2.206 +      system_output(name + " terminated")
   2.207 +    }
   2.208 +  }
   2.209  
   2.210  
   2.211 -  /* PIDE protocol commands */
   2.212 +  /* message output */
   2.213 +
   2.214 +  private def message_output(stream: InputStream): Thread =
   2.215 +  {
   2.216 +    class EOF extends Exception
   2.217 +    class Protocol_Error(msg: String) extends Exception(msg)
   2.218 +
   2.219 +    val name = "message_output"
   2.220 +    Simple_Thread.fork(name) {
   2.221 +      val default_buffer = new Array[Byte](65536)
   2.222 +      var c = -1
   2.223  
   2.224 -  def options(opts: Options): Unit
   2.225 +      def read_int(): Int =
   2.226 +      //{{{
   2.227 +      {
   2.228 +        var n = 0
   2.229 +        c = stream.read
   2.230 +        if (c == -1) throw new EOF
   2.231 +        while (48 <= c && c <= 57) {
   2.232 +          n = 10 * n + (c - 48)
   2.233 +          c = stream.read
   2.234 +        }
   2.235 +        if (c != 10)
   2.236 +          throw new Protocol_Error("malformed header: expected integer followed by newline")
   2.237 +        else n
   2.238 +      }
   2.239 +      //}}}
   2.240  
   2.241 -  def define_blob(digest: SHA1.Digest, bytes: Bytes): Unit
   2.242 -  def define_command(command: Command): Unit
   2.243 +      def read_chunk_bytes(): (Array[Byte], Int) =
   2.244 +      //{{{
   2.245 +      {
   2.246 +        val n = read_int()
   2.247 +        val buf =
   2.248 +          if (n <= default_buffer.size) default_buffer
   2.249 +          else new Array[Byte](n)
   2.250 +
   2.251 +        var i = 0
   2.252 +        var m = 0
   2.253 +        do {
   2.254 +          m = stream.read(buf, i, n - i)
   2.255 +          if (m != -1) i += m
   2.256 +        }
   2.257 +        while (m != -1 && n > i)
   2.258 +
   2.259 +        if (i != n)
   2.260 +          throw new Protocol_Error("bad chunk (unexpected EOF after " + i + " of " + n + " bytes)")
   2.261 +
   2.262 +        (buf, n)
   2.263 +      }
   2.264 +      //}}}
   2.265  
   2.266 -  def discontinue_execution(): Unit
   2.267 -  def cancel_exec(id: Document_ID.Exec): Unit
   2.268 +      def read_chunk(): XML.Body =
   2.269 +      {
   2.270 +        val (buf, n) = read_chunk_bytes()
   2.271 +        YXML.parse_body_failsafe(UTF8.decode_chars(decode, buf, 0, n))
   2.272 +      }
   2.273  
   2.274 -  def update(old_id: Document_ID.Version, new_id: Document_ID.Version,
   2.275 -    edits: List[Document.Edit_Command]): Unit
   2.276 -  def remove_versions(versions: List[Document.Version]): Unit
   2.277 +      try {
   2.278 +        do {
   2.279 +          try {
   2.280 +            val header = read_chunk()
   2.281 +            header match {
   2.282 +              case List(XML.Elem(Markup(name, props), Nil)) =>
   2.283 +                val kind = name.intern
   2.284 +                if (kind == Markup.PROTOCOL) {
   2.285 +                  val (buf, n) = read_chunk_bytes()
   2.286 +                  protocol_output(props, Bytes(buf, 0, n))
   2.287 +                }
   2.288 +                else {
   2.289 +                  val body = read_chunk()
   2.290 +                  output(kind, props, body)
   2.291 +                }
   2.292 +              case _ =>
   2.293 +                read_chunk()
   2.294 +                throw new Protocol_Error("bad header: " + header.toString)
   2.295 +            }
   2.296 +          }
   2.297 +          catch { case _: EOF => }
   2.298 +        }
   2.299 +        while (c != -1)
   2.300 +      }
   2.301 +      catch {
   2.302 +        case e: IOException => system_output("Cannot read message:\n" + e.getMessage)
   2.303 +        case e: Protocol_Error => system_output("Malformed message:\n" + e.getMessage)
   2.304 +      }
   2.305 +      stream.close
   2.306  
   2.307 -  def dialog_result(serial: Long, result: String): Unit
   2.308 +      system_output(name + " terminated")
   2.309 +    }
   2.310 +  }
   2.311 +
   2.312 +
   2.313 +
   2.314 +  /** protocol commands **/
   2.315 +
   2.316 +  def protocol_command_bytes(name: String, args: Bytes*): Unit =
   2.317 +    command_input match {
   2.318 +      case Some(thread) => thread.send(Bytes(name) :: args.toList)
   2.319 +      case None => error("Uninitialized command input thread")
   2.320 +    }
   2.321 +
   2.322 +  def protocol_command(name: String, args: String*)
   2.323 +  {
   2.324 +    receiver(new Prover.Input(name, args.toList))
   2.325 +    protocol_command_bytes(name, args.map(Bytes(_)): _*)
   2.326 +  }
   2.327  }
   2.328  
     3.1 --- a/src/Pure/PIDE/resources.scala	Tue Aug 12 17:28:07 2014 +0200
     3.2 +++ b/src/Pure/PIDE/resources.scala	Tue Aug 12 18:36:43 2014 +0200
     3.3 @@ -126,6 +126,6 @@
     3.4    /* prover process */
     3.5  
     3.6    def start_prover(receiver: Prover.Message => Unit, name: String, args: List[String]): Prover =
     3.7 -    new Isabelle_Process(receiver, args) with Protocol
     3.8 +    new Isabelle_Process(receiver, args)
     3.9  }
    3.10  
     4.1 --- a/src/Pure/System/isabelle_process.ML	Tue Aug 12 17:28:07 2014 +0200
     4.2 +++ b/src/Pure/System/isabelle_process.ML	Tue Aug 12 18:36:43 2014 +0200
     4.3 @@ -1,8 +1,7 @@
     4.4  (*  Title:      Pure/System/isabelle_process.ML
     4.5      Author:     Makarius
     4.6  
     4.7 -Isabelle process wrapper, based on private fifos for maximum
     4.8 -robustness and performance, or local socket for maximum portability.
     4.9 +Isabelle process wrapper.
    4.10  *)
    4.11  
    4.12  signature ISABELLE_PROCESS =
     5.1 --- a/src/Pure/System/isabelle_process.scala	Tue Aug 12 17:28:07 2014 +0200
     5.2 +++ b/src/Pure/System/isabelle_process.scala	Tue Aug 12 18:36:43 2014 +0200
     5.3 @@ -1,320 +1,31 @@
     5.4  /*  Title:      Pure/System/isabelle_process.scala
     5.5      Author:     Makarius
     5.6 -    Options:    :folding=explicit:collapseFolds=1:
     5.7  
     5.8 -Isabelle process management -- always reactive due to multi-threaded I/O.
     5.9 +Isabelle process wrapper.
    5.10  */
    5.11  
    5.12  package isabelle
    5.13  
    5.14  
    5.15 -import java.io.{InputStream, OutputStream, BufferedOutputStream, IOException}
    5.16 -
    5.17 -
    5.18  class Isabelle_Process(
    5.19    receiver: Prover.Message => Unit = Console.println(_),
    5.20 -  prover_args: List[String] = Nil)
    5.21 +  prover_args: List[String] = Nil) extends Prover(receiver,
    5.22 +    {
    5.23 +      val system_channel = System_Channel()
    5.24 +      try {
    5.25 +        val cmdline =
    5.26 +          Isabelle_System.getenv_strict("ISABELLE_PROCESS") ::
    5.27 +            (system_channel.prover_args ::: prover_args)
    5.28 +        val process =
    5.29 +          new Isabelle_System.Managed_Process(null, null, false, cmdline: _*) with
    5.30 +            Prover.System_Process { def channel = system_channel }
    5.31 +        process.stdin.close
    5.32 +        process
    5.33 +      }
    5.34 +      catch { case exn @ ERROR(_) => system_channel.accepted(); throw(exn) }
    5.35 +    })
    5.36  {
    5.37 -  /* system process -- default implementation */
    5.38 -
    5.39 -  protected val system_process: Prover.System_Process =
    5.40 -  {
    5.41 -    val system_channel = System_Channel()
    5.42 -    try {
    5.43 -      val cmdline =
    5.44 -        Isabelle_System.getenv_strict("ISABELLE_PROCESS") ::
    5.45 -          (system_channel.prover_args ::: prover_args)
    5.46 -      val process =
    5.47 -        new Isabelle_System.Managed_Process(null, null, false, cmdline: _*) with
    5.48 -          Prover.System_Process { def channel = system_channel }
    5.49 -      process.stdin.close
    5.50 -      process
    5.51 -    }
    5.52 -    catch { case exn @ ERROR(_) => system_channel.accepted(); throw(exn) }
    5.53 -  }
    5.54 -
    5.55 -
    5.56 -  /* text and tree data */
    5.57 -
    5.58    def encode(s: String): String = Symbol.encode(s)
    5.59    def decode(s: String): String = Symbol.decode(s)
    5.60 -
    5.61 -  val xml_cache: XML.Cache = new XML.Cache()
    5.62 -
    5.63 -
    5.64 -  /* output */
    5.65 -
    5.66 -  private def system_output(text: String)
    5.67 -  {
    5.68 -    receiver(new Prover.Output(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text)))))
    5.69 -  }
    5.70 -
    5.71 -  private def protocol_output(props: Properties.T, bytes: Bytes)
    5.72 -  {
    5.73 -    receiver(new Prover.Protocol_Output(props, bytes))
    5.74 -  }
    5.75 -
    5.76 -  private def output(kind: String, props: Properties.T, body: XML.Body)
    5.77 -  {
    5.78 -    if (kind == Markup.INIT) system_process.channel.accepted()
    5.79 -
    5.80 -    val main = XML.Elem(Markup(kind, props), Protocol.clean_message(body))
    5.81 -    val reports = Protocol.message_reports(props, body)
    5.82 -    for (msg <- main :: reports) receiver(new Prover.Output(xml_cache.elem(msg)))
    5.83 -  }
    5.84 -
    5.85 -  private def exit_message(rc: Int)
    5.86 -  {
    5.87 -    output(Markup.EXIT, Markup.Return_Code(rc), List(XML.Text("Return code: " + rc.toString)))
    5.88 -  }
    5.89 -
    5.90 -
    5.91 -
    5.92 -  /** process manager **/
    5.93 -
    5.94 -  private val (_, process_result) =
    5.95 -    Simple_Thread.future("process_result") { system_process.join }
    5.96 -
    5.97 -  private def terminate_process()
    5.98 -  {
    5.99 -    try { system_process.terminate }
   5.100 -    catch {
   5.101 -      case exn @ ERROR(_) => system_output("Failed to terminate prover process: " + exn.getMessage)
   5.102 -    }
   5.103 -  }
   5.104 -
   5.105 -  private val process_manager = Simple_Thread.fork("process_manager")
   5.106 -  {
   5.107 -    val (startup_failed, startup_errors) =
   5.108 -    {
   5.109 -      var finished: Option[Boolean] = None
   5.110 -      val result = new StringBuilder(100)
   5.111 -      while (finished.isEmpty && (system_process.stderr.ready || !process_result.is_finished)) {
   5.112 -        while (finished.isEmpty && system_process.stderr.ready) {
   5.113 -          try {
   5.114 -            val c = system_process.stderr.read
   5.115 -            if (c == 2) finished = Some(true)
   5.116 -            else result += c.toChar
   5.117 -          }
   5.118 -          catch { case _: IOException => finished = Some(false) }
   5.119 -        }
   5.120 -        Thread.sleep(10)
   5.121 -      }
   5.122 -      (finished.isEmpty || !finished.get, result.toString.trim)
   5.123 -    }
   5.124 -    if (startup_errors != "") system_output(startup_errors)
   5.125 -
   5.126 -    if (startup_failed) {
   5.127 -      terminate_process()
   5.128 -      process_result.join
   5.129 -      exit_message(127)
   5.130 -    }
   5.131 -    else {
   5.132 -      val (command_stream, message_stream) = system_process.channel.rendezvous()
   5.133 -
   5.134 -      command_input_init(command_stream)
   5.135 -      val stdout = physical_output(false)
   5.136 -      val stderr = physical_output(true)
   5.137 -      val message = message_output(message_stream)
   5.138 -
   5.139 -      val rc = process_result.join
   5.140 -      system_output("process terminated")
   5.141 -      command_input_close()
   5.142 -      for (thread <- List(stdout, stderr, message)) thread.join
   5.143 -      system_output("process_manager terminated")
   5.144 -      exit_message(rc)
   5.145 -    }
   5.146 -    system_process.channel.accepted()
   5.147 -  }
   5.148 -
   5.149 -
   5.150 -  /* management methods */
   5.151 -
   5.152 -  def join() { process_manager.join() }
   5.153 -
   5.154 -  def terminate()
   5.155 -  {
   5.156 -    command_input_close()
   5.157 -    system_output("Terminating prover process")
   5.158 -    terminate_process()
   5.159 -  }
   5.160 -
   5.161 -
   5.162 -
   5.163 -  /** process streams **/
   5.164 -
   5.165 -  /* command input */
   5.166 -
   5.167 -  private var command_input: Option[Consumer_Thread[List[Bytes]]] = None
   5.168 -
   5.169 -  private def command_input_close(): Unit = command_input.foreach(_.shutdown)
   5.170 -
   5.171 -  private def command_input_init(raw_stream: OutputStream)
   5.172 -  {
   5.173 -    val name = "command_input"
   5.174 -    val stream = new BufferedOutputStream(raw_stream)
   5.175 -    command_input =
   5.176 -      Some(
   5.177 -        Consumer_Thread.fork(name)(
   5.178 -          consume =
   5.179 -            {
   5.180 -              case chunks =>
   5.181 -                try {
   5.182 -                  Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
   5.183 -                  chunks.foreach(_.write(stream))
   5.184 -                  stream.flush
   5.185 -                  true
   5.186 -                }
   5.187 -                catch { case e: IOException => system_output(name + ": " + e.getMessage); false }
   5.188 -            },
   5.189 -          finish = { case () => stream.close; system_output(name + " terminated") }
   5.190 -        )
   5.191 -      )
   5.192 -  }
   5.193 -
   5.194 -
   5.195 -  /* physical output */
   5.196 +}
   5.197  
   5.198 -  private def physical_output(err: Boolean): Thread =
   5.199 -  {
   5.200 -    val (name, reader, markup) =
   5.201 -      if (err) ("standard_error", system_process.stderr, Markup.STDERR)
   5.202 -      else ("standard_output", system_process.stdout, Markup.STDOUT)
   5.203 -
   5.204 -    Simple_Thread.fork(name) {
   5.205 -      try {
   5.206 -        var result = new StringBuilder(100)
   5.207 -        var finished = false
   5.208 -        while (!finished) {
   5.209 -          //{{{
   5.210 -          var c = -1
   5.211 -          var done = false
   5.212 -          while (!done && (result.length == 0 || reader.ready)) {
   5.213 -            c = reader.read
   5.214 -            if (c >= 0) result.append(c.asInstanceOf[Char])
   5.215 -            else done = true
   5.216 -          }
   5.217 -          if (result.length > 0) {
   5.218 -            output(markup, Nil, List(XML.Text(decode(result.toString))))
   5.219 -            result.length = 0
   5.220 -          }
   5.221 -          else {
   5.222 -            reader.close
   5.223 -            finished = true
   5.224 -          }
   5.225 -          //}}}
   5.226 -        }
   5.227 -      }
   5.228 -      catch { case e: IOException => system_output(name + ": " + e.getMessage) }
   5.229 -      system_output(name + " terminated")
   5.230 -    }
   5.231 -  }
   5.232 -
   5.233 -
   5.234 -  /* message output */
   5.235 -
   5.236 -  private def message_output(stream: InputStream): Thread =
   5.237 -  {
   5.238 -    class EOF extends Exception
   5.239 -    class Protocol_Error(msg: String) extends Exception(msg)
   5.240 -
   5.241 -    val name = "message_output"
   5.242 -    Simple_Thread.fork(name) {
   5.243 -      val default_buffer = new Array[Byte](65536)
   5.244 -      var c = -1
   5.245 -
   5.246 -      def read_int(): Int =
   5.247 -      //{{{
   5.248 -      {
   5.249 -        var n = 0
   5.250 -        c = stream.read
   5.251 -        if (c == -1) throw new EOF
   5.252 -        while (48 <= c && c <= 57) {
   5.253 -          n = 10 * n + (c - 48)
   5.254 -          c = stream.read
   5.255 -        }
   5.256 -        if (c != 10)
   5.257 -          throw new Protocol_Error("malformed header: expected integer followed by newline")
   5.258 -        else n
   5.259 -      }
   5.260 -      //}}}
   5.261 -
   5.262 -      def read_chunk_bytes(): (Array[Byte], Int) =
   5.263 -      //{{{
   5.264 -      {
   5.265 -        val n = read_int()
   5.266 -        val buf =
   5.267 -          if (n <= default_buffer.size) default_buffer
   5.268 -          else new Array[Byte](n)
   5.269 -
   5.270 -        var i = 0
   5.271 -        var m = 0
   5.272 -        do {
   5.273 -          m = stream.read(buf, i, n - i)
   5.274 -          if (m != -1) i += m
   5.275 -        }
   5.276 -        while (m != -1 && n > i)
   5.277 -
   5.278 -        if (i != n)
   5.279 -          throw new Protocol_Error("bad chunk (unexpected EOF after " + i + " of " + n + " bytes)")
   5.280 -
   5.281 -        (buf, n)
   5.282 -      }
   5.283 -      //}}}
   5.284 -
   5.285 -      def read_chunk(): XML.Body =
   5.286 -      {
   5.287 -        val (buf, n) = read_chunk_bytes()
   5.288 -        YXML.parse_body_failsafe(UTF8.decode_chars(decode, buf, 0, n))
   5.289 -      }
   5.290 -
   5.291 -      try {
   5.292 -        do {
   5.293 -          try {
   5.294 -            val header = read_chunk()
   5.295 -            header match {
   5.296 -              case List(XML.Elem(Markup(name, props), Nil)) =>
   5.297 -                val kind = name.intern
   5.298 -                if (kind == Markup.PROTOCOL) {
   5.299 -                  val (buf, n) = read_chunk_bytes()
   5.300 -                  protocol_output(props, Bytes(buf, 0, n))
   5.301 -                }
   5.302 -                else {
   5.303 -                  val body = read_chunk()
   5.304 -                  output(kind, props, body)
   5.305 -                }
   5.306 -              case _ =>
   5.307 -                read_chunk()
   5.308 -                throw new Protocol_Error("bad header: " + header.toString)
   5.309 -            }
   5.310 -          }
   5.311 -          catch { case _: EOF => }
   5.312 -        }
   5.313 -        while (c != -1)
   5.314 -      }
   5.315 -      catch {
   5.316 -        case e: IOException => system_output("Cannot read message:\n" + e.getMessage)
   5.317 -        case e: Protocol_Error => system_output("Malformed message:\n" + e.getMessage)
   5.318 -      }
   5.319 -      stream.close
   5.320 -
   5.321 -      system_output(name + " terminated")
   5.322 -    }
   5.323 -  }
   5.324 -
   5.325 -
   5.326 -
   5.327 -  /** protocol commands **/
   5.328 -
   5.329 -  def protocol_command_bytes(name: String, args: Bytes*): Unit =
   5.330 -    command_input match {
   5.331 -      case Some(thread) => thread.send(Bytes(name) :: args.toList)
   5.332 -      case None => error("Uninitialized command input thread")
   5.333 -    }
   5.334 -
   5.335 -  def protocol_command(name: String, args: String*)
   5.336 -  {
   5.337 -    receiver(new Prover.Input(name, args.toList))
   5.338 -    protocol_command_bytes(name, args.map(Bytes(_)): _*)
   5.339 -  }
   5.340 -}