src/Pure/Concurrent/consumer_thread.scala
author wenzelm
Thu Apr 24 22:41:03 2014 +0200 (2014-04-24)
changeset 56708 d39148de6eee
parent 56702 f96ad2b19c38
child 56718 096139bcfadd
permissions -rw-r--r--
misc tuning;
     1 /*  Title:      Pure/Concurrent/consumer_thread.scala
     2     Module:     PIDE
     3     Author:     Makarius
     4 
     5 Consumer thread with unbounded queueing of requests, and optional
     6 acknowledgment.
     7 */
     8 
     9 package isabelle
    10 
    11 
    12 import scala.annotation.tailrec
    13 
    14 
    /** Companion of the [[Consumer_Thread]] class: public factory plus internal message types. */
    object Consumer_Thread {

      /* internal messages */

      // Acknowledgment cell shared between requester and consumer thread: the class below
      // creates it holding None and later replaces that by Some(result of consume).
      private type Ack = Synchronized[Option[Exn.Result[Boolean]]]

      // One queued request: the argument for consume, plus the optional acknowledgment cell.
      private type Request[A] = (A, Option[Ack])


      /* fork */

      /**
       * Create a consumer thread (the thread itself is forked by the class constructor).
       *
       * @param name    name handed to the underlying thread
       * @param daemon  daemon flag handed to the underlying thread
       * @param consume handler applied to each request; its Boolean result tells the
       *                thread whether to keep running
       * @param finish  callback run when the consumer loop ends; does nothing by default
       */
      def fork[A](name: String = "", daemon: Boolean = false)(
        consume: A => Boolean,
        finish: () => Unit = () => ()
      ): Consumer_Thread[A] = {
        new Consumer_Thread[A](name, daemon, consume, finish)
      }
    }
    28 
    /**
     * Thread that handles queued requests of type A.
     *
     * Requests are posted to an internal mailbox and handled by a thread that is forked
     * when the instance is constructed.  For every request `consume` is applied to the
     * argument; a result of `false` ends the loop.  An exception raised by `consume` does
     * not end the loop: it is stored in the acknowledgment cell when the requester waits
     * for one (`send_wait`), and reported on stderr otherwise (`send`).  When the loop ends
     * `finish` is run; an exception raised by `finish` is reported on stderr as well.
     *
     * @param name    name handed to the underlying thread
     * @param daemon  daemon flag handed to the underlying thread
     * @param consume handler for one request; returns whether to continue
     * @param finish  callback run after the loop has ended
     */
    final class Consumer_Thread[A] private(
      name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
    {
      // Only ever written inside `synchronized`: cleared by shutdown(), and by the consumer
      // thread itself once consume has asked to stop.
      private var active = true

      // Some(request) is work to be consumed; None is the shutdown signal.
      private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]

      private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
      private def is_active: Boolean = active && thread.isAlive

      // Report a failure of consume or finish on stderr, tagged with the thread name.
      private def failure(exn: Throwable): Unit =
        System.err.println(
          "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))

      // Run finish, catching every Throwable so that a failing callback is reported
      // instead of propagating out of the consumer thread.
      private def robust_finish(): Unit =
        try { finish() } catch { case exn: Throwable => failure(exn) }

      // End of the loop requested by consume itself.  The instance is marked inactive
      // *before* finish runs: otherwise is_active stays true for as long as finish keeps
      // the thread alive, so new requests would still be queued although nothing consumes
      // them anymore, and send_wait would block forever on its acknowledgment.
      // NOTE(review): requests that were already queued at this point are still never
      // consumed; their send_wait callers are not woken up.
      private def stop(): Unit =
      {
        synchronized { active = false }
        robust_finish()
      }

      @tailrec private def main_loop(): Unit =
        mbox.receive match {
          case Some((arg, ack)) =>
            val result = Exn.capture { consume(arg) }
            val continue =
              result match {
                case Exn.Res(cont) => cont
                case Exn.Exn(exn) =>
                  // Without an acknowledgment nobody else will see this exception.
                  if (ack.isEmpty) failure(exn)
                  true
              }
            // Publish the captured result (value or exception) to a waiting requester.
            ack.foreach(a => a.change(_ => Some(result)))
            if (continue) main_loop() else stop()
          case None => robust_finish()
        }

      assert(is_active)


      /* main methods */

      // Enqueue a request while the thread is active (otherwise signal an error via
      // `error`); with an acknowledgment cell, additionally block until the consumer
      // thread has stored a result there and pass that result to Exn.release
      // (presumably re-raising an exception of consume in the caller -- TODO confirm).
      private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit =
      {
        synchronized {
          if (is_active) mbox.send(Some((x, ack)))
          else error("Consumer thread not active: " + quote(thread.getName))
        }
        ack.foreach(a =>
          // Wait while the cell is still None; the cell content itself is left unchanged.
          Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
      }

      /** Enqueue `arg` without waiting for it to be consumed. */
      def send(arg: A): Unit = request(arg, None)

      /** Enqueue `arg` and block until the consumer thread has acknowledged it. */
      def send_wait(arg: A): Unit = request(arg, Some(Synchronized(None)))

      /**
       * Mark this instance inactive, post the shutdown signal if the thread was still
       * active, and wait for the thread to terminate.
       */
      def shutdown(): Unit =
      {
        synchronized { if (is_active) { active = false; mbox.send(None) } }
        thread.join()
      }
    }