| author | wenzelm |
| Sat, 27 Feb 2021 19:45:33 +0100 | |
| changeset 73319 | a7d9edd2e63b |
| parent 73262 | 71b7a5775342 |
| child 73557 | 225486d9c960 |
| permissions | -rw-r--r-- |
| 52584 | 1 |
(* Title: Pure/System/message_channel.ML |
2 |
Author: Makarius |
|
3 |
||
4 |
Preferably asynchronous channel for Isabelle messages. |
|
5 |
*) |
|
6 |
||
7 |
(* Interface of the message channel:
   - message: opaque message value, built by "message" from a name,
     properties and an XML body;
   - T: opaque channel value, created by "make" from a binary output stream;
   - send: hand a message over to a channel;
   - shutdown: terminate a channel. *)
signature MESSAGE_CHANNEL = |
|
8 |
sig |
|
|
52800
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
9 |
type message |
|
70995
2c17fa0f5187
more direct output of XML material -- bypass Buffer.T;
wenzelm
parents:
69451
diff
changeset
|
10 |
val message: string -> Properties.T -> XML.body -> message |
| 52584 | 11 |
type T |
|
52800
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
12 |
val send: T -> message -> unit |
| 52584 | 13 |
val shutdown: T -> unit |
| 69449 | 14 |
val make: BinIO.outstream -> T |
| 52584 | 15 |
end; |
16 |
||
17 |
structure Message_Channel: MESSAGE_CHANNEL = |
|
18 |
struct |
|
19 |
||
|
52800
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
20 |
(* message *) |
|
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
21 |
|
|
73262
71b7a5775342
more reactive protocol messages, e.g. for Scala.function (relevant for Bash.process);
wenzelm
parents:
71692
diff
changeset
|
22 |
(* A message ready for output: "body" is the XML material to be written,
   "flush" requests that the stream is flushed right after writing it. *)
datatype message = Message of {body: XML.body, flush: bool};
|
|
52800
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
23 |
|
|
70995
2c17fa0f5187
more direct output of XML material -- bypass Buffer.T;
wenzelm
parents:
69451
diff
changeset
|
24 |
(* Sum of the sizes of all strings that YXML.traverse visits in the given body. *)
fun body_size body =
  fold (YXML.traverse (fn s => fn total => size s + total)) body 0;
|
2c17fa0f5187
more direct output of XML material -- bypass Buffer.T;
wenzelm
parents:
69451
diff
changeset
|
25 |
|
|
2c17fa0f5187
more direct output of XML material -- bypass Buffer.T;
wenzelm
parents:
69451
diff
changeset
|
26 |
(* Prefix the body with a text node that holds its size (see body_size)
   in decimal notation, terminated by a newline. *)
fun chunk body =
  let
    val size_line = string_of_int (body_size body) ^ "\n";
  in XML.Text size_line :: body end;
|
52800
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
27 |
|
|
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
28 |
(* Build a message from a name, raw properties and an XML body.
   Both components of every property pair are passed through
   YXML.embed_controls. The header is an XML element without children that
   carries the name and these properties. The resulting body is the chunked
   header followed by the chunked body; the flush flag is set exactly when
   the name equals Markup.protocolN. *)
fun message name raw_props body = |
|
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
29 |
let |
|
59058
a78612c67ec0
renamed "pairself" to "apply2", in accordance to @{apply 2};
wenzelm
parents:
58857
diff
changeset
|
30 |
val robust_props = map (apply2 YXML.embed_controls) raw_props; |
|
70995
2c17fa0f5187
more direct output of XML material -- bypass Buffer.T;
wenzelm
parents:
69451
diff
changeset
|
31 |
val header = XML.Elem ((name, robust_props), []); |
|
73262
71b7a5775342
more reactive protocol messages, e.g. for Scala.function (relevant for Bash.process);
wenzelm
parents:
71692
diff
changeset
|
32 |
in Message {body = chunk [header] @ chunk body, flush = name = Markup.protocolN} end;
|
|
52800
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
33 |
|
|
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
34 |
|
|
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
35 |
(* channel *) |
|
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
36 |
|
|
1baa5d19ac44
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
wenzelm
parents:
52584
diff
changeset
|
37 |
(* A channel is represented by two operations: "send" accepts a message,
   "shutdown" terminates the channel. *)
