src/Pure/Concurrent/consumer_thread.scala
author wenzelm
Fri, 25 Apr 2014 13:29:56 +0200
changeset 56718 096139bcfadd
parent 56708 d39148de6eee
child 56782 433cf57550fa
permissions -rw-r--r--
replaced manager Actor by Consumer_Thread, which is lazy to defer its start to actual Handler init time;
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     1
/*  Title:      Pure/Concurrent/consumer_thread.scala
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     2
    Module:     PIDE
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     3
    Author:     Makarius
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     4
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
     5
Consumer thread with unbounded queueing of requests, and optional
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
     6
acknowledgment.
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     7
*/
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     8
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     9
package isabelle
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    10
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    11
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    12
import scala.annotation.tailrec
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    13
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    14
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    15
object Consumer_Thread
{
  /* internal messages */

  // Acknowledgment cell shared between a waiting sender and the consumer:
  // None while the request is pending, afterwards the captured result of
  // applying consume to the argument.
  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]

  // A queued argument, optionally paired with the acknowledgment cell of a
  // sender that waits for the outcome.
  private type Request[A] = (A, Option[Ack])


  /* fork */

  // Create a consumer thread with the given thread name and daemon flag.
  // consume is applied to each request; its Boolean result tells the main
  // loop whether to continue.  finish is run once when the loop ends.
  def fork[A](name: String = "", daemon: Boolean = false)(
      consume: A => Boolean,
      finish: () => Unit = () => ()): Consumer_Thread[A] =
  {
    val consumer = new Consumer_Thread[A](name, daemon, consume, finish)
    consumer
  }
}
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    28
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    29
final class Consumer_Thread[A] private(
  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
  /* state */

  // Cleared by shutdown() and when the main loop terminates.  Volatile,
  // because is_active reads it without holding the object monitor.
  @volatile private var active = true

  // Queue of pending requests; None is the shutdown marker.
  private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]

  private val thread = Simple_Thread.fork(name, daemon) { main_loop() }

  // True as long as neither shutdown nor loop termination has happened and
  // the underlying thread is still alive.
  def is_active: Boolean = active && thread.isAlive


  /* main loop */

  // Report an exception on stderr without terminating the thread.
  private def failure(exn: Throwable): Unit =
    System.err.println(
      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))

  // Terminate the loop: refuse further requests, then run the finish
  // callback, reporting (not propagating) any exception it raises.
  private def robust_finish(): Unit =
  {
    // Mark inactive before finish() runs: the thread stays alive for the
    // duration of finish(), and a request accepted in that window would be
    // queued but never consumed.
    // NOTE(review): requests queued before this point are still left
    // unanswered; draining them needs Mailbox operations not visible here.
    active = false
    try { finish() } catch { case exn: Throwable => failure(exn) }
  }

  // Process requests one by one until consume returns false or the shutdown
  // marker is received.  An exception from consume does not stop the loop.
  @tailrec private def main_loop(): Unit =
    mbox.receive match {
      case Some((arg, ack)) =>
        val result = Exn.capture { consume(arg) }
        val continue =
          result match {
            case Exn.Res(cont) => cont
            case Exn.Exn(exn) =>
              // without acknowledgment nobody receives the exception: report it here
              if (ack.isEmpty) failure(exn)
              true
          }
        // hand the captured result (value or exception) to a waiting sender
        ack.foreach(a => a.change(_ => Some(result)))
        if (continue) main_loop() else robust_finish()
      case None => robust_finish()
    }

  assert(is_active)


  /* main methods */

  // Enqueue x if the consumer is active, otherwise raise an error.  With an
  // acknowledgment cell, wait for the consumer's answer and release it, which
  // presumably re-raises an exception captured from consume.
  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit =
  {
    synchronized {
      if (is_active) mbox.send(Some((x, ack)))
      else error("Consumer thread not active: " + quote(thread.getName))
    }
    ack.foreach(a =>
      Exn.release(a.guarded_access({
        case None => None
        case res @ Some(result) => Some((result, res))
      })))
  }

  // Enqueue a request without waiting for it to be processed.
  def send(arg: A): Unit = request(arg, None)

  // Enqueue a request and wait until the consumer has processed it.
  def send_wait(arg: A): Unit = request(arg, Some(Synchronized(None)))

  // Stop accepting requests, enqueue the shutdown marker (only once, while
  // still active), and wait for the thread to terminate.
  def shutdown(): Unit =
  {
    synchronized { if (is_active) { active = false; mbox.send(None) } }
    thread.join()
  }
}