src/Pure/Concurrent/consumer_thread.scala
author wenzelm
Fri Apr 25 13:29:56 2014 +0200 (2014-04-25)
changeset 56718 096139bcfadd
parent 56708 d39148de6eee
child 56782 433cf57550fa
permissions -rw-r--r--
replaced manager Actor by Consumer_Thread, which is lazy to defer its start to actual Handler init time;
wenzelm@56695
     1
/*  Title:      Pure/Concurrent/consumer_thread.scala
wenzelm@56695
     2
    Module:     PIDE
wenzelm@56695
     3
    Author:     Makarius
wenzelm@56695
     4
wenzelm@56702
     5
Consumer thread with unbounded queueing of requests, and optional
wenzelm@56702
     6
acknowledgment.
wenzelm@56695
     7
*/
wenzelm@56695
     8
wenzelm@56695
     9
package isabelle
wenzelm@56695
    10
wenzelm@56695
    11
wenzelm@56695
    12
import scala.annotation.tailrec
wenzelm@56695
    13
wenzelm@56695
    14
wenzelm@56696
    15
object Consumer_Thread
{
  /* internal messages */

  // Acknowledgment cell: starts out as None and is set to Some(result of
  // consume) by the consumer once the corresponding request was processed.
  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]

  // Queued request: the argument for consume, plus the acknowledgment cell
  // when the sender wants to be notified of the outcome.
  private type Request[A] = (A, Option[Ack])


  /* fork */

  // Create a consumer thread with the given thread name and daemon flag.
  //   consume: invoked for each request; its Boolean result decides whether
  //            the consumer keeps processing further requests (true) or stops.
  //   finish:  invoked when the consumer stops; defaults to a no-op.
  def fork[A](name: String = "", daemon: Boolean = false)(
      consume: A => Boolean,
      finish: () => Unit = () => ()): Consumer_Thread[A] =
  {
    new Consumer_Thread[A](name, daemon, consume, finish)
  }
}
wenzelm@56695
    28
wenzelm@56698
    29
// Consumer of requests of type A: requests are queued in a mailbox and handed
// to `consume` one at a time by a dedicated thread.  Senders may optionally
// wait for an acknowledgment that carries the outcome of `consume`.
//
//   name, daemon: passed on to Simple_Thread.fork
//   consume:      handles one request; returning false ends the main loop
//   finish:       run when the main loop ends (after a false result from
//                 consume, or after shutdown)
final class Consumer_Thread[A] private(
  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
  // Cleared by shutdown() under the object monitor.  Volatile, because the
  // public is_active reads it without holding that monitor.
  @volatile private var active = true

  // Queue of pending requests; None is the stop marker sent by shutdown().
  private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]

  // Forked after mbox is initialized, since main_loop reads from it.
  private val thread = Simple_Thread.fork(name, daemon) { main_loop() }

  // True while shutdown() has not been requested and the thread is alive.
  def is_active: Boolean = active && thread.isAlive

  // Report an exception on stderr, together with the thread name.
  private def failure(exn: Throwable): Unit =
    System.err.println(
      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))

  // Run finish(), reporting (not propagating) anything it throws.
  private def robust_finish(): Unit =
    try { finish() } catch { case exn: Throwable => failure(exn) }

  // Process requests until consume returns false or the stop marker arrives.
  @tailrec private def main_loop(): Unit =
    mbox.receive match {
      case Some((arg, ack)) =>
        val result = Exn.capture { consume(arg) }
        val continue =
          result match {
            case Exn.Res(cont) => cont
            case Exn.Exn(exn) =>
              // Without an acknowledgment nobody else sees the exception, so
              // report it here; with one, it travels to the sender via `result`.
              if (ack.isEmpty) failure(exn)
              // A failing request does not stop the consumer.
              true
          }
        // Publish the outcome before deciding whether to go on.
        ack.foreach(a => a.change(_ => Some(result)))
        if (continue) main_loop() else robust_finish()
      case None => robust_finish()
    }

  assert(is_active)


  /* main methods */

  // Enqueue a request; fails with an error if the consumer is not active.
  // With an acknowledgment cell, afterwards wait for the consumer to fill it
  // and pass the outcome through Exn.release (presumably re-raising an
  // exception captured from consume -- confirm against Exn).
  //
  // NOTE(review): main_loop stops reading the mailbox once consume returned
  // false, while is_active stays true until the thread has ended.  A request
  // with acknowledgment that is enqueued in that window looks like it is
  // never answered -- confirm whether callers can hit this.
  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit =
  {
    // Check and enqueue atomically with respect to shutdown().
    synchronized {
      if (is_active) mbox.send(Some((x, ack)))
      else error("Consumer thread not active: " + quote(thread.getName))
    }
    ack.foreach(a =>
      Exn.release(a.guarded_access({
        // still pending
        case None => None
        // outcome available: return it and leave the cell unchanged
        case res @ Some(r) => Some((r, res))
      })))
  }

  // Enqueue a request without waiting for it to be processed.
  def send(arg: A): Unit = request(arg, None)

  // Enqueue a request and wait until the consumer has processed it.
  def send_wait(arg: A): Unit = request(arg, Some(Synchronized(None)))

  // Request termination (once) and wait for the thread to end.  Requests
  // already queued ahead of the stop marker are received first.
  //
  // NOTE(review): calling this from within consume/finish would make the
  // consumer thread join itself -- verify that no caller does so.
  def shutdown(): Unit =
  {
    synchronized { if (is_active) { active = false; mbox.send(None) } }
    thread.join
  }
}