explicit shutdown of message output thread;
authorwenzelm
Wed, 10 Jul 2013 22:56:48 +0200
changeset 52583 0a7240d88e09
parent 52582 31467a4b1466
child 52584 5cad4a5f5615
explicit shutdown of message output thread;
src/Pure/Concurrent/mailbox.ML
src/Pure/Concurrent/simple_thread.ML
src/Pure/System/isabelle_process.ML
--- a/src/Pure/Concurrent/mailbox.ML	Wed Jul 10 22:04:57 2013 +0200
+++ b/src/Pure/Concurrent/mailbox.ML	Wed Jul 10 22:56:48 2013 +0200
@@ -12,6 +12,7 @@
   val send: 'a T -> 'a -> unit
   val receive: 'a T -> 'a
   val receive_timeout: Time.time -> 'a T -> 'a option
+  val await_empty: 'a T -> unit
 end;
 
 structure Mailbox: MAILBOX =
@@ -31,4 +32,8 @@
   Synchronized.timed_access mailbox
     (fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue);
 
+fun await_empty (Mailbox mailbox) =
+  Synchronized.guarded_access mailbox
+    (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE);
+
 end;
--- a/src/Pure/Concurrent/simple_thread.ML	Wed Jul 10 22:04:57 2013 +0200
+++ b/src/Pure/Concurrent/simple_thread.ML	Wed Jul 10 22:56:48 2013 +0200
@@ -9,6 +9,7 @@
   val is_self: Thread.thread -> bool
   val attributes: bool -> Thread.threadAttribute list
   val fork: bool -> (unit -> unit) -> Thread.thread
+  val join: Thread.thread -> unit
   val interrupt_unsynchronized: Thread.thread -> unit
   val synchronized: string -> Mutex.mutex -> (unit -> 'a) -> 'a
 end;
@@ -27,6 +28,10 @@
       body () handle exn => if Exn.is_interrupt exn then () else reraise exn),
     attributes interrupts);
 
+fun join thread =
+  while Thread.isActive thread
+  do OS.Process.sleep (seconds 0.1);
+
 fun interrupt_unsynchronized thread = Thread.interrupt thread handle Thread _ => ();
 
 
--- a/src/Pure/System/isabelle_process.ML	Wed Jul 10 22:04:57 2013 +0200
+++ b/src/Pure/System/isabelle_process.ML	Wed Jul 10 22:56:48 2013 +0200
@@ -106,31 +106,43 @@
 fun flush channel = ignore (try System_Channel.flush channel);
 
 datatype target =
-  Channel of System_Channel.T |
-  Mailbox of (string list * bool) Mailbox.T;
+  Sync of {send: string list -> unit} |
+  Async of {send: string list -> bool -> unit, shutdown: unit -> unit};
 
 fun message do_flush target name raw_props body =
   let
     val robust_props = map (pairself YXML.embed_controls) raw_props;
     val header = YXML.string_of (XML.Elem ((name, robust_props), []));
     val msg = chunk header @ chunk body;
-  in
-    (case target of
-      Channel channel => (List.app (fn s => System_Channel.output channel s) msg; flush channel)
-    | Mailbox mbox => Mailbox.send mbox (msg, do_flush))
-  end;
+  in (case target of Sync {send} => send msg | Async {send, ...} => send msg do_flush) end;
 
 fun message_output mbox channel =
   let
     fun loop receive =
       (case receive mbox of
-        SOME (msg, do_flush) =>
+        SOME NONE => flush channel
+      | SOME (SOME (msg, do_flush)) =>
          (List.app (fn s => System_Channel.output channel s) msg;
           if do_flush then flush channel else ();
           loop (Mailbox.receive_timeout (seconds 0.02)))
       | NONE => (flush channel; loop (SOME o Mailbox.receive)));
   in fn () => loop (SOME o Mailbox.receive) end;
 
+fun make_target channel =
+  if Multithreading.available then
+    let
+      val mbox = Mailbox.create ();
+      val thread = Simple_Thread.fork false (message_output mbox channel);
+    in
+      Async {
+        send = fn msg => fn do_flush => Mailbox.send mbox (SOME (msg, do_flush)),
+        shutdown = fn () =>
+          (Mailbox.send mbox NONE; Mailbox.await_empty mbox; Simple_Thread.join thread)
+      }
+    end
+  else
+    Sync {send = fn msg => (List.app (fn s => System_Channel.output channel s) msg; flush channel)};
+
 in
 
 fun init_channels channel =
@@ -138,13 +150,7 @@
     val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF);
     val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF);
 
-    val target =
-      if Multithreading.available then
-        let
-          val mbox = Mailbox.create ();
-          val _ = Simple_Thread.fork false (message_output mbox channel);
-        in Mailbox mbox end
-      else Channel channel;
+    val target = make_target channel;
 
     fun standard_message opt_serial name body =
       if body = "" then ()
@@ -167,7 +173,8 @@
     Output.Private_Hooks.protocol_message_fn := message true target Markup.protocolN;
     Output.Private_Hooks.urgent_message_fn := ! Output.Private_Hooks.writeln_fn;
     Output.Private_Hooks.prompt_fn := ignore;
-    message true target Markup.initN [] (Session.welcome ())
+    message true target Markup.initN [] (Session.welcome ());
+    (case target of Async {shutdown, ...} => shutdown | _ => fn () => ())
   end;
 
 end;
@@ -235,9 +242,10 @@
         (fn mode => (mode @ default_modes1) |> fold (update op =) default_modes2);
 
     val channel = rendezvous ();
-    val _ = init_channels channel;
+    val shutdown_channels = init_channels channel;
     val _ = Session.init_protocol_handlers ();
-  in loop channel end);
+    val _ = loop channel;
+  in shutdown_channels () end);
 
 fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2);
 fun init_socket name = init (fn () => System_Channel.socket_rendezvous name);