more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
authorwenzelm
Fri, 27 Jun 2014 22:08:55 +0200
changeset 57417 29fe9bac501b
parent 57416 9188d901209d
child 57418 6ab1c7cb0b8d
child 57420 8103a3f6f342
more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
src/Doc/Implementation/ML.thy
src/Pure/Concurrent/consumer_thread.scala
src/Pure/Concurrent/mailbox.ML
src/Pure/Concurrent/mailbox.scala
src/Pure/System/message_channel.ML
--- a/src/Doc/Implementation/ML.thy	Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Doc/Implementation/ML.thy	Fri Jun 27 22:08:55 2014 +0200
@@ -1935,7 +1935,7 @@
 
 text {* \medskip See @{file "~~/src/Pure/Concurrent/mailbox.ML"} how
   to implement a mailbox as synchronized variable over a purely
-  functional queue. *}
+  functional list. *}
 
 
 section {* Managed evaluation *}
--- a/src/Pure/Concurrent/consumer_thread.scala	Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Fri Jun 27 22:08:55 2014 +0200
@@ -30,9 +30,9 @@
   name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
 {
   private var active = true
-  private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
+  private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]
 
-  private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
+  private val thread = Simple_Thread.fork(name, daemon) { main_loop(Nil) }
   def is_active: Boolean = active && thread.isAlive
 
   private def failure(exn: Throwable): Unit =
@@ -42,9 +42,10 @@
   private def robust_finish(): Unit =
     try { finish() } catch { case exn: Throwable => failure(exn) }
 
