# HG changeset patch # User wenzelm # Date 1403899735 -7200 # Node ID 29fe9bac501bc4e3b0f711afb3f27e5376d9e91e # Parent 9188d901209d41c79b272fef9d888f17c6112033 more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section; diff -r 9188d901209d -r 29fe9bac501b src/Doc/Implementation/ML.thy --- a/src/Doc/Implementation/ML.thy Fri Jun 27 19:38:32 2014 +0200 +++ b/src/Doc/Implementation/ML.thy Fri Jun 27 22:08:55 2014 +0200 @@ -1935,7 +1935,7 @@ text {* \medskip See @{file "~~/src/Pure/Concurrent/mailbox.ML"} how to implement a mailbox as synchronized variable over a purely - functional queue. *} + functional list. *} section {* Managed evaluation *} diff -r 9188d901209d -r 29fe9bac501b src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Fri Jun 27 19:38:32 2014 +0200 +++ b/src/Pure/Concurrent/consumer_thread.scala Fri Jun 27 22:08:55 2014 +0200 @@ -30,9 +30,9 @@ name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit) { private var active = true - private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]] + private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]] - private val thread = Simple_Thread.fork(name, daemon) { main_loop() } + private val thread = Simple_Thread.fork(name, daemon) { main_loop(Nil) } def is_active: Boolean = active && thread.isAlive private def failure(exn: Throwable): Unit = @@ -42,9 +42,10 @@ private def robust_finish(): Unit = try { finish() } catch { case exn: Throwable => failure(exn) } - @tailrec private def main_loop(): Unit = - mbox.receive match { - case Some((arg, ack)) => + @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit = + msgs match { + case Nil => main_loop(mailbox.receive(None)) + case Some((arg, ack)) :: rest => val result = Exn.capture { consume(arg) } val continue = result match { @@ -54,8 +55,8 @@ true } ack.foreach(a => a.change(_ => Some(result))) - if (continue) main_loop() else robust_finish() - case None => robust_finish() + if (continue) main_loop(rest) else robust_finish() + case None :: _ => robust_finish() } assert(is_active) @@ -66,7 +67,7 @@ private def request(x: A, ack: Option[Consumer_Thread.Ack]) { synchronized { - if (is_active) mbox.send(Some((x, ack))) + if (is_active) mailbox.send(Some((x, ack))) else error("Consumer thread not active: " + quote(thread.getName)) } ack.foreach(a => @@ -78,7 +79,7 @@ def shutdown(): Unit = { - synchronized { if (is_active) { active = false; mbox.send(None) } } + synchronized { if (is_active) { active = false; mailbox.send(None) } } thread.join } } diff -r 9188d901209d -r 29fe9bac501b src/Pure/Concurrent/mailbox.ML --- a/src/Pure/Concurrent/mailbox.ML Fri Jun 27 19:38:32 2014 +0200 +++ b/src/Pure/Concurrent/mailbox.ML Fri Jun 27 22:08:55 2014 +0200 @@ -1,8 +1,8 @@ (* Title: Pure/Concurrent/mailbox.ML Author: Makarius -Message exchange via mailbox, with non-blocking send (due to unbounded -queueing) and potentially blocking receive. +Message exchange via mailbox, with multiple senders (non-blocking, +unbounded buffering) and single receiver (bulk messages). *) signature MAILBOX = @@ -10,30 +10,26 @@ type 'a T val create: unit -> 'a T val send: 'a T -> 'a -> unit - val receive: 'a T -> 'a - val receive_timeout: Time.time -> 'a T -> 'a option + val receive: Time.time option -> 'a T -> 'a list val await_empty: 'a T -> unit end; structure Mailbox: MAILBOX = struct -datatype 'a T = Mailbox of 'a Queue.T Synchronized.var; +datatype 'a T = Mailbox of 'a list Synchronized.var; -fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty); +fun create () = Mailbox (Synchronized.var "mailbox" []); -fun send (Mailbox mailbox) msg = - Synchronized.change mailbox (Queue.enqueue msg); +fun send (Mailbox mailbox) msg = Synchronized.change mailbox (cons msg); -fun receive (Mailbox mailbox) = - Synchronized.guarded_access mailbox (try Queue.dequeue); - -fun receive_timeout timeout (Mailbox mailbox) = +fun receive timeout (Mailbox mailbox) = Synchronized.timed_access mailbox - (fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue); + (fn _ => Option.map (fn t => (Time.+ (Time.now (), t))) timeout) + (fn [] => NONE | msgs => SOME (msgs, [])) + |> these |> rev; fun await_empty (Mailbox mailbox) = - Synchronized.guarded_access mailbox - (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE); + Synchronized.guarded_access mailbox (fn [] => SOME ((), []) | _ => NONE); end; diff -r 9188d901209d -r 29fe9bac501b src/Pure/Concurrent/mailbox.scala --- a/src/Pure/Concurrent/mailbox.scala Fri Jun 27 19:38:32 2014 +0200 +++ b/src/Pure/Concurrent/mailbox.scala Fri Jun 27 22:08:55 2014 +0200 @@ -2,16 +2,13 @@ Module: PIDE Author: Makarius -Message exchange via mailbox, with non-blocking send (due to unbounded -queueing) and potentially blocking receive. +Message exchange via mailbox, with multiple senders (non-blocking, +unbounded buffering) and single receiver (bulk messages). */ package isabelle -import scala.collection.immutable.Queue - - object Mailbox { def apply[A]: Mailbox[A] = new Mailbox[A]() @@ -20,18 +17,15 @@ class Mailbox[A] private() { - private val mailbox = Synchronized(Queue.empty[A]) - override def toString: String = mailbox.value.mkString("Mailbox(", ",", ")") - - def send(msg: A): Unit = - mailbox.change(_.enqueue(msg)) + private val mailbox = Synchronized(List.empty[A]) + override def toString: String = mailbox.value.reverse.mkString("Mailbox(", ",", ")") - def receive: A = - mailbox.guarded_access(_.dequeueOption) + def send(msg: A): Unit = mailbox.change(msg :: _) - def receive_timeout(timeout: Time): Option[A] = - mailbox.timed_access(_ => Some(Time.now() + timeout), _.dequeueOption) + def receive(timeout: Option[Time]): List[A] = + (mailbox.timed_access(_ => timeout.map(t => Time.now() + t), + { case Nil => None case msgs => Some((msgs, Nil)) }) getOrElse Nil).reverse def await_empty: Unit = - mailbox.guarded_access(queue => if (queue.isEmpty) Some(((), queue)) else None) + mailbox.guarded_access({ case Nil => Some(((), Nil)) case _ => None }) } diff -r 9188d901209d -r 29fe9bac501b src/Pure/System/message_channel.ML --- a/src/Pure/System/message_channel.ML Fri Jun 27 19:38:32 2014 +0200 +++ b/src/Pure/System/message_channel.ML Fri Jun 27 22:08:55 2014 +0200 @@ -42,17 +42,19 @@ fun shutdown (Message_Channel {shutdown, ...}) = shutdown (); fun flush channel = ignore (try System_Channel.flush channel); +val flush_timeout = SOME (seconds 0.02); fun message_output mbox channel = let - fun loop receive = - (case receive mbox of - SOME NONE => flush channel - | SOME (SOME msg) => - (output_message channel msg; - loop (Mailbox.receive_timeout (seconds 0.02))) - | NONE => (flush channel; loop (SOME o Mailbox.receive))); - in fn () => loop (SOME o Mailbox.receive) end; + fun continue timeout = + (case Mailbox.receive timeout mbox of + [] => (flush channel; continue NONE) + | msgs => received timeout msgs) + and received _ (NONE :: _) = flush channel + | received timeout (SOME msg :: rest) = + (output_message channel msg; received flush_timeout rest) + | received timeout [] = continue timeout; + in fn () => continue NONE end; fun make channel = if Multithreading.available then