| author | Fabian Huch <huch@in.tum.de> | 
| Tue, 09 Jul 2024 11:11:15 +0200 | |
| changeset 80531 | c54a4c2db5b7 | 
| parent 80300 | 152d6c58adb3 | 
| permissions | -rw-r--r-- | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 1 | /* Title: Pure/Concurrent/consumer_thread.scala | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 2 | Author: Makarius | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 3 | |
| 56702 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 4 | Consumer thread with unbounded queueing of requests, and optional | 
| 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 wenzelm parents: 
56701diff
changeset | 5 | acknowledgment. | 
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 6 | */ | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 7 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 8 | package isabelle | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 9 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 10 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 11 | import scala.annotation.tailrec | 
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 12 | |
| 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 13 | |
object Consumer_Thread {
  /* Create a consumer thread that may handle several queued requests per step.

     - bulk: a leading run of queued requests that all satisfy this predicate is
       handed to consume as one batch; otherwise a single request is taken.
     - consume: receives the batch of arguments and returns one result per
       request plus a flag telling whether the thread should keep running.
     - timeout: passed on to the mailbox receive operation.
       NOTE(review): presumably an expired timeout yields an empty batch, i.e.
       consume(Nil) -- confirm against Mailbox.
     - limit: passed on to the Mailbox constructor.
     - finish: invoked once when the main loop ends.
  */
  def fork_bulk[A](name: String = "", daemon: Boolean = false)(
    bulk: A => Boolean,
    consume: List[A] => (List[Exn.Result[Unit]], Boolean),
    timeout: Option[Time] = None,
    limit: Int = 0,
    finish: () => Unit = () => ()
  ): Consumer_Thread[A] = {
    new Consumer_Thread[A](name, daemon, bulk, consume, timeout, limit, finish)
  }

  /* Create a consumer thread that handles exactly one request per step.

     The result of consume tells whether the thread should continue; an
     exception raised by consume is captured as the result of that request
     and the thread keeps running.
  */
  def fork[A](name: String = "", daemon: Boolean = false)(
    consume: A => Boolean,
    limit: Int = 0,
    finish: () => Unit = () => ()
  ): Consumer_Thread[A] = {
    // adapter from the single-argument consumer to the batch interface
    def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
      assert(args.length == 1)
      val outcome = Exn.capture { consume(args.head) }
      outcome match {
        case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
        case Exn.Res(cont) => (List(Exn.Res(())), cont)
      }
    }

    // bulk is constantly false, so every batch consists of a single request
    fork_bulk[A](name = name, daemon = daemon)(
      _ => false, consume_single, limit = limit, finish = finish)
  }
}
| 56695 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 wenzelm parents: diff
changeset | 40 | |
final class Consumer_Thread[A] private(
  name: String, daemon: Boolean,
  bulk: A => Boolean,
  consume: List[A] => (List[Exn.Result[Unit]], Boolean),
  timeout: Option[Time] = None,
  limit: Int,
  finish: () => Unit
) {
  /* thread */

  // NB: declaration order matters -- the forked thread enters main_loop
  // right away and accesses the mailbox
  private var active = true
  private val mailbox = Mailbox[Option[Request]](limit = limit)

  private val thread = Isabelle_Thread.fork(name = name, daemon = daemon) { main_loop(Nil) }

  // not shut down yet, and the underlying thread is still running
  def is_active(): Boolean = active && thread.isAlive

  // is the caller running on the consumer thread itself?
  def check_thread(): Boolean = Thread.currentThread == thread

  // report an exception that has no waiting requester to receive it
  private def failure(exn: Throwable): Unit = {
    val msg = "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.print(exn)
    Output.error_message(msg)
  }

  // run the finish callback; its exceptions are reported, not propagated
  private def robust_finish(): Unit = {
    try { finish() }
    catch { case exn: Throwable => failure(exn) }
  }


  /* requests */

  // queued argument, with optional slot for the acknowledged result
  private class Request(val arg: A, acknowledge: Boolean = false) {
    val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
      if (!acknowledge) None else Some(Synchronized(None))

    // wait until the result slot is filled (if any), then release the result
    def await(): Unit = {
      ack.foreach { slot =>
        val result =
          slot.guarded_access({
            case state @ Some(res) => Some((res, state))
            case None => None
          })
        Exn.release(result)
      }
    }
  }

  // enqueue under the lock of this object, but wait outside of it
  private def request(req: Request): Unit = {
    synchronized {
      if (!is_active()) error("Consumer thread not active: " + quote(thread.getName))
      else mailbox.send(Some(req))
    }
    req.await()
  }

  // consume one batch from the front of msgs: result is the unprocessed
  // remainder together with the continuation flag of consume
  private def process(msgs: List[Option[Request]]): (List[Option[Request]], Boolean) = {
    // leading run of bulk requests, or just the first message
    val bulk_prefix = msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg))
    val selected =
      proper_list(bulk_prefix) match {
        case Some(prefix) => prefix
        case None => msgs.take(1)
      }
    val reqs = selected.map(_.get)
    val rest = msgs.drop(reqs.length)

    val (results, cont) = consume(reqs.map(_.arg))

    // pair requests with results by position; surplus on either side is ignored
    reqs.zip(results).foreach { case (req, res) =>
      req.ack match {
        case Some(slot) => slot.change(_ => Some(res))
        case None =>
          res match {
            case Exn.Exn(exn) => failure(exn)
            case Exn.Res(_) =>
          }
      }
    }

    (rest, cont)
  }

  // work off buffered messages first, otherwise receive from the mailbox;
  // a leading None message (sent by shutdown) ends the loop
  @tailrec private def main_loop(buffer: List[Option[Request]]): Unit = {
    val pending = proper_list(buffer).getOrElse(mailbox.receive(timeout = timeout))
    pending match {
      case None :: _ => robust_finish()
      case msgs =>
        val (rest, cont) = process(msgs)
        if (!cont) robust_finish() else main_loop(rest)
    }
  }


  /* main methods */

  assert(is_active())

  // enqueue without acknowledgment
  def send(arg: A): Unit = {
    request(new Request(arg))
  }

  // enqueue and wait for the result; an exception result is released in the caller
  def send_wait(arg: A): Unit = {
    request(new Request(arg, acknowledge = true))
  }

  // signal termination (at most once while active), then wait for the thread
  def shutdown(): Unit = {
    synchronized {
      if (is_active()) {
        active = false
        mailbox.send(None)
      }
    }
    thread.join()
  }
}