-  @tailrec private def main_loop(): Unit =
-    mbox.receive match {
-      case Some((arg, ack)) =>
+  @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
+    msgs match {
+      case Nil => main_loop(mailbox.receive(None))
+      case Some((arg, ack)) :: rest =>
         val result = Exn.capture { consume(arg) }
         val continue =
           result match {
@@ -54,8 +55,8 @@
               true
           }
         ack.foreach(a => a.change(_ => Some(result)))
-        if (continue) main_loop() else robust_finish()
-      case None => robust_finish()
+        if (continue) main_loop(rest) else robust_finish()
+      case None :: _ => robust_finish()
     }
 
   assert(is_active)
@@ -66,7 +67,7 @@
   private def request(x: A, ack: Option[Consumer_Thread.Ack])
   {
     synchronized {
-      if (is_active) mbox.send(Some((x, ack)))
+      if (is_active) mailbox.send(Some((x, ack)))
       else error("Consumer thread not active: " + quote(thread.getName))
     }
     ack.foreach(a =>
@@ -78,7 +79,7 @@
 
   def shutdown(): Unit =
   {
-    synchronized { if (is_active) { active = false; mbox.send(None) } }
+    synchronized { if (is_active) { active = false; mailbox.send(None) } }
     thread.join
   }
 }
--- a/src/Pure/Concurrent/mailbox.ML	Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/mailbox.ML	Fri Jun 27 22:08:55 2014 +0200
@@ -1,8 +1,8 @@
 (*  Title:      Pure/Concurrent/mailbox.ML
     Author:     Makarius
 
-Message exchange via mailbox, with non-blocking send (due to unbounded
-queueing) and potentially blocking receive.
+Message exchange via mailbox, with multiple senders (non-blocking,
+unbounded buffering) and single receiver (bulk messages).
 *)
 
 signature MAILBOX =
@@ -10,30 +10,26 @@
   type 'a T
   val create: unit -> 'a T
   val send: 'a T -> 'a -> unit
-  val receive: 'a T -> 'a
-  val receive_timeout: Time.time -> 'a T -> 'a option
+  val receive: Time.time option -> 'a T -> 'a list
   val await_empty: 'a T -> unit
 end;
 
 structure Mailbox: MAILBOX =
 struct
 
-datatype 'a T = Mailbox of 'a Queue.T Synchronized.var;
+datatype 'a T = Mailbox of 'a list Synchronized.var;
 
-fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty);
+fun create () = Mailbox (Synchronized.var "mailbox" []);
 
-fun send (Mailbox mailbox) msg =
-  Synchronized.change mailbox (Queue.enqueue msg);
+fun send (Mailbox mailbox) msg = Synchronized.change mailbox (cons msg);
 
-fun receive (Mailbox mailbox) =
-  Synchronized.guarded_access mailbox (try Queue.dequeue);
-
-fun receive_timeout timeout (Mailbox mailbox) =
+fun receive timeout (Mailbox mailbox) =
   Synchronized.timed_access mailbox
-    (fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue);
+    (fn _ => Option.map (fn t => (Time.+ (Time.now (), t))) timeout)
+    (fn [] => NONE | msgs => SOME (msgs, []))
+  |> these |> rev;
 
 fun await_empty (Mailbox mailbox) =
-  Synchronized.guarded_access mailbox
-    (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE);
+  Synchronized.guarded_access mailbox (fn [] => SOME ((), []) | _ => NONE);
 
 end;
--- a/src/Pure/Concurrent/mailbox.scala	Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/mailbox.scala	Fri Jun 27 22:08:55 2014 +0200
@@ -2,16 +2,13 @@
     Module:     PIDE
     Author:     Makarius
 
-Message exchange via mailbox, with non-blocking send (due to unbounded
-queueing) and potentially blocking receive.
+Message exchange via mailbox, with multiple senders (non-blocking,
+unbounded buffering) and single receiver (bulk messages).
 */
 
 package isabelle
 
 
-import scala.collection.immutable.Queue
-
-
 object Mailbox
 {
   def apply[A]: Mailbox[A] = new Mailbox[A]()
@@ -20,18 +17,15 @@
 
 class Mailbox[A] private()
 {
-  private val mailbox = Synchronized(Queue.empty[A])
-  override def toString: String = mailbox.value.mkString("Mailbox(", ",", ")")
-
-  def send(msg: A): Unit =
-    mailbox.change(_.enqueue(msg))
+  private val mailbox = Synchronized(List.empty[A])
+  override def toString: String = mailbox.value.reverse.mkString("Mailbox(", ",", ")")
 
-  def receive: A =
-    mailbox.guarded_access(_.dequeueOption)
+  def send(msg: A): Unit = mailbox.change(msg :: _)
 
-  def receive_timeout(timeout: Time): Option[A] =
-    mailbox.timed_access(_ => Some(Time.now() + timeout), _.dequeueOption)
+  def receive(timeout: Option[Time]): List[A] =
+    (mailbox.timed_access(_ => timeout.map(t => Time.now() + t),
+      { case Nil => None case msgs => Some((msgs, Nil)) }) getOrElse Nil).reverse
 
   def await_empty: Unit =
-    mailbox.guarded_access(queue => if (queue.isEmpty) Some(((), queue)) else None)
+    mailbox.guarded_access({ case Nil => Some(((), Nil)) case _ => None })
 }
--- a/src/Pure/System/message_channel.ML	Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/System/message_channel.ML	Fri Jun 27 22:08:55 2014 +0200
@@ -42,17 +42,19 @@
 fun shutdown (Message_Channel {shutdown, ...}) = shutdown ();
 
 fun flush channel = ignore (try System_Channel.flush channel);
+val flush_timeout = SOME (seconds 0.02);
 
 fun message_output mbox channel =
   let
-    fun loop receive =
-      (case receive mbox of
-        SOME NONE => flush channel
-      | SOME (SOME msg) =>
-         (output_message channel msg;
-          loop (Mailbox.receive_timeout (seconds 0.02)))
-      | NONE => (flush channel; loop (SOME o Mailbox.receive)));
-  in fn () => loop (SOME o Mailbox.receive) end;
+    fun continue timeout =
+      (case Mailbox.receive timeout mbox of
+        [] => (flush channel; continue NONE)
+      | msgs => received timeout msgs)
+    and received _ (NONE :: _) = flush channel
+      | received timeout (SOME msg :: rest) =
+          (output_message channel msg; received flush_timeout rest)
+      | received timeout [] = continue timeout;
+  in fn () => continue NONE end;
 
 fun make channel =
   if Multithreading.available then