# HG changeset patch # User wenzelm # Date 1373489808 -7200 # Node ID 0a7240d88e0922e57a1983e05a83595cf40d37c8 # Parent 31467a4b14664b40809b17397a5af516854efdf9 explicit shutdown of message output thread; diff -r 31467a4b1466 -r 0a7240d88e09 src/Pure/Concurrent/mailbox.ML --- a/src/Pure/Concurrent/mailbox.ML Wed Jul 10 22:04:57 2013 +0200 +++ b/src/Pure/Concurrent/mailbox.ML Wed Jul 10 22:56:48 2013 +0200 @@ -12,6 +12,7 @@ val send: 'a T -> 'a -> unit val receive: 'a T -> 'a val receive_timeout: Time.time -> 'a T -> 'a option + val await_empty: 'a T -> unit end; structure Mailbox: MAILBOX = @@ -31,4 +32,8 @@ Synchronized.timed_access mailbox (fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue); +fun await_empty (Mailbox mailbox) = + Synchronized.guarded_access mailbox + (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE); + end; diff -r 31467a4b1466 -r 0a7240d88e09 src/Pure/Concurrent/simple_thread.ML --- a/src/Pure/Concurrent/simple_thread.ML Wed Jul 10 22:04:57 2013 +0200 +++ b/src/Pure/Concurrent/simple_thread.ML Wed Jul 10 22:56:48 2013 +0200 @@ -9,6 +9,7 @@ val is_self: Thread.thread -> bool val attributes: bool -> Thread.threadAttribute list val fork: bool -> (unit -> unit) -> Thread.thread + val join: Thread.thread -> unit val interrupt_unsynchronized: Thread.thread -> unit val synchronized: string -> Mutex.mutex -> (unit -> 'a) -> 'a end; @@ -27,6 +28,10 @@ body () handle exn => if Exn.is_interrupt exn then () else reraise exn), attributes interrupts); +fun join thread = + while Thread.isActive thread + do OS.Process.sleep (seconds 0.1); + fun interrupt_unsynchronized thread = Thread.interrupt thread handle Thread _ => (); diff -r 31467a4b1466 -r 0a7240d88e09 src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Wed Jul 10 22:04:57 2013 +0200 +++ b/src/Pure/System/isabelle_process.ML Wed Jul 10 22:56:48 2013 +0200 @@ -106,31 +106,43 @@ fun flush channel = ignore (try System_Channel.flush channel); datatype target = - Channel of System_Channel.T | - Mailbox of (string list * bool) Mailbox.T; + Sync of {send: string list -> unit} | + Async of {send: string list -> bool -> unit, shutdown: unit -> unit}; fun message do_flush target name raw_props body = let val robust_props = map (pairself YXML.embed_controls) raw_props; val header = YXML.string_of (XML.Elem ((name, robust_props), [])); val msg = chunk header @ chunk body; - in - (case target of - Channel channel => (List.app (fn s => System_Channel.output channel s) msg; flush channel) - | Mailbox mbox => Mailbox.send mbox (msg, do_flush)) - end; + in (case target of Sync {send} => send msg | Async {send, ...} => send msg do_flush) end; fun message_output mbox channel = let fun loop receive = (case receive mbox of - SOME (msg, do_flush) => + SOME NONE => flush channel + | SOME (SOME (msg, do_flush)) => (List.app (fn s => System_Channel.output channel s) msg; if do_flush then flush channel else (); loop (Mailbox.receive_timeout (seconds 0.02))) | NONE => (flush channel; loop (SOME o Mailbox.receive))); in fn () => loop (SOME o Mailbox.receive) end; +fun make_target channel = + if Multithreading.available then + let + val mbox = Mailbox.create (); + val thread = Simple_Thread.fork false (message_output mbox channel); + in + Async { + send = fn msg => fn do_flush => Mailbox.send mbox (SOME (msg, do_flush)), + shutdown = fn () => + (Mailbox.send mbox NONE; Mailbox.await_empty mbox; Simple_Thread.join thread) + } + end + else + Sync {send = fn msg => (List.app (fn s => System_Channel.output channel s) msg; flush channel)}; + in fun init_channels channel = @@ -138,13 +150,7 @@ val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF); val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF); - val target = - if Multithreading.available then - let - val mbox = Mailbox.create (); - val _ = Simple_Thread.fork false (message_output mbox channel); - in Mailbox mbox end - else Channel channel; + val target = make_target channel; fun standard_message opt_serial name body = if body = "" then () @@ -167,7 +173,8 @@ Output.Private_Hooks.protocol_message_fn := message true target Markup.protocolN; Output.Private_Hooks.urgent_message_fn := ! Output.Private_Hooks.writeln_fn; Output.Private_Hooks.prompt_fn := ignore; - message true target Markup.initN [] (Session.welcome ()) + message true target Markup.initN [] (Session.welcome ()); + (case target of Async {shutdown, ...} => shutdown | _ => fn () => ()) end; end; @@ -235,9 +242,10 @@ (fn mode => (mode @ default_modes1) |> fold (update op =) default_modes2); val channel = rendezvous (); - val _ = init_channels channel; + val shutdown_channels = init_channels channel; val _ = Session.init_protocol_handlers (); - in loop channel end); + val _ = loop channel; + in shutdown_channels () end); fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2); fun init_socket name = init (fn () => System_Channel.socket_rendezvous name);