datatype T = Message_Channel of {send: message -> unit, shutdown: unit -> unit};
|
| 52584 | 38 |
|
39 |
(* Select the send operation stored in the channel. *)
fun send (Message_Channel {send = deliver, ...}) = deliver;
|
|
40 |
(* Invoke the shutdown operation stored in the channel. *)
fun shutdown (Message_Channel {shutdown = terminate, ...}) = terminate ();
|
|
41 |
||
|
57417
29fe9bac501b
more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents:
56733
diff
changeset
|
42 |
(* Timeout handed to Mailbox.receive by message_output after writing a
   message that did not request an immediate flush; when that receive yields
   no messages, the stream is flushed. *)
val flush_timeout = SOME (seconds 0.02); |
|
56733
f7700146678d
manager is direct receiver of prover output -- discontinued old performance tuning (329320fc88df, 1baa5d19ac44);
wenzelm
parents:
56333
diff
changeset
|
43 |
|
| 69449 | 44 |
(* Output loop for a mailbox of optional messages, writing to the stream.
   The result is a function that starts the loop with "continue NONE".

   continue timeout: receive from the mailbox with the given timeout. An
   empty result flushes the stream and receives again with timeout NONE;
   otherwise the received entries are handled by "received".

   received timeout entries:
   - a NONE entry flushes the stream and ends the loop (entries after it
     are not processed);
   - a SOME message writes every string of its body to the stream via
     File.output; if its flush flag is set the stream is flushed at once and
     the timeout becomes NONE, otherwise the timeout becomes flush_timeout;
     then the remaining entries are handled;
   - the empty list goes back to "continue" with the current timeout. *)
fun message_output mbox stream = |
| 52584 | 45 |
let |
|
57417
29fe9bac501b
more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents:
56733
diff
changeset
|
46 |
fun continue timeout = |
|
29fe9bac501b
more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents:
56733
diff
changeset
|
47 |
(case Mailbox.receive timeout mbox of |
| 69451 | 48 |
[] => (Byte_Message.flush stream; continue NONE) |
|
57417
29fe9bac501b
more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents:
56733
diff
changeset
|
49 |
| msgs => received timeout msgs) |
| 69451 | 50 |
and received _ (NONE :: _) = Byte_Message.flush stream |
|
73262
71b7a5775342
more reactive protocol messages, e.g. for Scala.function (relevant for Bash.process);
wenzelm
parents:
71692
diff
changeset
|
51 |
| received _ (SOME (Message {body, flush}) :: rest) =
|
|
71b7a5775342
more reactive protocol messages, e.g. for Scala.function (relevant for Bash.process);
wenzelm
parents:
71692
diff
changeset
|
52 |
let |
|
71b7a5775342
more reactive protocol messages, e.g. for Scala.function (relevant for Bash.process);
wenzelm
parents:
71692
diff
changeset
|
53 |
val _ = fold (YXML.traverse (fn s => fn () => File.output stream s)) body (); |
|
71b7a5775342
more reactive protocol messages, e.g. for Scala.function (relevant for Bash.process);
wenzelm
parents:
71692
diff
changeset
|
54 |
val timeout = if flush then (Byte_Message.flush stream; NONE) else flush_timeout; |
|
71b7a5775342
more reactive protocol messages, e.g. for Scala.function (relevant for Bash.process);
wenzelm
parents:
71692
diff
changeset
|
55 |
in received timeout rest end |
|
57417
29fe9bac501b
more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents:
56733
diff
changeset
|
56 |
| received timeout [] = continue timeout; |
|
29fe9bac501b
more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents:
56733
diff
changeset
|
57 |
in fn () => continue NONE end; |
| 52584 | 58 |
|
| 69449 | 59 |
(* Create a channel that writes to the given stream.
   A fresh mailbox is created and a thread named "channel" is forked (with
   interrupts = false) to run the message_output loop on that mailbox and
   stream. The channel's send operation puts "SOME msg" into the mailbox.
   Its shutdown operation puts NONE into the mailbox, waits until the mailbox
   is empty and then joins the thread. *)
fun make stream = |
| 62359 | 60 |
let |
61 |
val mbox = Mailbox.create (); |
|
62 |
val thread = |
|
| 71692 | 63 |
Isabelle_Thread.fork {name = "channel", stack_limit = NONE, interrupts = false}
|
| 69449 | 64 |
(message_output mbox stream); |
| 62359 | 65 |
fun send msg = Mailbox.send mbox (SOME msg); |
66 |
fun shutdown () = |
|
| 71692 | 67 |
(Mailbox.send mbox NONE; Mailbox.await_empty mbox; Isabelle_Thread.join thread); |
| 62359 | 68 |
in Message_Channel {send = send, shutdown = shutdown} end;
|
| 52584 | 69 |
|
70 |
end; |