src/Pure/System/message_channel.ML
author wenzelm
Sat, 28 Mar 2020 19:33:14 +0100
changeset 71612 e0a5d6068141
parent 70995 2c17fa0f5187
child 71692 f8e52c0152fe
permissions -rw-r--r--
tuned;

(*  Title:      Pure/System/message_channel.ML
    Author:     Makarius

Preferably asynchronous channel for Isabelle messages.
*)

(*Interface of the message channel: construction of messages and a channel
  that writes them to a binary output stream.*)
signature MESSAGE_CHANNEL =
sig
  (*a prepared message; its constructor is not exported by this signature*)
  type message
  (*message name props body: message consisting of a header chunk (an XML
    element carrying name and props) followed by a chunk for body*)
  val message: string -> Properties.T -> XML.body -> message
  (*a channel; its constructor is not exported by this signature*)
  type T
  (*hand one message over to the channel*)
  val send: T -> message -> unit
  (*terminate the channel; returns after its output thread has been joined*)
  val shutdown: T -> unit
  (*create a channel with its own output thread that writes to the given stream*)
  val make: BinIO.outstream -> T
end;

structure Message_Channel: MESSAGE_CHANNEL =
struct

(* message *)

(*A message wraps a single XML body; "message" below fills it with two
  length-prefixed chunks (header, then body).*)
datatype message = Message of XML.body;

(*Sum of the sizes of all strings that YXML.traverse visits for the trees of body.*)
fun body_size body = fold (YXML.traverse (fn s => fn n => size s + n)) body 0;

(*Prefix body with a text node holding its size in decimal, terminated by newline.*)
fun chunk body =
  let val length_prefix = XML.Text (string_of_int (body_size body) ^ "\n")
  in length_prefix :: body end;

(*Build a message from a name, raw properties and a body: both components of
  every property pass through YXML.embed_controls, the header is an empty XML
  element carrying name and properties, and header and body each become a
  separate length-prefixed chunk.*)
fun message name raw_props body =
  let
    fun robust (a, b) = (YXML.embed_controls a, YXML.embed_controls b);
    val header_chunk = chunk [XML.Elem ((name, map robust raw_props), [])];
    val body_chunk = chunk body;
  in Message (header_chunk @ body_chunk) end;

(*Write the message to stream: every string produced by YXML.traverse goes
  through File.output, tree by tree in list order.*)
fun output_message stream (Message body) =
  let fun write s () = File.output stream s
  in List.app (fn tree => YXML.traverse write tree ()) body end;


(* channel *)

(*A channel is a record of two operations; "make" below provides the
  implementation backed by a mailbox and an output thread.*)
datatype T = Message_Channel of {send: message -> unit, shutdown: unit -> unit};

(*Select the send operation of the channel.*)
fun send (Message_Channel ops) = #send ops;
(*Invoke the shutdown operation of the channel.*)
fun shutdown (Message_Channel {shutdown = terminate, ...}) = terminate ();

(*Timeout handed to Mailbox.receive by message_output once a message has been
  written; when that receive yields no messages (presumably because the
  timeout expired -- confirm against Mailbox.receive) the stream is flushed.*)
val flush_timeout = SOME (seconds 0.02);

(*Output loop over the mailbox, returned as a thunk for the channel thread.

  Mailbox entries are message options.  SOME msg is written to stream and
  switches the following receive to flush_timeout.  An empty receive result
  flushes the stream and falls back to receiving without timeout.  NONE flushes
  the stream and ends the loop; entries after NONE in the same batch are not
  written.*)
fun message_output mbox stream =
  let
    fun flush () = Byte_Message.flush stream;
    fun loop timeout =
      (case Mailbox.receive timeout mbox of
        [] => (flush (); loop NONE)
      | batch => drain timeout batch)
    and drain timeout [] = loop timeout
      | drain _ (SOME msg :: more) = (output_message stream msg; drain flush_timeout more)
      | drain _ (NONE :: _) = flush ();
  in fn () => loop NONE end;

(*Create a channel for stream: a fresh mailbox plus a forked thread named
  "channel" that runs message_output on it.  Sending wraps the message in SOME
  and puts it into the mailbox.  Shutdown puts NONE into the mailbox, awaits
  the mailbox becoming empty, and joins the thread.*)
fun make stream =
  let
    val queue = Mailbox.create ();
    val writer =
      Standard_Thread.fork {name = "channel", stack_limit = NONE, interrupts = false}
        (message_output queue stream);
    fun enqueue msg = Mailbox.send queue (SOME msg);
    fun terminate () =
      let
        val () = Mailbox.send queue NONE;
        val () = Mailbox.await_empty queue;
      in Standard_Thread.join writer end;
  in Message_Channel {send = enqueue, shutdown = terminate} end;

end;