--- a/src/Pure/Concurrent/mailbox.ML Wed Jul 10 22:04:57 2013 +0200
+++ b/src/Pure/Concurrent/mailbox.ML Wed Jul 10 22:56:48 2013 +0200
@@ -12,6 +12,7 @@
val send: 'a T -> 'a -> unit
val receive: 'a T -> 'a
val receive_timeout: Time.time -> 'a T -> 'a option
+ val await_empty: 'a T -> unit
end;
structure Mailbox: MAILBOX =
@@ -31,4 +32,8 @@
Synchronized.timed_access mailbox
(fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue);
+fun await_empty (Mailbox mailbox) =
+ Synchronized.guarded_access mailbox
+ (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE);
+
end;
--- a/src/Pure/Concurrent/simple_thread.ML Wed Jul 10 22:04:57 2013 +0200
+++ b/src/Pure/Concurrent/simple_thread.ML Wed Jul 10 22:56:48 2013 +0200
@@ -9,6 +9,7 @@
val is_self: Thread.thread -> bool
val attributes: bool -> Thread.threadAttribute list
val fork: bool -> (unit -> unit) -> Thread.thread
+ val join: Thread.thread -> unit
val interrupt_unsynchronized: Thread.thread -> unit
val synchronized: string -> Mutex.mutex -> (unit -> 'a) -> 'a
end;
@@ -27,6 +28,10 @@
body () handle exn => if Exn.is_interrupt exn then () else reraise exn),
attributes interrupts);
+fun join thread =
+ while Thread.isActive thread
+ do OS.Process.sleep (seconds 0.1);
+
fun interrupt_unsynchronized thread = Thread.interrupt thread handle Thread _ => ();
--- a/src/Pure/System/isabelle_process.ML Wed Jul 10 22:04:57 2013 +0200
+++ b/src/Pure/System/isabelle_process.ML Wed Jul 10 22:56:48 2013 +0200
@@ -106,31 +106,43 @@
fun flush channel = ignore (try System_Channel.flush channel);
datatype target =
- Channel of System_Channel.T |
- Mailbox of (string list * bool) Mailbox.T;
+ Sync of {send: string list -> unit} |
+ Async of {send: string list -> bool -> unit, shutdown: unit -> unit};
fun message do_flush target name raw_props body =
let
val robust_props = map (pairself YXML.embed_controls) raw_props;
val header = YXML.string_of (XML.Elem ((name, robust_props), []));
val msg = chunk header @ chunk body;
- in
- (case target of
- Channel channel => (List.app (fn s => System_Channel.output channel s) msg; flush channel)
- | Mailbox mbox => Mailbox.send mbox (msg, do_flush))
- end;
+ in (case target of Sync {send} => send msg | Async {send, ...} => send msg do_flush) end;
fun message_output mbox channel =
let
fun loop receive =
(case receive mbox of
- SOME (msg, do_flush) =>
+ SOME NONE => flush channel
+ | SOME (SOME (msg, do_flush)) =>
(List.app (fn s => System_Channel.output channel s) msg;
if do_flush then flush channel else ();
loop (Mailbox.receive_timeout (seconds 0.02)))
| NONE => (flush channel; loop (SOME o Mailbox.receive)));
in fn () => loop (SOME o Mailbox.receive) end;
+fun make_target channel =
+ if Multithreading.available then
+ let
+ val mbox = Mailbox.create ();
+ val thread = Simple_Thread.fork false (message_output mbox channel);
+ in
+ Async {
+ send = fn msg => fn do_flush => Mailbox.send mbox (SOME (msg, do_flush)),
+ shutdown = fn () =>
+ (Mailbox.send mbox NONE; Mailbox.await_empty mbox; Simple_Thread.join thread)
+ }
+ end
+ else
+ Sync {send = fn msg => (List.app (fn s => System_Channel.output channel s) msg; flush channel)};
+
in
fun init_channels channel =
@@ -138,13 +150,7 @@
val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF);
val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF);
- val target =
- if Multithreading.available then
- let
- val mbox = Mailbox.create ();
- val _ = Simple_Thread.fork false (message_output mbox channel);
- in Mailbox mbox end
- else Channel channel;
+ val target = make_target channel;
fun standard_message opt_serial name body =
if body = "" then ()
@@ -167,7 +173,8 @@
Output.Private_Hooks.protocol_message_fn := message true target Markup.protocolN;
Output.Private_Hooks.urgent_message_fn := ! Output.Private_Hooks.writeln_fn;
Output.Private_Hooks.prompt_fn := ignore;
- message true target Markup.initN [] (Session.welcome ())
+ message true target Markup.initN [] (Session.welcome ());
+ (case target of Async {shutdown, ...} => shutdown | _ => fn () => ())
end;
end;
@@ -235,9 +242,10 @@
(fn mode => (mode @ default_modes1) |> fold (update op =) default_modes2);
val channel = rendezvous ();
- val _ = init_channels channel;
+ val shutdown_channels = init_channels channel;
val _ = Session.init_protocol_handlers ();
- in loop channel end);
+ val _ = loop channel;
+ in shutdown_channels () end);
fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2);
fun init_socket name = init (fn () => System_Channel.socket_rendezvous name);