src/Pure/System/isabelle_process.scala
changeset 56700 c84bf6f63dfe
parent 56697 76b38be47feb
child 56703 2d0ca179e749
equal deleted inserted replaced
56699:60ad80f5cb62 56700:c84bf6f63dfe
    50   }
    50   }
    51 
    51 
    52   private def exit_message(rc: Int)
    52   private def exit_message(rc: Int)
    53   {
    53   {
    54     output(Markup.EXIT, Markup.Return_Code(rc), List(XML.Text("Return code: " + rc.toString)))
    54     output(Markup.EXIT, Markup.Return_Code(rc), List(XML.Text("Return code: " + rc.toString)))
    55   }
       
    56 
       
    57 
       
    58   /* command input actor */
       
    59 
       
    60   @volatile private var command_input: (Thread, Actor) = null
       
    61 
       
    62   private case class Input_Chunks(chunks: List[Bytes])
       
    63 
       
    64   private case object Close
       
    65   private def close_input()
       
    66   {
       
    67     if (command_input != null && command_input._1.isAlive) {
       
    68       command_input._2 ! Close
       
    69       command_input._1.join
       
    70     }
       
    71   }
    55   }
    72 
    56 
    73 
    57 
    74 
    58 
    75   /** process manager **/
    59   /** process manager **/
   124       process_result.join
   108       process_result.join
   125     }
   109     }
   126     else {
   110     else {
   127       val (command_stream, message_stream) = system_channel.rendezvous()
   111       val (command_stream, message_stream) = system_channel.rendezvous()
   128 
   112 
       
   113       command_input_init(command_stream)
   129       val stdout = physical_output(false)
   114       val stdout = physical_output(false)
   130       val stderr = physical_output(true)
   115       val stderr = physical_output(true)
   131       val message = message_output(message_stream)
   116       val message = message_output(message_stream)
   132 
   117 
   133       command_input = input_actor(command_stream)
       
   134 
       
   135       val rc = process_result.join
   118       val rc = process_result.join
   136       system_output("process terminated")
   119       system_output("process terminated")
   137       close_input()
   120       command_input_close()
   138       for (thread <- List(stdout, stderr, message)) thread.join
   121       for (thread <- List(stdout, stderr, message)) thread.join
   139       system_output("process_manager terminated")
   122       system_output("process_manager terminated")
   140       exit_message(rc)
   123       exit_message(rc)
   141     }
   124     }
   142     system_channel.accepted()
   125     system_channel.accepted()
   153     catch { case e: IOException => system_output("Failed to interrupt Isabelle: " + e.getMessage) }
   136     catch { case e: IOException => system_output("Failed to interrupt Isabelle: " + e.getMessage) }
   154   }
   137   }
   155 
   138 
   156   def terminate()
   139   def terminate()
   157   {
   140   {
   158     close_input()
   141     command_input_close()
   159     system_output("Terminating Isabelle process")
   142     system_output("Terminating Isabelle process")
   160     terminate_process()
   143     terminate_process()
   161   }
   144   }
   162 
   145 
   163 
   146 
   164 
   147 
   165   /** stream actors **/
   148   /** stream actors **/
       
   149 
       
   150   /* command input */
       
   151 
       
   152   private var command_input: Option[Consumer_Thread[List[Bytes]]] = None
       
   153 
       
   154   private def command_input_close(): Unit = command_input.foreach(_.shutdown)
       
   155 
       
   156   private def command_input_init(raw_stream: OutputStream)
       
   157   {
       
   158     val name = "command_input"
       
   159     val stream = new BufferedOutputStream(raw_stream)
       
   160     command_input =
       
   161       Some(
       
   162         Consumer_Thread.fork(name)(
       
   163           consume =
       
   164             {
       
   165               case chunks =>
       
   166                 try {
       
   167                   Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
       
   168                   chunks.foreach(_.write(stream))
       
   169                   stream.flush
       
   170                   true
       
   171                 }
       
   172                 catch { case e: IOException => system_output(name + ": " + e.getMessage); false }
       
   173             },
       
   174           finish = { case () => stream.close; system_output(name + " terminated") }
       
   175         )
       
   176       )
       
   177   }
       
   178 
   166 
   179 
   167   /* physical output */
   180   /* physical output */
   168 
   181 
   169   private def physical_output(err: Boolean): Thread =
   182   private def physical_output(err: Boolean): Thread =
   170   {
   183   {
   190             result.length = 0
   203             result.length = 0
   191           }
   204           }
   192           else {
   205           else {
   193             reader.close
   206             reader.close
   194             finished = true
   207             finished = true
   195           }
       
   196           //}}}
       
   197         }
       
   198       }
       
   199       catch { case e: IOException => system_output(name + ": " + e.getMessage) }
       
   200       system_output(name + " terminated")
       
   201     }
       
   202   }
       
   203 
       
   204 
       
   205   /* command input */
       
   206 
       
   207   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
       
   208   {
       
   209     val name = "command_input"
       
   210     Simple_Thread.actor(name) {
       
   211       try {
       
   212         val stream = new BufferedOutputStream(raw_stream)
       
   213         var finished = false
       
   214         while (!finished) {
       
   215           //{{{
       
   216           receive {
       
   217             case Input_Chunks(chunks) =>
       
   218               Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
       
   219               chunks.foreach(_.write(stream))
       
   220               stream.flush
       
   221             case Close =>
       
   222               stream.close
       
   223               finished = true
       
   224             case bad => System.err.println(name + ": ignoring bad message " + bad)
       
   225           }
   208           }
   226           //}}}
   209           //}}}
   227         }
   210         }
   228       }
   211       }
   229       catch { case e: IOException => system_output(name + ": " + e.getMessage) }
   212       catch { case e: IOException => system_output(name + ": " + e.getMessage) }
   326 
   309 
   327 
   310 
   328   /** protocol commands **/
   311   /** protocol commands **/
   329 
   312 
   330   def protocol_command_bytes(name: String, args: Bytes*): Unit =
   313   def protocol_command_bytes(name: String, args: Bytes*): Unit =
   331     command_input._2 ! Input_Chunks(Bytes(name) :: args.toList)
   314     command_input match {
       
   315       case Some(thread) => thread.send(Bytes(name) :: args.toList)
       
   316       case None => error("Uninitialized command input thread")
       
   317     }
   332 
   318 
   333   def protocol_command(name: String, args: String*)
   319   def protocol_command(name: String, args: String*)
   334   {
   320   {
   335     receiver(new Prover.Input(name, args.toList))
   321     receiver(new Prover.Input(name, args.toList))
   336     protocol_command_bytes(name, args.map(Bytes(_)): _*)
   322     protocol_command_bytes(name, args.map(Bytes(_)): _*)