180 val all_messages = new Session.Outlet[Prover.Message](dispatcher) // potential bottle-neck |
180 val all_messages = new Session.Outlet[Prover.Message](dispatcher) // potential bottle-neck |
181 val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher) |
181 val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher) |
182 |
182 |
183 |
183 |
184 |
184 |
185 /** buffered changes: to be dispatched to clients **/ |
|
186 |
|
187 private case class Received_Change(assignment: Boolean, commands: List[Command]) |
|
188 |
|
189 private val change_buffer: Consumer_Thread[Received_Change] = |
|
190 { |
|
191 object changed |
|
192 { |
|
193 private var assignment: Boolean = false |
|
194 private var nodes: Set[Document.Node.Name] = Set.empty |
|
195 private var commands: Set[Command] = Set.empty |
|
196 |
|
197 def flush(): Unit = synchronized { |
|
198 if (assignment || !nodes.isEmpty || !commands.isEmpty) |
|
199 commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) |
|
200 assignment = false |
|
201 nodes = Set.empty |
|
202 commands = Set.empty |
|
203 } |
|
204 |
|
205 def invoke(change: Received_Change): Unit = synchronized { |
|
206 assignment |= change.assignment |
|
207 for (command <- change.commands) { |
|
208 nodes += command.node_name |
|
209 commands += command |
|
210 } |
|
211 } |
|
212 } |
|
213 |
|
214 val timer = new Timer("change_buffer", true) |
|
215 timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms) |
|
216 |
|
217 Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)( |
|
218 consume = { case change => changed.invoke(change); true }, |
|
219 finish = () => { timer.cancel(); changed.flush() } |
|
220 ) |
|
221 } |
|
222 |
|
223 |
|
224 |
|
225 /** pipelined change parsing **/ |
185 /** pipelined change parsing **/ |
226 |
186 |
227 private case class Text_Edits( |
187 private case class Text_Edits( |
228 previous: Future[Document.Version], |
188 previous: Future[Document.Version], |
229 doc_blobs: Document.Blobs, |
189 doc_blobs: Document.Blobs, |
301 private case object Stop |
261 private case object Stop |
302 private case class Cancel_Exec(exec_id: Document_ID.Exec) |
262 private case class Cancel_Exec(exec_id: Document_ID.Exec) |
303 private case class Protocol_Command(name: String, args: List[String]) |
263 private case class Protocol_Command(name: String, args: List[String]) |
304 private case class Messages(msgs: List[Prover.Message]) |
264 private case class Messages(msgs: List[Prover.Message]) |
305 private case class Update_Options(options: Options) |
265 private case class Update_Options(options: Options) |
|
266 |
|
267 |
|
268 /* buffered changes */ |
|
269 |
|
270 private object change_buffer |
|
271 { |
|
272 private var assignment: Boolean = false |
|
273 private var nodes: Set[Document.Node.Name] = Set.empty |
|
274 private var commands: Set[Command] = Set.empty |
|
275 |
|
276 def flush(): Unit = synchronized { |
|
277 if (assignment || !nodes.isEmpty || !commands.isEmpty) |
|
278 commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) |
|
279 assignment = false |
|
280 nodes = Set.empty |
|
281 commands = Set.empty |
|
282 } |
|
283 |
|
284 def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized { |
|
285 assignment |= assign |
|
286 for (command <- cmds) { |
|
287 nodes += command.node_name |
|
288 commands += command |
|
289 } |
|
290 } |
|
291 |
|
292 private val timer = new Timer("change_buffer", true) |
|
293 timer.schedule(new TimerTask { def run = flush() }, output_delay.ms, output_delay.ms) |
|
294 |
|
295 def shutdown() |
|
296 { |
|
297 timer.cancel() |
|
298 flush() |
|
299 } |
|
300 } |
306 |
301 |
307 |
302 |
308 /* buffered prover messages */ |
303 /* buffered prover messages */ |
309 |
304 |
310 private object receiver |
305 private object receiver |
441 |
436 |
442 def accumulate(state_id: Document_ID.Generic, message: XML.Elem) |
437 def accumulate(state_id: Document_ID.Generic, message: XML.Elem) |
443 { |
438 { |
444 try { |
439 try { |
445 val st = global_state.change_result(_.accumulate(state_id, message)) |
440 val st = global_state.change_result(_.accumulate(state_id, message)) |
446 change_buffer.send(Received_Change(false, List(st.command))) |
441 change_buffer.invoke(false, List(st.command)) |
447 } |
442 } |
448 catch { |
443 catch { |
449 case _: Document.State.Fail => bad_output() |
444 case _: Document.State.Fail => bad_output() |
450 } |
445 } |
451 } |
446 } |
465 case Markup.Assign_Update => |
460 case Markup.Assign_Update => |
466 msg.text match { |
461 msg.text match { |
467 case Protocol.Assign_Update(id, update) => |
462 case Protocol.Assign_Update(id, update) => |
468 try { |
463 try { |
469 val cmds = global_state.change_result(_.assign(id, update)) |
464 val cmds = global_state.change_result(_.assign(id, update)) |
470 change_buffer.send(Received_Change(true, cmds)) |
465 change_buffer.invoke(true, cmds) |
471 } |
466 } |
472 catch { case _: Document.State.Fail => bad_output() } |
467 catch { case _: Document.State.Fail => bad_output() } |
473 postponed_changes.flush() |
468 postponed_changes.flush() |
474 case _ => bad_output() |
469 case _ => bad_output() |
475 } |
470 } |