| author | wenzelm | 
| Fri, 25 Apr 2014 13:55:50 +0200 | |
| changeset 56719 | 80eb2192516a | 
| parent 56718 | 096139bcfadd | 
| child 56782 | 433cf57550fa | 
| permissions | -rw-r--r-- | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 1 | /* Title: Pure/Concurrent/consumer_thread.scala | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 2 | Module: PIDE | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 3 | Author: Makarius | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 4 | |
| 56702 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 5 | Consumer thread with unbounded queueing of requests, and optional | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 6 | acknowledgment. | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 7 | */ | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 8 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 9 | package isabelle | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 10 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 11 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 12 | import scala.annotation.tailrec | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 13 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 14 | |
| 56696 | 15 | object Consumer_Thread | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 16 | {
 | 
| 56698 | 17 | def fork[A](name: String = "", daemon: Boolean = false)( | 
| 18 | consume: A => Boolean, | |
| 19 | finish: () => Unit = () => ()): Consumer_Thread[A] = | |
| 20 | new Consumer_Thread[A](name, daemon, consume, finish) | |
| 56702 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 21 | |
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 22 | |
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 23 | /* internal messages */ | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 24 | |
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 25 | private type Ack = Synchronized[Option[Exn.Result[Boolean]]] | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 26 | private type Request[A] = (A, Option[Ack]) | 
| 56696 | 27 | } | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 28 | |
| 56698 | 29 | final class Consumer_Thread[A] private( | 
| 30 | name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit) | |
| 56696 | 31 | {
 | 
| 56698 | 32 | private var active = true | 
| 56702 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 33 | private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]] | 
| 56696 | 34 | |
| 56708 | 35 |   private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
 | 
| 56718 
096139bcfadd
replaced manager Actor by Consumer_Thread, which is lazy to defer its start to actual Handler init time;
 wenzelm parents: 
56708diff
changeset | 36 | def is_active: Boolean = active && thread.isAlive | 
| 56708 | 37 | |
| 56701 | 38 | private def failure(exn: Throwable): Unit = | 
| 56708 | 39 | System.err.println( | 
| 40 | "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn)) | |
| 56701 | 41 | |
| 42 | private def robust_finish(): Unit = | |
| 43 |     try { finish() } catch { case exn: Throwable => failure(exn) }
 | |
| 44 | ||
| 56708 | 45 | @tailrec private def main_loop(): Unit = | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 46 |     mbox.receive match {
 | 
| 56702 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 47 | case Some((arg, ack)) => | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 48 |         val result = Exn.capture { consume(arg) }
 | 
| 56701 | 49 | val continue = | 
| 56702 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 50 |           result match {
 | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 51 | case Exn.Res(cont) => cont | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 52 | case Exn.Exn(exn) => | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 53 | if (!ack.isDefined) failure(exn) | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 54 | true | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 55 | } | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 56 | ack.foreach(a => a.change(_ => Some(result))) | 
| 56708 | 57 | if (continue) main_loop() else robust_finish() | 
| 56701 | 58 | case None => robust_finish() | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 59 | } | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 60 | |
| 56708 | 61 | assert(is_active) | 
| 56698 | 62 | |
| 56701 | 63 | |
| 64 | /* main methods */ | |
| 65 | ||
| 56708 | 66 | private def request(x: A, ack: Option[Consumer_Thread.Ack]) | 
| 67 |   {
 | |
| 68 |     synchronized {
 | |
| 69 | if (is_active) mbox.send(Some((x, ack))) | |
| 70 |       else error("Consumer thread not active: " + quote(thread.getName))
 | |
| 71 | } | |
| 72 | ack.foreach(a => | |
| 73 |       Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
 | |
| 56696 | 74 | } | 
| 75 | ||
| 56702 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 76 |   def send(arg: A) { request(arg, None) }
 | 
| 56708 | 77 |   def send_wait(arg: A) { request(arg, Some(Synchronized(None))) }
 | 
| 56702 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 78 | |
| 56696 | 79 | def shutdown(): Unit = | 
| 80 |   {
 | |
| 56698 | 81 |     synchronized { if (is_active) { active = false; mbox.send(None) } }
 | 
| 56696 | 82 | thread.join | 
| 83 | } | |
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 84 | } |