explicit message_output thread, with flushing after timeout, ensure atomic user-operations without the danger of IO Interrupt;
authorwenzelm
Mon, 25 Oct 2010 22:47:02 +0200
changeset 40134 8baded087d34
parent 40133 b61d52de66f0
child 40148 8728165d366e
explicit message_output thread, with flushing after timeout, ensure atomic user-operations without the danger of IO Interrupt; eliminated auto_flush threads -- use plain line buffering for stdout/stderr; tuned;
src/Pure/System/isabelle_process.ML
--- a/src/Pure/System/isabelle_process.ML	Mon Oct 25 21:23:09 2010 +0200
+++ b/src/Pure/System/isabelle_process.ML	Mon Oct 25 22:47:02 2010 +0200
@@ -57,42 +57,33 @@
 end;
 
 
-(* message markup *)
+(* message channels *)
 
 local
 
-fun chunk s = string_of_int (size s) ^ "\n" ^ s;
+fun chunk s = [string_of_int (size s), "\n", s];
 
 fun message _ _ _ "" = ()
-  | message out_stream ch raw_props body =
+  | message mbox ch raw_props body =
       let
         val robust_props = map (pairself YXML.escape_controls) raw_props;
         val header = YXML.string_of (XML.Elem ((ch, robust_props), []));
-      in TextIO.output (out_stream, chunk header ^ chunk body) (*atomic output!*) end;
+      in Mailbox.send mbox (chunk header @ chunk body) end;
 
-in
-
-fun standard_message out_stream with_serial ch body =
-  message out_stream ch
+fun standard_message mbox with_serial ch body =
+  message mbox ch
     ((if with_serial then cons (Markup.serialN, serial_string ()) else I)
       (Position.properties_of (Position.thread_data ()))) body;
 
-fun init_message out_stream =
-  message out_stream "A" [] (Session.welcome ());
-
-end;
-
-
-(* channels *)
-
-local
-
-fun auto_flush stream =
+fun message_output mbox out_stream =
   let
-    val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream stream, IO.BLOCK_BUF);
-    fun loop () =
-      (OS.Process.sleep (Time.fromMilliseconds 20); try TextIO.flushOut stream; loop ());
-  in loop end;
+    fun loop receive =
+      (case receive mbox of
+        SOME msg =>
+         (List.app (fn s => TextIO.output (out_stream, s)) msg;
+          loop (Mailbox.receive_timeout (Time.fromMilliseconds 20)))
+      | NONE => (try TextIO.flushOut out_stream; loop (SOME o Mailbox.receive)));
+  in fn () => loop (SOME o Mailbox.receive) end;
 
 in
 
@@ -102,19 +93,22 @@
     val in_stream = TextIO.openIn in_fifo;
     val out_stream = TextIO.openOut out_fifo;
 
-    val _ = Simple_Thread.fork false (auto_flush out_stream);
-    val _ = Simple_Thread.fork false (auto_flush TextIO.stdOut);
-    val _ = Simple_Thread.fork false (auto_flush TextIO.stdErr);
+    val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF);
+    val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF);
+
+    val mbox = Mailbox.create () : string list Mailbox.T;
+    val _ = Simple_Thread.fork false (message_output mbox out_stream);
   in
-    Output.Private_Hooks.status_fn := standard_message out_stream false "B";
-    Output.Private_Hooks.report_fn := standard_message out_stream false "C";
-    Output.Private_Hooks.writeln_fn := standard_message out_stream true "D";
-    Output.Private_Hooks.tracing_fn := standard_message out_stream true "E";
-    Output.Private_Hooks.warning_fn := standard_message out_stream true "F";
-    Output.Private_Hooks.error_fn := standard_message out_stream true "G";
+    Output.Private_Hooks.status_fn := standard_message mbox false "B";
+    Output.Private_Hooks.report_fn := standard_message mbox false "C";
+    Output.Private_Hooks.writeln_fn := standard_message mbox true "D";
+    Output.Private_Hooks.tracing_fn := standard_message mbox true "E";
+    Output.Private_Hooks.warning_fn := standard_message mbox true "F";
+    Output.Private_Hooks.error_fn := standard_message mbox true "G";
     Output.Private_Hooks.urgent_message_fn := ! Output.Private_Hooks.writeln_fn;
     Output.Private_Hooks.prompt_fn := ignore;
-    (in_stream, out_stream)
+    message mbox "A" [] (Session.welcome ());
+    in_stream
   end;
 
 end;
@@ -179,8 +173,7 @@
     val _ = Unsynchronized.change print_mode
       (fold (update op =) [isabelle_processN, Keyword.keyword_statusN, Pretty.symbolicN]);
 
-    val (in_stream, out_stream) = setup_channels in_fifo out_fifo;
-    val _ = init_message out_stream;
+    val in_stream = setup_channels in_fifo out_fifo;
     val _ = Keyword.status ();
     val _ = Output.status (Markup.markup Markup.ready "process ready");
   in loop in_stream end));