explicit message_output thread, with flushing after timeout, ensure atomic user-operations without the danger of IO Interrupt;
eliminated auto_flush threads -- use plain line buffering for stdout/stderr;
tuned;
--- a/src/Pure/System/isabelle_process.ML Mon Oct 25 21:23:09 2010 +0200
+++ b/src/Pure/System/isabelle_process.ML Mon Oct 25 22:47:02 2010 +0200
@@ -57,42 +57,33 @@
end;
-(* message markup *)
+(* message channels *)
local
-fun chunk s = string_of_int (size s) ^ "\n" ^ s;
+fun chunk s = [string_of_int (size s), "\n", s];
fun message _ _ _ "" = ()
- | message out_stream ch raw_props body =
+ | message mbox ch raw_props body =
let
val robust_props = map (pairself YXML.escape_controls) raw_props;
val header = YXML.string_of (XML.Elem ((ch, robust_props), []));
- in TextIO.output (out_stream, chunk header ^ chunk body) (*atomic output!*) end;
+ in Mailbox.send mbox (chunk header @ chunk body) end;
-in
-
-fun standard_message out_stream with_serial ch body =
- message out_stream ch
+fun standard_message mbox with_serial ch body =
+ message mbox ch
((if with_serial then cons (Markup.serialN, serial_string ()) else I)
(Position.properties_of (Position.thread_data ()))) body;
-fun init_message out_stream =
- message out_stream "A" [] (Session.welcome ());
-
-end;
-
-
-(* channels *)
-
-local
-
-fun auto_flush stream =
+fun message_output mbox out_stream =
let
- val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream stream, IO.BLOCK_BUF);
- fun loop () =
- (OS.Process.sleep (Time.fromMilliseconds 20); try TextIO.flushOut stream; loop ());
- in loop end;
+ fun loop receive =
+ (case receive mbox of
+ SOME msg =>
+ (List.app (fn s => TextIO.output (out_stream, s)) msg;
+ loop (Mailbox.receive_timeout (Time.fromMilliseconds 20)))
+ | NONE => (try TextIO.flushOut out_stream; loop (SOME o Mailbox.receive)));
+ in fn () => loop (SOME o Mailbox.receive) end;
in
@@ -102,19 +93,22 @@
val in_stream = TextIO.openIn in_fifo;
val out_stream = TextIO.openOut out_fifo;
- val _ = Simple_Thread.fork false (auto_flush out_stream);
- val _ = Simple_Thread.fork false (auto_flush TextIO.stdOut);
- val _ = Simple_Thread.fork false (auto_flush TextIO.stdErr);
+ val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF);
+ val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF);
+
+ val mbox = Mailbox.create () : string list Mailbox.T;
+ val _ = Simple_Thread.fork false (message_output mbox out_stream);
in
- Output.Private_Hooks.status_fn := standard_message out_stream false "B";
- Output.Private_Hooks.report_fn := standard_message out_stream false "C";
- Output.Private_Hooks.writeln_fn := standard_message out_stream true "D";
- Output.Private_Hooks.tracing_fn := standard_message out_stream true "E";
- Output.Private_Hooks.warning_fn := standard_message out_stream true "F";
- Output.Private_Hooks.error_fn := standard_message out_stream true "G";
+ Output.Private_Hooks.status_fn := standard_message mbox false "B";
+ Output.Private_Hooks.report_fn := standard_message mbox false "C";
+ Output.Private_Hooks.writeln_fn := standard_message mbox true "D";
+ Output.Private_Hooks.tracing_fn := standard_message mbox true "E";
+ Output.Private_Hooks.warning_fn := standard_message mbox true "F";
+ Output.Private_Hooks.error_fn := standard_message mbox true "G";
Output.Private_Hooks.urgent_message_fn := ! Output.Private_Hooks.writeln_fn;
Output.Private_Hooks.prompt_fn := ignore;
- (in_stream, out_stream)
+ message mbox "A" [] (Session.welcome ());
+ in_stream
end;
end;
@@ -179,8 +173,7 @@
val _ = Unsynchronized.change print_mode
(fold (update op =) [isabelle_processN, Keyword.keyword_statusN, Pretty.symbolicN]);
- val (in_stream, out_stream) = setup_channels in_fifo out_fifo;
- val _ = init_message out_stream;
+ val in_stream = setup_channels in_fifo out_fifo;
val _ = Keyword.status ();
val _ = Output.status (Markup.markup Markup.ready "process ready");
in loop in_stream end));