src/Pure/Concurrent/consumer_thread.scala
changeset 56695 963732291084
child 56696 ff782c5450bf
equal deleted inserted replaced
56694:c4e77643aad6 56695:963732291084
       
     1 /*  Title:      Pure/Concurrent/consumer_thread.scala
       
     2     Module:     PIDE
       
     3     Author:     Makarius
       
     4 
       
     5 Consumer thread with unbounded queueing of requests.
       
     6 */
       
     7 
       
     8 package isabelle
       
     9 
       
    10 
       
    11 import scala.annotation.tailrec
       
    12 
       
    13 
       
    14 class Consumer_Thread[A](name: String = "", daemon: Boolean = false)
       
    15 {
       
    16   def consume(x: A) { }
       
    17   def finish() { }
       
    18 
       
    19   private val mbox = Mailbox[Option[A]]
       
    20   @tailrec private def loop(): Unit =
       
    21     mbox.receive match {
       
    22       case Some(x) => consume(x); loop()
       
    23       case None => finish()
       
    24     }
       
    25   private val thread = Simple_Thread.fork(name, daemon) { loop() }
       
    26 
       
    27   final def send(x: A) { mbox.send(Some(x)) }
       
    28   final def shutdown() { mbox.send(None); mbox.await_empty; thread.join }
       
    29 }