equal
deleted
inserted
replaced
50 } |
50 } |
51 |
51 |
52 private def exit_message(rc: Int) |
52 private def exit_message(rc: Int) |
53 { |
53 { |
54 output(Markup.EXIT, Markup.Return_Code(rc), List(XML.Text("Return code: " + rc.toString))) |
54 output(Markup.EXIT, Markup.Return_Code(rc), List(XML.Text("Return code: " + rc.toString))) |
55 } |
|
56 |
|
57 |
|
58 /* command input actor */ |
|
59 |
|
60 @volatile private var command_input: (Thread, Actor) = null |
|
61 |
|
62 private case class Input_Chunks(chunks: List[Bytes]) |
|
63 |
|
64 private case object Close |
|
65 private def close_input() |
|
66 { |
|
67 if (command_input != null && command_input._1.isAlive) { |
|
68 command_input._2 ! Close |
|
69 command_input._1.join |
|
70 } |
|
71 } |
55 } |
72 |
56 |
73 |
57 |
74 |
58 |
75 /** process manager **/ |
59 /** process manager **/ |
124 process_result.join |
108 process_result.join |
125 } |
109 } |
126 else { |
110 else { |
127 val (command_stream, message_stream) = system_channel.rendezvous() |
111 val (command_stream, message_stream) = system_channel.rendezvous() |
128 |
112 |
|
113 command_input_init(command_stream) |
129 val stdout = physical_output(false) |
114 val stdout = physical_output(false) |
130 val stderr = physical_output(true) |
115 val stderr = physical_output(true) |
131 val message = message_output(message_stream) |
116 val message = message_output(message_stream) |
132 |
117 |
133 command_input = input_actor(command_stream) |
|
134 |
|
135 val rc = process_result.join |
118 val rc = process_result.join |
136 system_output("process terminated") |
119 system_output("process terminated") |
137 close_input() |
120 command_input_close() |
138 for (thread <- List(stdout, stderr, message)) thread.join |
121 for (thread <- List(stdout, stderr, message)) thread.join |
139 system_output("process_manager terminated") |
122 system_output("process_manager terminated") |
140 exit_message(rc) |
123 exit_message(rc) |
141 } |
124 } |
142 system_channel.accepted() |
125 system_channel.accepted() |
153 catch { case e: IOException => system_output("Failed to interrupt Isabelle: " + e.getMessage) } |
136 catch { case e: IOException => system_output("Failed to interrupt Isabelle: " + e.getMessage) } |
154 } |
137 } |
155 |
138 |
156 def terminate() |
139 def terminate() |
157 { |
140 { |
158 close_input() |
141 command_input_close() |
159 system_output("Terminating Isabelle process") |
142 system_output("Terminating Isabelle process") |
160 terminate_process() |
143 terminate_process() |
161 } |
144 } |
162 |
145 |
163 |
146 |
164 |
147 |
165 /** stream actors **/ |
148 /** stream actors **/ |
|
149 |
|
150 /* command input */ |
|
151 |
|
152 private var command_input: Option[Consumer_Thread[List[Bytes]]] = None |
|
153 |
|
154 private def command_input_close(): Unit = command_input.foreach(_.shutdown) |
|
155 |
|
156 private def command_input_init(raw_stream: OutputStream) |
|
157 { |
|
158 val name = "command_input" |
|
159 val stream = new BufferedOutputStream(raw_stream) |
|
160 command_input = |
|
161 Some( |
|
162 Consumer_Thread.fork(name)( |
|
163 consume = |
|
164 { |
|
165 case chunks => |
|
166 try { |
|
167 Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream) |
|
168 chunks.foreach(_.write(stream)) |
|
169 stream.flush |
|
170 true |
|
171 } |
|
172 catch { case e: IOException => system_output(name + ": " + e.getMessage); false } |
|
173 }, |
|
174 finish = { case () => stream.close; system_output(name + " terminated") } |
|
175 ) |
|
176 ) |
|
177 } |
|
178 |
166 |
179 |
167 /* physical output */ |
180 /* physical output */ |
168 |
181 |
169 private def physical_output(err: Boolean): Thread = |
182 private def physical_output(err: Boolean): Thread = |
170 { |
183 { |
190 result.length = 0 |
203 result.length = 0 |
191 } |
204 } |
192 else { |
205 else { |
193 reader.close |
206 reader.close |
194 finished = true |
207 finished = true |
195 } |
|
196 //}}} |
|
197 } |
|
198 } |
|
199 catch { case e: IOException => system_output(name + ": " + e.getMessage) } |
|
200 system_output(name + " terminated") |
|
201 } |
|
202 } |
|
203 |
|
204 |
|
205 /* command input */ |
|
206 |
|
207 private def input_actor(raw_stream: OutputStream): (Thread, Actor) = |
|
208 { |
|
209 val name = "command_input" |
|
210 Simple_Thread.actor(name) { |
|
211 try { |
|
212 val stream = new BufferedOutputStream(raw_stream) |
|
213 var finished = false |
|
214 while (!finished) { |
|
215 //{{{ |
|
216 receive { |
|
217 case Input_Chunks(chunks) => |
|
218 Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream) |
|
219 chunks.foreach(_.write(stream)) |
|
220 stream.flush |
|
221 case Close => |
|
222 stream.close |
|
223 finished = true |
|
224 case bad => System.err.println(name + ": ignoring bad message " + bad) |
|
225 } |
208 } |
226 //}}} |
209 //}}} |
227 } |
210 } |
228 } |
211 } |
229 catch { case e: IOException => system_output(name + ": " + e.getMessage) } |
212 catch { case e: IOException => system_output(name + ": " + e.getMessage) } |
326 |
309 |
327 |
310 |
328 /** protocol commands **/ |
311 /** protocol commands **/ |
329 |
312 |
330 def protocol_command_bytes(name: String, args: Bytes*): Unit = |
313 def protocol_command_bytes(name: String, args: Bytes*): Unit = |
331 command_input._2 ! Input_Chunks(Bytes(name) :: args.toList) |
314 command_input match { |
|
315 case Some(thread) => thread.send(Bytes(name) :: args.toList) |
|
316 case None => error("Uninitialized command input thread") |
|
317 } |
332 |
318 |
333 def protocol_command(name: String, args: String*) |
319 def protocol_command(name: String, args: String*) |
334 { |
320 { |
335 receiver(new Prover.Input(name, args.toList)) |
321 receiver(new Prover.Input(name, args.toList)) |
336 protocol_command_bytes(name, args.map(Bytes(_)): _*) |
322 protocol_command_bytes(name, args.map(Bytes(_)): _*) |