misc tuning;
authorwenzelm
Thu Apr 24 22:41:03 2014 +0200 (2014-04-24)
changeset 56708d39148de6eee
parent 56707 aa4631879df8
child 56709 e83356e94380
misc tuning;
src/Pure/Concurrent/consumer_thread.scala
     1.1 --- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 22:20:36 2014 +0200
     1.2 +++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 22:41:03 2014 +0200
     1.3 @@ -32,13 +32,17 @@
     1.4    private var active = true
     1.5    private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
     1.6  
     1.7 +  private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
     1.8 +  private def is_active: Boolean = active && thread.isAlive
     1.9 +
    1.10    private def failure(exn: Throwable): Unit =
    1.11 -    System.err.println("Consumer thread failure:\n" + Exn.message(exn))
    1.12 +    System.err.println(
    1.13 +      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))
    1.14  
    1.15    private def robust_finish(): Unit =
    1.16      try { finish() } catch { case exn: Throwable => failure(exn) }
    1.17  
    1.18 -  @tailrec private def loop(): Unit =
    1.19 +  @tailrec private def main_loop(): Unit =
    1.20      mbox.receive match {
    1.21        case Some((arg, ack)) =>
    1.22          val result = Exn.capture { consume(arg) }
    1.23 @@ -50,29 +54,27 @@
    1.24                true
    1.25            }
    1.26          ack.foreach(a => a.change(_ => Some(result)))
    1.27 -        if (continue) loop() else robust_finish()
    1.28 +        if (continue) main_loop() else robust_finish()
    1.29        case None => robust_finish()
    1.30      }
    1.31 -  private val thread = Simple_Thread.fork(name, daemon) { loop() }
    1.32  
    1.33 -  private def is_active: Boolean = active && thread.isAlive
    1.34 +  assert(is_active)
    1.35  
    1.36  
    1.37    /* main methods */
    1.38  
    1.39 -  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit = synchronized {
    1.40 -    if (is_active) mbox.send(Some((x, ack)))
    1.41 -    else error("Consumer thread not active")
    1.42 +  private def request(x: A, ack: Option[Consumer_Thread.Ack])
    1.43 +  {
    1.44 +    synchronized {
    1.45 +      if (is_active) mbox.send(Some((x, ack)))
    1.46 +      else error("Consumer thread not active: " + quote(thread.getName))
    1.47 +    }
    1.48 +    ack.foreach(a =>
    1.49 +      Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
    1.50    }
    1.51  
    1.52    def send(arg: A) { request(arg, None) }
    1.53 -
    1.54 -  def send_wait(arg: A) {
    1.55 -    val ack: Consumer_Thread.Ack = Synchronized(None)
    1.56 -    request(arg, Some(ack))
    1.57 -    val result = ack.guarded_access({ case None => None case res => Some((res.get, res)) })
    1.58 -    Exn.release(result)
    1.59 -  }
    1.60 +  def send_wait(arg: A) { request(arg, Some(Synchronized(None))) }
    1.61  
    1.62    def shutdown(): Unit =
    1.63    {