14 import scala.collection.immutable.Queue |
14 import scala.collection.immutable.Queue |
15 |
15 |
16 |
16 |
17 object Session |
17 object Session |
18 { |
18 { |
|
19 /* outlets */ |
|
20 |
|
21 object Consumer |
|
22 { |
|
23 def apply[A](name: String)(consume: A => Unit): Consumer[A] = |
|
24 new Consumer[A](name, consume) |
|
25 } |
|
26 final class Consumer[-A] private(val name: String, val consume: A => Unit) |
|
27 |
|
28 class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) |
|
29 { |
|
30 private val consumers = Synchronized(List.empty[Consumer[A]]) |
|
31 |
|
32 def += (c: Consumer[A]) { consumers.change(Library.update(c)) } |
|
33 def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) } |
|
34 |
|
35 def post(a: A) |
|
36 { |
|
37 for (c <- consumers.value.iterator) { |
|
38 dispatcher.send(() => |
|
39 try { c.consume(a) } |
|
40 catch { |
|
41 case exn: Throwable => |
|
42 System.err.println("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn)) |
|
43 }) |
|
44 } |
|
45 } |
|
46 } |
|
47 |
|
48 |
19 /* change */ |
49 /* change */ |
20 |
50 |
21 sealed case class Change( |
51 sealed case class Change( |
22 previous: Document.Version, |
52 previous: Document.Version, |
23 doc_blobs: Document.Blobs, |
53 doc_blobs: Document.Blobs, |
132 def prune_size: Int = 0 // size of retained history |
162 def prune_size: Int = 0 // size of retained history |
133 def syslog_limit: Int = 100 |
163 def syslog_limit: Int = 100 |
134 def reparse_limit: Int = 0 |
164 def reparse_limit: Int = 0 |
135 |
165 |
136 |
166 |
137 /* pervasive event buses */ |
167 /* outlets */ |
138 |
168 |
139 val statistics = new Event_Bus[Session.Statistics] |
169 private val dispatcher = |
140 val global_options = new Event_Bus[Session.Global_Options] |
170 Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true } |
141 val caret_focus = new Event_Bus[Session.Caret_Focus.type] |
171 |
142 val raw_edits = new Event_Bus[Session.Raw_Edits] |
172 val statistics = new Session.Outlet[Session.Statistics](dispatcher) |
143 val commands_changed = new Event_Bus[Session.Commands_Changed] |
173 val global_options = new Session.Outlet[Session.Global_Options](dispatcher) |
144 val phase_changed = new Event_Bus[Session.Phase] |
174 val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher) |
145 val syslog_messages = new Event_Bus[Prover.Output] |
175 val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher) |
146 val raw_output_messages = new Event_Bus[Prover.Output] |
176 val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher) |
147 val all_messages = new Event_Bus[Prover.Message] // potential bottle-neck |
177 val phase_changed = new Session.Outlet[Session.Phase](dispatcher) |
148 val trace_events = new Event_Bus[Simplifier_Trace.Event.type] |
178 val syslog_messages = new Session.Outlet[Prover.Output](dispatcher) |
|
179 val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher) |
|
180 val all_messages = new Session.Outlet[Prover.Message](dispatcher) // potential bottle-neck |
|
181 val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher) |
149 |
182 |
150 |
183 |
151 |
184 |
152 /** buffered changes: to be dispatched to clients **/ |
185 /** buffered changes: to be dispatched to clients **/ |
153 |
186 |
161 private var nodes: Set[Document.Node.Name] = Set.empty |
194 private var nodes: Set[Document.Node.Name] = Set.empty |
162 private var commands: Set[Command] = Set.empty |
195 private var commands: Set[Command] = Set.empty |
163 |
196 |
164 def flush(): Unit = synchronized { |
197 def flush(): Unit = synchronized { |
165 if (assignment || !nodes.isEmpty || !commands.isEmpty) |
198 if (assignment || !nodes.isEmpty || !commands.isEmpty) |
166 commands_changed.event(Session.Commands_Changed(assignment, nodes, commands)) |
199 commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) |
167 assignment = false |
200 assignment = false |
168 nodes = Set.empty |
201 nodes = Set.empty |
169 commands = Set.empty |
202 commands = Set.empty |
170 } |
203 } |
171 |
204 |
221 |
254 |
222 @volatile private var _phase: Session.Phase = Session.Inactive |
255 @volatile private var _phase: Session.Phase = Session.Inactive |
223 private def phase_=(new_phase: Session.Phase) |
256 private def phase_=(new_phase: Session.Phase) |
224 { |
257 { |
225 _phase = new_phase |
258 _phase = new_phase |
226 phase_changed.event(new_phase) |
259 phase_changed.post(new_phase) |
227 } |
260 } |
228 def phase = _phase |
261 def phase = _phase |
229 def is_ready: Boolean = phase == Session.Ready |
262 def is_ready: Boolean = phase == Session.Ready |
230 |
263 |
231 private val global_state = Synchronized(Document.State.init) |
264 private val global_state = Synchronized(Document.State.init) |
347 |
380 |
348 val previous = global_state.value.history.tip.version |
381 val previous = global_state.value.history.tip.version |
349 val version = Future.promise[Document.Version] |
382 val version = Future.promise[Document.Version] |
350 global_state.change(_.continue_history(previous, edits, version)) |
383 global_state.change(_.continue_history(previous, edits, version)) |
351 |
384 |
352 raw_edits.event(Session.Raw_Edits(doc_blobs, edits)) |
385 raw_edits.post(Session.Raw_Edits(doc_blobs, edits)) |
353 change_parser.send(Text_Edits(previous, doc_blobs, edits, version)) |
386 change_parser.send(Text_Edits(previous, doc_blobs, edits, version)) |
354 } |
387 } |
355 //}}} |
388 //}}} |
356 |
389 |
357 |
390 |
456 catch { case _: Document.State.Fail => bad_output() } |
489 catch { case _: Document.State.Fail => bad_output() } |
457 case _ => bad_output() |
490 case _ => bad_output() |
458 } |
491 } |
459 |
492 |
460 case Markup.ML_Statistics(props) => |
493 case Markup.ML_Statistics(props) => |
461 statistics.event(Session.Statistics(props)) |
494 statistics.post(Session.Statistics(props)) |
462 |
495 |
463 case Markup.Task_Statistics(props) => |
496 case Markup.Task_Statistics(props) => |
464 // FIXME |
497 // FIXME |
465 |
498 |
466 case _ => bad_output() |
499 case _ => bad_output() |
510 case Update_Options(options) => |
543 case Update_Options(options) => |
511 if (prover.isDefined && is_ready) { |
544 if (prover.isDefined && is_ready) { |
512 prover.get.options(options) |
545 prover.get.options(options) |
513 handle_raw_edits(Document.Blobs.empty, Nil) |
546 handle_raw_edits(Document.Blobs.empty, Nil) |
514 } |
547 } |
515 global_options.event(Session.Global_Options(options)) |
548 global_options.post(Session.Global_Options(options)) |
516 |
549 |
517 case Cancel_Exec(exec_id) if prover.isDefined => |
550 case Cancel_Exec(exec_id) if prover.isDefined => |
518 prover.get.cancel_exec(exec_id) |
551 prover.get.cancel_exec(exec_id) |
519 |
552 |
520 case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined => |
553 case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined => |
528 prover.get.protocol_command(name, args:_*) |
561 prover.get.protocol_command(name, args:_*) |
529 |
562 |
530 case Messages(msgs) => |
563 case Messages(msgs) => |
531 msgs foreach { |
564 msgs foreach { |
532 case input: Prover.Input => |
565 case input: Prover.Input => |
533 all_messages.event(input) |
566 all_messages.post(input) |
534 |
567 |
535 case output: Prover.Output => |
568 case output: Prover.Output => |
536 if (output.is_stdout || output.is_stderr) raw_output_messages.event(output) |
569 if (output.is_stdout || output.is_stderr) raw_output_messages.post(output) |
537 else handle_output(output) |
570 else handle_output(output) |
538 if (output.is_syslog) syslog_messages.event(output) |
571 if (output.is_syslog) syslog_messages.post(output) |
539 all_messages.event(output) |
572 all_messages.post(output) |
540 } |
573 } |
541 |
574 |
542 case change: Session.Change if prover.isDefined => |
575 case change: Session.Change if prover.isDefined => |
543 if (global_state.value.is_assigned(change.previous)) |
576 if (global_state.value.is_assigned(change.previous)) |
544 handle_change(change) |
577 handle_change(change) |
560 manager.send_wait(Stop) |
593 manager.send_wait(Stop) |
561 receiver.shutdown() |
594 receiver.shutdown() |
562 change_parser.shutdown() |
595 change_parser.shutdown() |
563 change_buffer.shutdown() |
596 change_buffer.shutdown() |
564 manager.shutdown() |
597 manager.shutdown() |
|
598 dispatcher.shutdown() |
565 } |
599 } |
566 |
600 |
567 def protocol_command(name: String, args: String*) |
601 def protocol_command(name: String, args: String*) |
568 { manager.send(Protocol_Command(name, args.toList)) } |
602 { manager.send(Protocol_Command(name, args.toList)) } |
569 |
603 |