explicit shutdown of message output thread;
authorwenzelm
Wed Jul 10 22:56:48 2013 +0200 (2013-07-10 ago)
changeset 525830a7240d88e09
parent 52582 31467a4b1466
child 52584 5cad4a5f5615
explicit shutdown of message output thread;
src/Pure/Concurrent/mailbox.ML
src/Pure/Concurrent/simple_thread.ML
src/Pure/System/isabelle_process.ML
     1.1 --- a/src/Pure/Concurrent/mailbox.ML	Wed Jul 10 22:04:57 2013 +0200
     1.2 +++ b/src/Pure/Concurrent/mailbox.ML	Wed Jul 10 22:56:48 2013 +0200
     1.3 @@ -12,6 +12,7 @@
     1.4    val send: 'a T -> 'a -> unit
     1.5    val receive: 'a T -> 'a
     1.6    val receive_timeout: Time.time -> 'a T -> 'a option
     1.7 +  val await_empty: 'a T -> unit
     1.8  end;
     1.9  
    1.10  structure Mailbox: MAILBOX =
    1.11 @@ -31,4 +32,8 @@
    1.12    Synchronized.timed_access mailbox
    1.13      (fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue);
    1.14  
    1.15 +fun await_empty (Mailbox mailbox) =
    1.16 +  Synchronized.guarded_access mailbox
    1.17 +    (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE);
    1.18 +
    1.19  end;
     2.1 --- a/src/Pure/Concurrent/simple_thread.ML	Wed Jul 10 22:04:57 2013 +0200
     2.2 +++ b/src/Pure/Concurrent/simple_thread.ML	Wed Jul 10 22:56:48 2013 +0200
     2.3 @@ -9,6 +9,7 @@
     2.4    val is_self: Thread.thread -> bool
     2.5    val attributes: bool -> Thread.threadAttribute list
     2.6    val fork: bool -> (unit -> unit) -> Thread.thread
     2.7 +  val join: Thread.thread -> unit
     2.8    val interrupt_unsynchronized: Thread.thread -> unit
     2.9    val synchronized: string -> Mutex.mutex -> (unit -> 'a) -> 'a
    2.10  end;
    2.11 @@ -27,6 +28,10 @@
    2.12        body () handle exn => if Exn.is_interrupt exn then () else reraise exn),
    2.13      attributes interrupts);
    2.14  
    2.15 +fun join thread =
    2.16 +  while Thread.isActive thread
    2.17 +  do OS.Process.sleep (seconds 0.1);
    2.18 +
    2.19  fun interrupt_unsynchronized thread = Thread.interrupt thread handle Thread _ => ();
    2.20  
    2.21  
     3.1 --- a/src/Pure/System/isabelle_process.ML	Wed Jul 10 22:04:57 2013 +0200
     3.2 +++ b/src/Pure/System/isabelle_process.ML	Wed Jul 10 22:56:48 2013 +0200
     3.3 @@ -106,31 +106,43 @@
     3.4  fun flush channel = ignore (try System_Channel.flush channel);
     3.5  
     3.6  datatype target =
     3.7 -  Channel of System_Channel.T |
     3.8 -  Mailbox of (string list * bool) Mailbox.T;
     3.9 +  Sync of {send: string list -> unit} |
    3.10 +  Async of {send: string list -> bool -> unit, shutdown: unit -> unit};
    3.11  
    3.12  fun message do_flush target name raw_props body =
    3.13    let
    3.14      val robust_props = map (pairself YXML.embed_controls) raw_props;
    3.15      val header = YXML.string_of (XML.Elem ((name, robust_props), []));
    3.16      val msg = chunk header @ chunk body;
    3.17 -  in
    3.18 -    (case target of
    3.19 -      Channel channel => (List.app (fn s => System_Channel.output channel s) msg; flush channel)
    3.20 -    | Mailbox mbox => Mailbox.send mbox (msg, do_flush))
    3.21 -  end;
    3.22 +  in (case target of Sync {send} => send msg | Async {send, ...} => send msg do_flush) end;
    3.23  
    3.24  fun message_output mbox channel =
    3.25    let
    3.26      fun loop receive =
    3.27        (case receive mbox of
    3.28 -        SOME (msg, do_flush) =>
    3.29 +        SOME NONE => flush channel
    3.30 +      | SOME (SOME (msg, do_flush)) =>
    3.31           (List.app (fn s => System_Channel.output channel s) msg;
    3.32            if do_flush then flush channel else ();
    3.33            loop (Mailbox.receive_timeout (seconds 0.02)))
    3.34        | NONE => (flush channel; loop (SOME o Mailbox.receive)));
    3.35    in fn () => loop (SOME o Mailbox.receive) end;
    3.36  
    3.37 +fun make_target channel =
    3.38 +  if Multithreading.available then
    3.39 +    let
    3.40 +      val mbox = Mailbox.create ();
    3.41 +      val thread = Simple_Thread.fork false (message_output mbox channel);
    3.42 +    in
    3.43 +      Async {
    3.44 +        send = fn msg => fn do_flush => Mailbox.send mbox (SOME (msg, do_flush)),
    3.45 +        shutdown = fn () =>
    3.46 +          (Mailbox.send mbox NONE; Mailbox.await_empty mbox; Simple_Thread.join thread)
    3.47 +      }
    3.48 +    end
    3.49 +  else
    3.50 +    Sync {send = fn msg => (List.app (fn s => System_Channel.output channel s) msg; flush channel)};
    3.51 +
    3.52  in
    3.53  
    3.54  fun init_channels channel =
    3.55 @@ -138,13 +150,7 @@
    3.56      val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF);
    3.57      val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF);
    3.58  
    3.59 -    val target =
    3.60 -      if Multithreading.available then
    3.61 -        let
    3.62 -          val mbox = Mailbox.create ();
    3.63 -          val _ = Simple_Thread.fork false (message_output mbox channel);
    3.64 -        in Mailbox mbox end
    3.65 -      else Channel channel;
    3.66 +    val target = make_target channel;
    3.67  
    3.68      fun standard_message opt_serial name body =
    3.69        if body = "" then ()
    3.70 @@ -167,7 +173,8 @@
    3.71      Output.Private_Hooks.protocol_message_fn := message true target Markup.protocolN;
    3.72      Output.Private_Hooks.urgent_message_fn := ! Output.Private_Hooks.writeln_fn;
    3.73      Output.Private_Hooks.prompt_fn := ignore;
    3.74 -    message true target Markup.initN [] (Session.welcome ())
    3.75 +    message true target Markup.initN [] (Session.welcome ());
    3.76 +    (case target of Async {shutdown, ...} => shutdown | _ => fn () => ())
    3.77    end;
    3.78  
    3.79  end;
    3.80 @@ -235,9 +242,10 @@
    3.81          (fn mode => (mode @ default_modes1) |> fold (update op =) default_modes2);
    3.82  
    3.83      val channel = rendezvous ();
    3.84 -    val _ = init_channels channel;
    3.85 +    val shutdown_channels = init_channels channel;
    3.86      val _ = Session.init_protocol_handlers ();
    3.87 -  in loop channel end);
    3.88 +    val _ = loop channel;
    3.89 +  in shutdown_channels () end);
    3.90  
    3.91  fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2);
    3.92  fun init_socket name = init (fn () => System_Channel.socket_rendezvous name);