| author | wenzelm | 
| Mon, 24 Oct 2016 12:16:12 +0200 | |
| changeset 64370 | 865b39487b5d | 
| parent 61556 | 0d4ee4168e41 | 
| child 66094 | 24658c9d7c78 | 
| permissions | -rw-r--r-- | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 1 | /* Title: Pure/Concurrent/consumer_thread.scala | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 2 | Author: Makarius | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 3 | |
| 56702 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 4 | Consumer thread with unbounded queueing of requests, and optional | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 5 | acknowledgment. | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 6 | */ | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 7 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 8 | package isabelle | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 9 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 10 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 11 | import scala.annotation.tailrec | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 12 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 13 | |
object Consumer_Thread
{
  /* internal messages */

  // Ack: synchronized slot that is initially empty and eventually holds the
  // captured outcome of consume for one request.
  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]

  // Request: argument for consume, paired with an optional acknowledgment slot.
  private type Request[A] = (A, Option[Ack])


  /* fork */

  // Create a consumer thread with the given thread name and daemon flag.
  //   consume: invoked for each request; result false terminates the thread
  //   finish: invoked once when the thread terminates (default: no-op)
  def fork[A](name: String = "", daemon: Boolean = false)(
    consume: A => Boolean,
    finish: () => Unit = () => ()): Consumer_Thread[A] =
  {
    val consumer_thread = new Consumer_Thread[A](name, daemon, consume, finish)
    consumer_thread
  }
}
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 27 | |
final class Consumer_Thread[A] private(
  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
  /* thread state */

  // Written under the object monitor (see shutdown), but read via the public
  // is_active without holding it: volatile makes the update visible to such
  // unsynchronized readers on other threads.
  @volatile private var active = true
  private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]

  // The forked body uses mailbox, so the fields above must be initialized first.
  private val thread = Standard_Thread.fork(name, daemon) { main_loop(Nil) }

  // True as long as shutdown has not been requested and the thread is alive.
  def is_active: Boolean = active && thread.isAlive


  /* failure handling */

  // Report an exception of the consumer thread as error message.
  private def failure(exn: Throwable): Unit =
    Output.error_message(
      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))

  // Run finish, reporting any exception instead of propagating it.
  private def robust_finish(): Unit =
    try { finish() } catch { case exn: Throwable => failure(exn) }


  /* main loop */

  // Process a batch of messages in order: Some(request) is passed to consume,
  // None is the shutdown marker (sent by shutdown). An empty batch means to
  // fetch the next one from the mailbox.
  @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
    msgs match {
      case Nil => main_loop(mailbox.receive(None))
      case Some((arg, ack)) :: rest =>
        val result = Exn.capture { consume(arg) }
        val continue =
          result match {
            case Exn.Res(cont) => cont
            case Exn.Exn(exn) =>
              // with acknowledgment the exception is handed over to the sender
              // via the ack slot below; otherwise it is reported here
              if (ack.isEmpty) failure(exn)
              true
          }
        ack.foreach(a => a.change(_ => Some(result)))
        // NOTE(review): when consume returns false, the remaining requests of
        // this batch are dropped without acknowledgment; a pending send_wait
        // for one of them presumably never returns -- TODO confirm.
        if (continue) main_loop(rest) else robust_finish()
      case None :: _ => robust_finish()
    }

  assert(is_active)


  /* main methods */

  // Enqueue a request, or fail with error if the thread is no longer active.
  // With acknowledgment slot, wait for its result afterwards (outside of the
  // monitor); Exn.release presumably re-raises an exception captured from
  // consume -- TODO confirm.
  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit =
  {
    synchronized {
      if (is_active) mailbox.send(Some((x, ack)))
      else error("Consumer thread not active: " + quote(thread.getName))
    }
    // res.get is safe here: the None case is handled first
    ack.foreach(a =>
      Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
  }

  // Send request without acknowledgment.
  def send(arg: A): Unit = request(arg, None)

  // Send request with acknowledgment, see request above.
  def send_wait(arg: A): Unit = request(arg, Some(Synchronized(None)))

  // Request termination (at most once, via the None marker) and wait for the
  // thread to end.
  def shutdown(): Unit =
  {
    synchronized { if (is_active) { active = false; mailbox.send(None) } }
    thread.join()
  }
}