less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
propagate flush from ML to Scala via special protocol message;
more formal type Message_Channel.message;
--- a/src/Pure/PIDE/markup.ML Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/PIDE/markup.ML Tue Jul 30 21:22:37 2013 +0200
@@ -142,6 +142,7 @@
val padding_command: Properties.entry
val dialogN: string val dialog: serial -> string -> T
val functionN: string
+ val flush: Properties.T
val assign_update: Properties.T
val removed_versions: Properties.T
val protocol_handler: string -> Properties.T
@@ -463,6 +464,8 @@
val functionN = "function"
+val flush = [(functionN, "flush")];
+
val assign_update = [(functionN, "assign_update")];
val removed_versions = [(functionN, "removed_versions")];
--- a/src/Pure/PIDE/markup.scala Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/PIDE/markup.scala Tue Jul 30 21:22:37 2013 +0200
@@ -315,6 +315,8 @@
val FUNCTION = "function"
val Function = new Properties.String(FUNCTION)
+ val Flush: Properties.T = List((FUNCTION, "flush"))
+
val Assign_Update: Properties.T = List((FUNCTION, "assign_update"))
val Removed_Versions: Properties.T = List((FUNCTION, "removed_versions"))
--- a/src/Pure/System/isabelle_process.ML Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/System/isabelle_process.ML Tue Jul 30 21:22:37 2013 +0200
@@ -33,7 +33,8 @@
local
val commands =
- Synchronized.var "Isabelle_Process.commands" (Symtab.empty: (string list -> unit) Symtab.table);
+ Synchronized.var "Isabelle_Process.commands"
+ (Symtab.empty: (string list -> unit) Symtab.table);
in
@@ -90,20 +91,7 @@
end);
-(* message channels *)
-
-local
-
-fun chunk s = [string_of_int (size s), "\n", s];
-
-fun message do_flush msg_channel name raw_props body =
- let
- val robust_props = map (pairself YXML.embed_controls) raw_props;
- val header = YXML.string_of (XML.Elem ((name, robust_props), []));
- val msg = chunk header @ chunk body;
- in Message_Channel.send msg_channel msg do_flush end;
-
-in
+(* output channels *)
fun init_channels channel =
let
@@ -112,10 +100,13 @@
val msg_channel = Message_Channel.make channel;
+ fun message name props body =
+ Message_Channel.send msg_channel (Message_Channel.message name props body);
+
fun standard_message opt_serial name body =
if body = "" then ()
else
- message false msg_channel name
+ message name
((case opt_serial of SOME i => cons (Markup.serialN, Markup.print_int i) | _ => I)
(Position.properties_of (Position.thread_data ()))) body;
in
@@ -130,15 +121,13 @@
(fn s => standard_message (SOME (serial ())) Markup.warningN s);
Output.Private_Hooks.error_fn :=
(fn (i, s) => standard_message (SOME i) Markup.errorN s);
- Output.Private_Hooks.protocol_message_fn := message true msg_channel Markup.protocolN;
+ Output.Private_Hooks.protocol_message_fn := message Markup.protocolN;
Output.Private_Hooks.urgent_message_fn := ! Output.Private_Hooks.writeln_fn;
Output.Private_Hooks.prompt_fn := ignore;
- message true msg_channel Markup.initN [] (Session.welcome ());
+ message Markup.initN [] (Session.welcome ());
msg_channel
end;
-end;
-
(* protocol loop -- uninterruptible *)
--- a/src/Pure/System/message_channel.ML Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/System/message_channel.ML Tue Jul 30 21:22:37 2013 +0200
@@ -6,8 +6,10 @@
signature MESSAGE_CHANNEL =
sig
+ type message
+ val message: string -> Properties.T -> string -> message
type T
- val send: T -> string list -> bool -> unit
+ val send: T -> message -> unit
val shutdown: T -> unit
val make: System_Channel.T -> T
end;
@@ -15,25 +17,46 @@
structure Message_Channel: MESSAGE_CHANNEL =
struct
-datatype T = Message_Channel of
- {send: string list -> bool -> unit,
- shutdown: unit -> unit};
+(* message *)
+
+datatype message = Message of string list;
+
+fun chunk s = [string_of_int (size s), "\n", s];
+
+fun message name raw_props body =
+ let
+ val robust_props = map (pairself YXML.embed_controls) raw_props;
+ val header = YXML.string_of (XML.Elem ((name, robust_props), []));
+ in Message (chunk header @ chunk body) end;
+
+fun output_message channel (Message ss) =
+ List.app (System_Channel.output channel) ss;
+
+
+(* flush *)
+
+fun flush channel = ignore (try System_Channel.flush channel);
+
+val flush_message = message Markup.protocolN Markup.flush "";
+fun flush_output channel = (output_message channel flush_message; flush channel);
+
+
+(* channel *)
+
+datatype T = Message_Channel of {send: message -> unit, shutdown: unit -> unit};
fun send (Message_Channel {send, ...}) = send;
fun shutdown (Message_Channel {shutdown, ...}) = shutdown ();
-fun flush channel = ignore (try System_Channel.flush channel);
-
fun message_output mbox channel =
let
fun loop receive =
(case receive mbox of
- SOME NONE => flush channel
- | SOME (SOME (msg, do_flush)) =>
- (List.app (fn s => System_Channel.output channel s) msg;
- if do_flush then flush channel else ();
+ SOME NONE => flush_output channel
+ | SOME (SOME msg) =>
+ (output_message channel msg;
loop (Mailbox.receive_timeout (seconds 0.02)))
- | NONE => (flush channel; loop (SOME o Mailbox.receive)));
+ | NONE => (flush_output channel; loop (SOME o Mailbox.receive)));
in fn () => loop (SOME o Mailbox.receive) end;
fun make channel =
@@ -41,14 +64,14 @@
let
val mbox = Mailbox.create ();
val thread = Simple_Thread.fork false (message_output mbox channel);
- fun send msg do_flush = Mailbox.send mbox (SOME (msg, do_flush));
+ fun send msg = Mailbox.send mbox (SOME msg);
fun shutdown () =
(Mailbox.send mbox NONE; Mailbox.await_empty mbox; Simple_Thread.join thread);
in Message_Channel {send = send, shutdown = shutdown} end
else
let
- fun send msg = (List.app (fn s => System_Channel.output channel s) msg; flush channel);
- in Message_Channel {send = fn msg => fn _ => send msg, shutdown = fn () => ()} end;
+ fun send msg = (output_message channel msg; flush channel);
+ in Message_Channel {send = send, shutdown = fn () => ()} end;
end;
--- a/src/Pure/System/session.scala Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/System/session.scala Tue Jul 30 21:22:37 2013 +0200
@@ -119,7 +119,7 @@
/* tuning parameters */
def output_delay: Time = Time.seconds(0.1) // prover output (markup, common messages)
- def message_delay: Time = Time.seconds(0.01) // incoming prover messages
+ def message_delay: Time = Time.seconds(0.1) // prover input/output messages
def prune_delay: Time = Time.seconds(60.0) // prune history -- delete old versions
def prune_size: Int = 0 // size of retained history
def syslog_limit: Int = 100
@@ -260,7 +260,7 @@
{
private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]
- def flush(): Unit = synchronized {
+ private def flush(): Unit = synchronized {
if (!buffer.isEmpty) {
val msgs = buffer.toList
this_actor ! Messages(msgs)
@@ -268,17 +268,20 @@
}
}
def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
- buffer += msg
msg match {
+ case _: Isabelle_Process.Input =>
+ buffer += msg
case output: Isabelle_Process.Output =>
- if (output.is_syslog)
- syslog >> (queue =>
- {
- val queue1 = queue.enqueue(output.message)
- if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
- })
- if (output.is_protocol) flush()
- case _ =>
+ if (output.is_protocol && output.properties == Markup.Flush) flush()
+ else {
+ buffer += msg
+ if (output.is_syslog)
+ syslog >> (queue =>
+ {
+ val queue1 = queue.enqueue(output.message)
+ if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
+ })
+ }
}
}