clarified command_input: Consumer_Thread;
authorwenzelm
Thu Apr 24 15:02:13 2014 +0200 (2014-04-24)
changeset 56700c84bf6f63dfe
parent 56699 60ad80f5cb62
child 56701 ac5b66fa2a56
clarified command_input: Consumer_Thread;
src/Pure/System/isabelle_process.scala
     1.1 --- a/src/Pure/System/isabelle_process.scala	Thu Apr 24 14:59:46 2014 +0200
     1.2 +++ b/src/Pure/System/isabelle_process.scala	Thu Apr 24 15:02:13 2014 +0200
     1.3 @@ -55,22 +55,6 @@
     1.4    }
     1.5  
     1.6  
     1.7 -  /* command input actor */
     1.8 -
     1.9 -  @volatile private var command_input: (Thread, Actor) = null
    1.10 -
    1.11 -  private case class Input_Chunks(chunks: List[Bytes])
    1.12 -
    1.13 -  private case object Close
    1.14 -  private def close_input()
    1.15 -  {
    1.16 -    if (command_input != null && command_input._1.isAlive) {
    1.17 -      command_input._2 ! Close
    1.18 -      command_input._1.join
    1.19 -    }
    1.20 -  }
    1.21 -
    1.22 -
    1.23  
    1.24    /** process manager **/
    1.25  
    1.26 @@ -126,15 +110,14 @@
    1.27      else {
    1.28        val (command_stream, message_stream) = system_channel.rendezvous()
    1.29  
    1.30 +      command_input_init(command_stream)
    1.31        val stdout = physical_output(false)
    1.32        val stderr = physical_output(true)
    1.33        val message = message_output(message_stream)
    1.34  
    1.35 -      command_input = input_actor(command_stream)
    1.36 -
    1.37        val rc = process_result.join
    1.38        system_output("process terminated")
    1.39 -      close_input()
    1.40 +      command_input_close()
    1.41        for (thread <- List(stdout, stderr, message)) thread.join
    1.42        system_output("process_manager terminated")
    1.43        exit_message(rc)
    1.44 @@ -155,7 +138,7 @@
    1.45  
    1.46    def terminate()
    1.47    {
    1.48 -    close_input()
    1.49 +    command_input_close()
    1.50      system_output("Terminating Isabelle process")
    1.51      terminate_process()
    1.52    }
    1.53 @@ -164,6 +147,36 @@
    1.54  
    1.55    /** stream actors **/
    1.56  
    1.57 +  /* command input */
    1.58 +
    1.59 +  private var command_input: Option[Consumer_Thread[List[Bytes]]] = None
    1.60 +
    1.61 +  private def command_input_close(): Unit = command_input.foreach(_.shutdown)
    1.62 +
    1.63 +  private def command_input_init(raw_stream: OutputStream)
    1.64 +  {
    1.65 +    val name = "command_input"
    1.66 +    val stream = new BufferedOutputStream(raw_stream)
    1.67 +    command_input =
    1.68 +      Some(
    1.69 +        Consumer_Thread.fork(name)(
    1.70 +          consume =
    1.71 +            {
    1.72 +              case chunks =>
    1.73 +                try {
    1.74 +                  Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
    1.75 +                  chunks.foreach(_.write(stream))
    1.76 +                  stream.flush
    1.77 +                  true
    1.78 +                }
    1.79 +                catch { case e: IOException => system_output(name + ": " + e.getMessage); false }
    1.80 +            },
    1.81 +          finish = { case () => stream.close; system_output(name + " terminated") }
    1.82 +        )
    1.83 +      )
    1.84 +  }
    1.85 +
    1.86 +
    1.87    /* physical output */
    1.88  
    1.89    private def physical_output(err: Boolean): Thread =
    1.90 @@ -202,36 +215,6 @@
    1.91    }
    1.92  
    1.93  
    1.94 -  /* command input */
    1.95 -
    1.96 -  private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
    1.97 -  {
    1.98 -    val name = "command_input"
    1.99 -    Simple_Thread.actor(name) {
   1.100 -      try {
   1.101 -        val stream = new BufferedOutputStream(raw_stream)
   1.102 -        var finished = false
   1.103 -        while (!finished) {
   1.104 -          //{{{
   1.105 -          receive {
   1.106 -            case Input_Chunks(chunks) =>
   1.107 -              Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
   1.108 -              chunks.foreach(_.write(stream))
   1.109 -              stream.flush
   1.110 -            case Close =>
   1.111 -              stream.close
   1.112 -              finished = true
   1.113 -            case bad => System.err.println(name + ": ignoring bad message " + bad)
   1.114 -          }
   1.115 -          //}}}
   1.116 -        }
   1.117 -      }
   1.118 -      catch { case e: IOException => system_output(name + ": " + e.getMessage) }
   1.119 -      system_output(name + " terminated")
   1.120 -    }
   1.121 -  }
   1.122 -
   1.123 -
   1.124    /* message output */
   1.125  
   1.126    private def message_output(stream: InputStream): Thread =
   1.127 @@ -328,7 +311,10 @@
   1.128    /** protocol commands **/
   1.129  
   1.130    def protocol_command_bytes(name: String, args: Bytes*): Unit =
   1.131 -    command_input._2 ! Input_Chunks(Bytes(name) :: args.toList)
   1.132 +    command_input match {
   1.133 +      case Some(thread) => thread.send(Bytes(name) :: args.toList)
   1.134 +      case None => error("Uninitialized command input thread")
   1.135 +    }
   1.136  
   1.137    def protocol_command(name: String, args: String*)
   1.138    {