more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
--- a/src/Doc/Implementation/ML.thy Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Doc/Implementation/ML.thy Fri Jun 27 22:08:55 2014 +0200
@@ -1935,7 +1935,7 @@
text {* \medskip See @{file "~~/src/Pure/Concurrent/mailbox.ML"} how
to implement a mailbox as synchronized variable over a purely
- functional queue. *}
+ functional list. *}
section {* Managed evaluation *}
--- a/src/Pure/Concurrent/consumer_thread.scala Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala Fri Jun 27 22:08:55 2014 +0200
@@ -30,9 +30,9 @@
name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
private var active = true
- private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
+ private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]
- private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
+ private val thread = Simple_Thread.fork(name, daemon) { main_loop(Nil) }
def is_active: Boolean = active && thread.isAlive
private def failure(exn: Throwable): Unit =
@@ -42,9 +42,10 @@
private def robust_finish(): Unit =
try { finish() } catch { case exn: Throwable => failure(exn) }
- @tailrec private def main_loop(): Unit =
- mbox.receive match {
- case Some((arg, ack)) =>
+ @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
+ msgs match {
+ case Nil => main_loop(mailbox.receive(None))
+ case Some((arg, ack)) :: rest =>
val result = Exn.capture { consume(arg) }
val continue =
result match {
@@ -54,8 +55,8 @@
true
}
ack.foreach(a => a.change(_ => Some(result)))
- if (continue) main_loop() else robust_finish()
- case None => robust_finish()
+ if (continue) main_loop(rest) else robust_finish()
+ case None :: _ => robust_finish()
}
assert(is_active)
@@ -66,7 +67,7 @@
private def request(x: A, ack: Option[Consumer_Thread.Ack])
{
synchronized {
- if (is_active) mbox.send(Some((x, ack)))
+ if (is_active) mailbox.send(Some((x, ack)))
else error("Consumer thread not active: " + quote(thread.getName))
}
ack.foreach(a =>
@@ -78,7 +79,7 @@
def shutdown(): Unit =
{
- synchronized { if (is_active) { active = false; mbox.send(None) } }
+ synchronized { if (is_active) { active = false; mailbox.send(None) } }
thread.join
}
}
--- a/src/Pure/Concurrent/mailbox.ML Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/mailbox.ML Fri Jun 27 22:08:55 2014 +0200
@@ -1,8 +1,8 @@
(* Title: Pure/Concurrent/mailbox.ML
Author: Makarius
-Message exchange via mailbox, with non-blocking send (due to unbounded
-queueing) and potentially blocking receive.
+Message exchange via mailbox, with multiple senders (non-blocking,
+unbounded buffering) and single receiver (bulk messages).
*)
signature MAILBOX =
@@ -10,30 +10,26 @@
type 'a T
val create: unit -> 'a T
val send: 'a T -> 'a -> unit
- val receive: 'a T -> 'a
- val receive_timeout: Time.time -> 'a T -> 'a option
+ val receive: Time.time option -> 'a T -> 'a list
val await_empty: 'a T -> unit
end;
structure Mailbox: MAILBOX =
struct
-datatype 'a T = Mailbox of 'a Queue.T Synchronized.var;
+datatype 'a T = Mailbox of 'a list Synchronized.var;
-fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty);
+fun create () = Mailbox (Synchronized.var "mailbox" []);
-fun send (Mailbox mailbox) msg =
- Synchronized.change mailbox (Queue.enqueue msg);
+fun send (Mailbox mailbox) msg = Synchronized.change mailbox (cons msg);
-fun receive (Mailbox mailbox) =
- Synchronized.guarded_access mailbox (try Queue.dequeue);
-
-fun receive_timeout timeout (Mailbox mailbox) =
+fun receive timeout (Mailbox mailbox) =
Synchronized.timed_access mailbox
- (fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue);
+ (fn _ => Option.map (fn t => (Time.+ (Time.now (), t))) timeout)
+ (fn [] => NONE | msgs => SOME (msgs, []))
+ |> these |> rev;
fun await_empty (Mailbox mailbox) =
- Synchronized.guarded_access mailbox
- (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE);
+ Synchronized.guarded_access mailbox (fn [] => SOME ((), []) | _ => NONE);
end;
--- a/src/Pure/Concurrent/mailbox.scala Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/mailbox.scala Fri Jun 27 22:08:55 2014 +0200
@@ -2,16 +2,13 @@
Module: PIDE
Author: Makarius
-Message exchange via mailbox, with non-blocking send (due to unbounded
-queueing) and potentially blocking receive.
+Message exchange via mailbox, with multiple senders (non-blocking,
+unbounded buffering) and single receiver (bulk messages).
*/
package isabelle
-import scala.collection.immutable.Queue
-
-
object Mailbox
{
def apply[A]: Mailbox[A] = new Mailbox[A]()
@@ -20,18 +17,15 @@
class Mailbox[A] private()
{
- private val mailbox = Synchronized(Queue.empty[A])
- override def toString: String = mailbox.value.mkString("Mailbox(", ",", ")")
-
- def send(msg: A): Unit =
- mailbox.change(_.enqueue(msg))
+ private val mailbox = Synchronized(List.empty[A])
+ override def toString: String = mailbox.value.reverse.mkString("Mailbox(", ",", ")")
- def receive: A =
- mailbox.guarded_access(_.dequeueOption)
+ def send(msg: A): Unit = mailbox.change(msg :: _)
- def receive_timeout(timeout: Time): Option[A] =
- mailbox.timed_access(_ => Some(Time.now() + timeout), _.dequeueOption)
+ def receive(timeout: Option[Time]): List[A] =
+ (mailbox.timed_access(_ => timeout.map(t => Time.now() + t),
+ { case Nil => None case msgs => Some((msgs, Nil)) }) getOrElse Nil).reverse
def await_empty: Unit =
- mailbox.guarded_access(queue => if (queue.isEmpty) Some(((), queue)) else None)
+ mailbox.guarded_access({ case Nil => Some(((), Nil)) case _ => None })
}
--- a/src/Pure/System/message_channel.ML Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/System/message_channel.ML Fri Jun 27 22:08:55 2014 +0200
@@ -42,17 +42,19 @@
fun shutdown (Message_Channel {shutdown, ...}) = shutdown ();
fun flush channel = ignore (try System_Channel.flush channel);
+val flush_timeout = SOME (seconds 0.02);
fun message_output mbox channel =
let
- fun loop receive =
- (case receive mbox of
- SOME NONE => flush channel
- | SOME (SOME msg) =>
- (output_message channel msg;
- loop (Mailbox.receive_timeout (seconds 0.02)))
- | NONE => (flush channel; loop (SOME o Mailbox.receive)));
- in fn () => loop (SOME o Mailbox.receive) end;
+ fun continue timeout =
+ (case Mailbox.receive timeout mbox of
+ [] => (flush channel; continue NONE)
+ | msgs => received timeout msgs)
+ and received _ (NONE :: _) = flush channel
+ | received timeout (SOME msg :: rest) =
+ (output_message channel msg; received flush_timeout rest)
+ | received timeout [] = continue timeout;
+ in fn () => continue NONE end;
fun make channel =
if Multithreading.available then