# HG changeset patch # User wenzelm # Date 1375212157 -7200 # Node ID 1baa5d19ac44580184b96b5a22d6e45ae8565f19 # Parent 6a4498b048b7a51f2e286e8df18d3719f075559c less aggressive flushing: cope with massive amounts of protocol messages, e.g. from threads_trace; propagate flush from ML to Scala via special protocol message; more formal type Message_Channel.message; diff -r 6a4498b048b7 -r 1baa5d19ac44 src/Pure/PIDE/markup.ML --- a/src/Pure/PIDE/markup.ML Tue Jul 30 19:53:06 2013 +0200 +++ b/src/Pure/PIDE/markup.ML Tue Jul 30 21:22:37 2013 +0200 @@ -142,6 +142,7 @@ val padding_command: Properties.entry val dialogN: string val dialog: serial -> string -> T val functionN: string + val flush: Properties.T val assign_update: Properties.T val removed_versions: Properties.T val protocol_handler: string -> Properties.T @@ -463,6 +464,8 @@ val functionN = "function" +val flush = [(functionN, "flush")]; + val assign_update = [(functionN, "assign_update")]; val removed_versions = [(functionN, "removed_versions")]; diff -r 6a4498b048b7 -r 1baa5d19ac44 src/Pure/PIDE/markup.scala --- a/src/Pure/PIDE/markup.scala Tue Jul 30 19:53:06 2013 +0200 +++ b/src/Pure/PIDE/markup.scala Tue Jul 30 21:22:37 2013 +0200 @@ -315,6 +315,8 @@ val FUNCTION = "function" val Function = new Properties.String(FUNCTION) + val Flush: Properties.T = List((FUNCTION, "flush")) + val Assign_Update: Properties.T = List((FUNCTION, "assign_update")) val Removed_Versions: Properties.T = List((FUNCTION, "removed_versions")) diff -r 6a4498b048b7 -r 1baa5d19ac44 src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Tue Jul 30 19:53:06 2013 +0200 +++ b/src/Pure/System/isabelle_process.ML Tue Jul 30 21:22:37 2013 +0200 @@ -33,7 +33,8 @@ local val commands = - Synchronized.var "Isabelle_Process.commands" (Symtab.empty: (string list -> unit) Symtab.table); + Synchronized.var "Isabelle_Process.commands" + (Symtab.empty: (string list -> unit) Symtab.table); in @@ -90,20 +91,7 @@ end); -(* message channels *) - -local - -fun chunk s = [string_of_int (size s), "\n", s]; - -fun message do_flush msg_channel name raw_props body = - let - val robust_props = map (pairself YXML.embed_controls) raw_props; - val header = YXML.string_of (XML.Elem ((name, robust_props), [])); - val msg = chunk header @ chunk body; - in Message_Channel.send msg_channel msg do_flush end; - -in +(* output channels *) fun init_channels channel = let @@ -112,10 +100,13 @@ val msg_channel = Message_Channel.make channel; + fun message name props body = + Message_Channel.send msg_channel (Message_Channel.message name props body); + fun standard_message opt_serial name body = if body = "" then () else - message false msg_channel name + message name ((case opt_serial of SOME i => cons (Markup.serialN, Markup.print_int i) | _ => I) (Position.properties_of (Position.thread_data ()))) body; in @@ -130,15 +121,13 @@ (fn s => standard_message (SOME (serial ())) Markup.warningN s); Output.Private_Hooks.error_fn := (fn (i, s) => standard_message (SOME i) Markup.errorN s); - Output.Private_Hooks.protocol_message_fn := message true msg_channel Markup.protocolN; + Output.Private_Hooks.protocol_message_fn := message Markup.protocolN; Output.Private_Hooks.urgent_message_fn := ! Output.Private_Hooks.writeln_fn; Output.Private_Hooks.prompt_fn := ignore; - message true msg_channel Markup.initN [] (Session.welcome ()); + message Markup.initN [] (Session.welcome ()); msg_channel end; -end; - (* protocol loop -- uninterruptible *) diff -r 6a4498b048b7 -r 1baa5d19ac44 src/Pure/System/message_channel.ML --- a/src/Pure/System/message_channel.ML Tue Jul 30 19:53:06 2013 +0200 +++ b/src/Pure/System/message_channel.ML Tue Jul 30 21:22:37 2013 +0200 @@ -6,8 +6,10 @@ signature MESSAGE_CHANNEL = sig + type message + val message: string -> Properties.T -> string -> message type T - val send: T -> string list -> bool -> unit + val send: T -> message -> unit val shutdown: T -> unit val make: System_Channel.T -> T end; @@ -15,25 +17,46 @@ structure Message_Channel: MESSAGE_CHANNEL = struct -datatype T = Message_Channel of - {send: string list -> bool -> unit, - shutdown: unit -> unit}; +(* message *) + +datatype message = Message of string list; + +fun chunk s = [string_of_int (size s), "\n", s]; + +fun message name raw_props body = + let + val robust_props = map (pairself YXML.embed_controls) raw_props; + val header = YXML.string_of (XML.Elem ((name, robust_props), [])); + in Message (chunk header @ chunk body) end; + +fun output_message channel (Message ss) = + List.app (System_Channel.output channel) ss; + + +(* flush *) + +fun flush channel = ignore (try System_Channel.flush channel); + +val flush_message = message Markup.protocolN Markup.flush ""; +fun flush_output channel = (output_message channel flush_message; flush channel); + + +(* channel *) + +datatype T = Message_Channel of {send: message -> unit, shutdown: unit -> unit}; fun send (Message_Channel {send, ...}) = send; fun shutdown (Message_Channel {shutdown, ...}) = shutdown (); -fun flush channel = ignore (try System_Channel.flush channel); - fun message_output mbox channel = let fun loop receive = (case receive mbox of - SOME NONE => flush channel - | SOME (SOME (msg, do_flush)) => - (List.app (fn s => System_Channel.output channel s) msg; - if do_flush then flush channel else (); + SOME NONE => flush_output channel + | SOME (SOME msg) => + (output_message channel msg; loop (Mailbox.receive_timeout (seconds 0.02))) - | NONE => (flush channel; loop (SOME o Mailbox.receive))); + | NONE => (flush_output channel; loop (SOME o Mailbox.receive))); in fn () => loop (SOME o Mailbox.receive) end; fun make channel = @@ -41,14 +64,14 @@ let val mbox = Mailbox.create (); val thread = Simple_Thread.fork false (message_output mbox channel); - fun send msg do_flush = Mailbox.send mbox (SOME (msg, do_flush)); + fun send msg = Mailbox.send mbox (SOME msg); fun shutdown () = (Mailbox.send mbox NONE; Mailbox.await_empty mbox; Simple_Thread.join thread); in Message_Channel {send = send, shutdown = shutdown} end else let - fun send msg = (List.app (fn s => System_Channel.output channel s) msg; flush channel); - in Message_Channel {send = fn msg => fn _ => send msg, shutdown = fn () => ()} end; + fun send msg = (output_message channel msg; flush channel); + in Message_Channel {send = send, shutdown = fn () => ()} end; end; diff -r 6a4498b048b7 -r 1baa5d19ac44 src/Pure/System/session.scala --- a/src/Pure/System/session.scala Tue Jul 30 19:53:06 2013 +0200 +++ b/src/Pure/System/session.scala Tue Jul 30 21:22:37 2013 +0200 @@ -119,7 +119,7 @@ /* tuning parameters */ def output_delay: Time = Time.seconds(0.1) // prover output (markup, common messages) - def message_delay: Time = Time.seconds(0.01) // incoming prover messages + def message_delay: Time = Time.seconds(0.1) // prover input/output messages def prune_delay: Time = Time.seconds(60.0) // prune history -- delete old versions def prune_size: Int = 0 // size of retained history def syslog_limit: Int = 100 @@ -260,7 +260,7 @@ { private var buffer = new mutable.ListBuffer[Isabelle_Process.Message] - def flush(): Unit = synchronized { + private def flush(): Unit = synchronized { if (!buffer.isEmpty) { val msgs = buffer.toList this_actor ! Messages(msgs) @@ -268,17 +268,20 @@ } } def invoke(msg: Isabelle_Process.Message): Unit = synchronized { - buffer += msg msg match { + case _: Isabelle_Process.Input => + buffer += msg case output: Isabelle_Process.Output => - if (output.is_syslog) - syslog >> (queue => - { - val queue1 = queue.enqueue(output.message) - if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1 - }) - if (output.is_protocol) flush() - case _ => + if (output.is_protocol && output.properties == Markup.Flush) flush() + else { + buffer += msg + if (output.is_syslog) + syslog >> (queue => + { + val queue1 = queue.enqueue(output.message) + if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1 + }) + } } }