misc tuning;
authorwenzelm
Thu, 24 Apr 2014 22:41:03 +0200
changeset 56708 d39148de6eee
parent 56707 aa4631879df8
child 56709 e83356e94380
misc tuning;
src/Pure/Concurrent/consumer_thread.scala
--- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 22:20:36 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 22:41:03 2014 +0200
@@ -32,13 +32,17 @@
   private var active = true
   private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
 
+  private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
+  private def is_active: Boolean = active && thread.isAlive
+
   private def failure(exn: Throwable): Unit =
-    System.err.println("Consumer thread failure:\n" + Exn.message(exn))
+    System.err.println(
+      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))
 
   private def robust_finish(): Unit =
     try { finish() } catch { case exn: Throwable => failure(exn) }
 
-  @tailrec private def loop(): Unit =
+  @tailrec private def main_loop(): Unit =
     mbox.receive match {
       case Some((arg, ack)) =>
         val result = Exn.capture { consume(arg) }
@@ -50,29 +54,27 @@
               true
           }
         ack.foreach(a => a.change(_ => Some(result)))
-        if (continue) loop() else robust_finish()
+        if (continue) main_loop() else robust_finish()
       case None => robust_finish()
     }
-  private val thread = Simple_Thread.fork(name, daemon) { loop() }
 
-  private def is_active: Boolean = active && thread.isAlive
+  assert(is_active)
 
 
   /* main methods */
 
-  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit = synchronized {
-    if (is_active) mbox.send(Some((x, ack)))
-    else error("Consumer thread not active")
+  private def request(x: A, ack: Option[Consumer_Thread.Ack])
+  {
+    synchronized {
+      if (is_active) mbox.send(Some((x, ack)))
+      else error("Consumer thread not active: " + quote(thread.getName))
+    }
+    ack.foreach(a =>
+      Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
   }
 
   def send(arg: A) { request(arg, None) }
-
-  def send_wait(arg: A) {
-    val ack: Consumer_Thread.Ack = Synchronized(None)
-    request(arg, Some(ack))
-    val result = ack.guarded_access({ case None => None case res => Some((res.get, res)) })
-    Exn.release(result)
-  }
+  def send_wait(arg: A) { request(arg, Some(Synchronized(None))) }
 
   def shutdown(): Unit =
   {