less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace;
authorwenzelm
Tue, 30 Jul 2013 21:22:37 +0200
changeset 52800 1baa5d19ac44
parent 52799 6a4498b048b7
child 52801 6f88e379aa3e
less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace; propagate flush from ML to Scala via special protocol message; more formal type Message_Channel.message;
src/Pure/PIDE/markup.ML
src/Pure/PIDE/markup.scala
src/Pure/System/isabelle_process.ML
src/Pure/System/message_channel.ML
src/Pure/System/session.scala
--- a/src/Pure/PIDE/markup.ML	Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/PIDE/markup.ML	Tue Jul 30 21:22:37 2013 +0200
@@ -142,6 +142,7 @@
   val padding_command: Properties.entry
   val dialogN: string val dialog: serial -> string -> T
   val functionN: string
+  val flush: Properties.T
   val assign_update: Properties.T
   val removed_versions: Properties.T
   val protocol_handler: string -> Properties.T
@@ -463,6 +464,8 @@
 
 val functionN = "function"
 
+val flush = [(functionN, "flush")];
+
 val assign_update = [(functionN, "assign_update")];
 val removed_versions = [(functionN, "removed_versions")];
 
--- a/src/Pure/PIDE/markup.scala	Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/PIDE/markup.scala	Tue Jul 30 21:22:37 2013 +0200
@@ -315,6 +315,8 @@
   val FUNCTION = "function"
   val Function = new Properties.String(FUNCTION)
 
+  val Flush: Properties.T = List((FUNCTION, "flush"))
+
   val Assign_Update: Properties.T = List((FUNCTION, "assign_update"))
   val Removed_Versions: Properties.T = List((FUNCTION, "removed_versions"))
 
--- a/src/Pure/System/isabelle_process.ML	Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/System/isabelle_process.ML	Tue Jul 30 21:22:37 2013 +0200
@@ -33,7 +33,8 @@
 local
 
 val commands =
-  Synchronized.var "Isabelle_Process.commands" (Symtab.empty: (string list -> unit) Symtab.table);
+  Synchronized.var "Isabelle_Process.commands"
+    (Symtab.empty: (string list -> unit) Symtab.table);
 
 in
 
@@ -90,20 +91,7 @@
       end);
 
 
-(* message channels *)
-
-local
-
-fun chunk s = [string_of_int (size s), "\n", s];
-
-fun message do_flush msg_channel name raw_props body =
-  let
-    val robust_props = map (pairself YXML.embed_controls) raw_props;
-    val header = YXML.string_of (XML.Elem ((name, robust_props), []));
-    val msg = chunk header @ chunk body;
-  in Message_Channel.send msg_channel msg do_flush end;
-
-in
+(* output channels *)
 
 fun init_channels channel =
   let
@@ -112,10 +100,13 @@
 
     val msg_channel = Message_Channel.make channel;
 
+    fun message name props body =
+      Message_Channel.send msg_channel (Message_Channel.message name props body);
+
     fun standard_message opt_serial name body =
       if body = "" then ()
       else
-        message false msg_channel name
+        message name
           ((case opt_serial of SOME i => cons (Markup.serialN, Markup.print_int i) | _ => I)
             (Position.properties_of (Position.thread_data ()))) body;
   in
@@ -130,15 +121,13 @@
       (fn s => standard_message (SOME (serial ())) Markup.warningN s);
     Output.Private_Hooks.error_fn :=
       (fn (i, s) => standard_message (SOME i) Markup.errorN s);
-    Output.Private_Hooks.protocol_message_fn := message true msg_channel Markup.protocolN;
+    Output.Private_Hooks.protocol_message_fn := message Markup.protocolN;
     Output.Private_Hooks.urgent_message_fn := ! Output.Private_Hooks.writeln_fn;
     Output.Private_Hooks.prompt_fn := ignore;
-    message true msg_channel Markup.initN [] (Session.welcome ());
+    message Markup.initN [] (Session.welcome ());
     msg_channel
   end;
 
-end;
-
 
 (* protocol loop -- uninterruptible *)
 
--- a/src/Pure/System/message_channel.ML	Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/System/message_channel.ML	Tue Jul 30 21:22:37 2013 +0200
@@ -6,8 +6,10 @@
 
 signature MESSAGE_CHANNEL =
 sig
+  type message
+  val message: string -> Properties.T -> string -> message
   type T
-  val send: T -> string list -> bool -> unit
+  val send: T -> message -> unit
   val shutdown: T -> unit
   val make: System_Channel.T -> T
 end;
@@ -15,25 +17,46 @@
 structure Message_Channel: MESSAGE_CHANNEL =
 struct
 
-datatype T = Message_Channel of
- {send: string list -> bool -> unit,
-  shutdown: unit -> unit};
+(* message *)
+
+datatype message = Message of string list;
+
+fun chunk s = [string_of_int (size s), "\n", s];
+
+fun message name raw_props body =
+  let
+    val robust_props = map (pairself YXML.embed_controls) raw_props;
+    val header = YXML.string_of (XML.Elem ((name, robust_props), []));
+  in Message (chunk header @ chunk body) end;
+
+fun output_message channel (Message ss) =
+  List.app (System_Channel.output channel) ss;
+
+
+(* flush *)
+
+fun flush channel = ignore (try System_Channel.flush channel);
+
+val flush_message = message Markup.protocolN Markup.flush "";
+fun flush_output channel = (output_message channel flush_message; flush channel);
+
+
+(* channel *)
+
+datatype T = Message_Channel of {send: message -> unit, shutdown: unit -> unit};
 
 fun send (Message_Channel {send, ...}) = send;
 fun shutdown (Message_Channel {shutdown, ...}) = shutdown ();
 
-fun flush channel = ignore (try System_Channel.flush channel);
-
 fun message_output mbox channel =
   let
     fun loop receive =
       (case receive mbox of
-        SOME NONE => flush channel
-      | SOME (SOME (msg, do_flush)) =>
-         (List.app (fn s => System_Channel.output channel s) msg;
-          if do_flush then flush channel else ();
+        SOME NONE => flush_output channel
+      | SOME (SOME msg) =>
+         (output_message channel msg;
           loop (Mailbox.receive_timeout (seconds 0.02)))
-      | NONE => (flush channel; loop (SOME o Mailbox.receive)));
+      | NONE => (flush_output channel; loop (SOME o Mailbox.receive)));
   in fn () => loop (SOME o Mailbox.receive) end;
 
 fun make channel =
@@ -41,14 +64,14 @@
     let
       val mbox = Mailbox.create ();
       val thread = Simple_Thread.fork false (message_output mbox channel);
-      fun send msg do_flush = Mailbox.send mbox (SOME (msg, do_flush));
+      fun send msg = Mailbox.send mbox (SOME msg);
       fun shutdown () =
         (Mailbox.send mbox NONE; Mailbox.await_empty mbox; Simple_Thread.join thread);
     in Message_Channel {send = send, shutdown = shutdown} end
   else
     let
-      fun send msg = (List.app (fn s => System_Channel.output channel s) msg; flush channel);
-    in Message_Channel {send = fn msg => fn _ => send msg, shutdown = fn () => ()} end;
+      fun send msg = (output_message channel msg; flush channel);
+    in Message_Channel {send = send, shutdown = fn () => ()} end;
 
 end;
 
--- a/src/Pure/System/session.scala	Tue Jul 30 19:53:06 2013 +0200
+++ b/src/Pure/System/session.scala	Tue Jul 30 21:22:37 2013 +0200
@@ -119,7 +119,7 @@
   /* tuning parameters */
 
   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
-  def message_delay: Time = Time.seconds(0.01)  // incoming prover messages
+  def message_delay: Time = Time.seconds(0.1)  // prover input/output messages
   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
   def prune_size: Int = 0  // size of retained history
   def syslog_limit: Int = 100
@@ -260,7 +260,7 @@
     {
       private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]
 
-      def flush(): Unit = synchronized {
+      private def flush(): Unit = synchronized {
         if (!buffer.isEmpty) {
           val msgs = buffer.toList
           this_actor ! Messages(msgs)
@@ -268,17 +268,20 @@
         }
       }
       def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
-        buffer += msg
         msg match {
+          case _: Isabelle_Process.Input =>
+            buffer += msg
           case output: Isabelle_Process.Output =>
-            if (output.is_syslog)
-              syslog >> (queue =>
-                {
-                  val queue1 = queue.enqueue(output.message)
-                  if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
-                })
-            if (output.is_protocol) flush()
-          case _ =>
+            if (output.is_protocol && output.properties == Markup.Flush) flush()
+            else {
+              buffer += msg
+              if (output.is_syslog)
+                syslog >> (queue =>
+                  {
+                    val queue1 = queue.enqueue(output.message)
+                    if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
+                  })
+            }
         }
       }