| author | wenzelm | 
| Tue, 05 Mar 2024 16:06:06 +0100 | |
| changeset 79777 | db9c6be8e236 | 
| parent 78865 | a0199212046a | 
| child 80300 | 152d6c58adb3 | 
| permissions | -rw-r--r-- | 
| 
56695
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
1  | 
/* Title: Pure/Concurrent/consumer_thread.scala  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
2  | 
Author: Makarius  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
3  | 
|
| 
56702
 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 
wenzelm 
parents: 
56701 
diff
changeset
 | 
4  | 
Consumer thread with unbounded queueing of requests, and optional  | 
| 
 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 
wenzelm 
parents: 
56701 
diff
changeset
 | 
5  | 
acknowledgment.  | 
| 
56695
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
6  | 
*/  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
7  | 
|
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
8  | 
package isabelle  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
9  | 
|
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
10  | 
|
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
11  | 
import scala.annotation.tailrec  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
12  | 
|
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
13  | 
|
| 75393 | 14  | 
object Consumer_Thread {
  /* Fork a consumer thread that may handle several queued requests per call
     of "consume".

     - bulk: decides per argument whether it may be grouped with its
       neighbours into one batch;
     - consume: receives a batch of arguments and returns the individual
       results together with a flag that tells whether the thread continues;
     - timeout, limit, finish: handed unchanged to the Consumer_Thread
       constructor.
  */
  def fork_bulk[A](name: String = "", daemon: Boolean = false)(
    bulk: A => Boolean,
    consume: List[A] => (List[Exn.Result[Unit]], Boolean),
    timeout: Option[Time] = None,
    limit: Int = 0,
    finish: () => Unit = () => ()
  ): Consumer_Thread[A] = {
    new Consumer_Thread[A](name, daemon, bulk, consume, timeout, limit, finish)
  }

  /* Fork a consumer thread that handles exactly one request per call of
     "consume"; implemented via fork_bulk with batching switched off.
  */
  def fork[A](name: String = "", daemon: Boolean = false)(
    consume: A => Boolean,
    limit: Int = 0,
    finish: () => Unit = () => ()
  ): Consumer_Thread[A] = {
    // no argument is ever eligible for batching
    val never_bulk: A => Boolean = _ => false

    // adapt the single-argument consumer to the batch interface: an exception
    // is returned as the result of the request, and the thread keeps running
    def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
      assert(args.length == 1)
      Exn.capture { consume(args.head) } match {
        case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
        case Exn.Res(cont) => (List(Exn.Res(())), cont)
      }
    }

    fork_bulk[A](name = name, daemon = daemon)(
      bulk = never_bulk,
      consume = consume_single,
      limit = limit,
      finish = finish)
  }
}
| 
56695
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
40  | 
|
| 56698 | 41  | 
/* Consumer thread: requests are queued in a mailbox and handled by a single
   forked thread via "consume"; a request may optionally be acknowledged, so
   that the sender waits for its result (or exception).

   Instances are created via the companion object (the constructor is private).
*/
final class Consumer_Thread[A] private(
  name: String, daemon: Boolean,
  bulk: A => Boolean,
  consume: List[A] => (List[Exn.Result[Unit]], Boolean),
  timeout: Option[Time] = None,
  limit: Int,
  finish: () => Unit
) {
  /* thread */

  // Written under "synchronized" in shutdown(), but read by the public
  // is_active() without holding the lock: volatile guarantees that such
  // readers observe the update.
  @volatile private var active = true

  // message None is the shutdown marker, see shutdown() and main_loop
  private val mailbox = Mailbox[Option[Request]](limit = limit)

  private val thread = Isabelle_Thread.fork(name = name, daemon = daemon) { main_loop(Nil) }

  // true as long as shutdown() was not requested and the thread is still running
  def is_active(): Boolean = active && thread.isAlive

  // true if the caller is running on the consumer thread itself
  def check_thread(): Boolean = Thread.currentThread == thread

  // report an exception that has no waiting sender to receive it
  private def failure(exn: Throwable): Unit =
    Output.error_message(
      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))

  // run the "finish" callback; any exception is reported, not propagated
  private def robust_finish(): Unit =
    try { finish() } catch { case exn: Throwable => failure(exn) }


  /* requests */

  // A queued argument; with acknowledge = true it carries a result slot
  // that is filled by process() and awaited by the sender.
  private class Request(val arg: A, acknowledge: Boolean = false) {
    val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
      if (acknowledge) Some(Synchronized(None)) else None

    // No-op without acknowledgment; otherwise obtains the result from the slot
    // and passes it to Exn.release (presumably blocking until the slot is
    // filled, and re-raising a stored exception -- confirm against
    // Synchronized.guarded_access and Exn.release).
    def await(): Unit = {
      for (a <- ack) {
        Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))
      }
    }
  }

  // Enqueue a request (error if the thread is no longer active), then wait
  // for its acknowledgment outside of the lock.
  private def request(req: Request): Unit = {
    synchronized {
      if (is_active()) mailbox.send(Some(req))
      else error("Consumer thread not active: " + quote(thread.getName))
    }
    req.await()
  }

  // Consume one batch from the front of msgs: either the longest prefix of
  // requests accepted by "bulk", or just the first message if that prefix is
  // empty.  Returns the unprocessed remainder and the continuation flag.
  //
  // main_loop never passes a list that starts with None, so "_.get" below is
  // only applied to defined messages.
  //
  // NOTE(review): an exception raised by "consume" itself is not captured
  // here, so it escapes from main_loop; acknowledged requests of that batch
  // would then never receive a result.  Likewise, if "consume" returns fewer
  // results than requests, the surplus requests are skipped silently.
  // Confirm whether callers of fork_bulk are expected to guarantee both.
  private def process(msgs: List[Option[Request]]): (List[Option[Request]], Boolean) = {
    val reqs =
      proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
        .getOrElse(msgs.take(1))
        .map(_.get)
    val rest = msgs.drop(reqs.length)

    val (results, cont) = consume(reqs.map(_.arg))
    for {
      case (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
    } {
      (req.ack, res) match {
        // sender is waiting: hand over the result or exception
        case (Some(a), _) => a.change(_ => Some(res))
        case (None, Exn.Res(_)) =>
        // nobody is waiting: report the exception
        case (None, Exn.Exn(exn)) => failure(exn)
      }
    }

    (rest, cont)
  }

  // Main loop of the thread: work off the buffered remainder first, otherwise
  // receive from the mailbox.  A leading None or a false continuation flag
  // ends the loop via robust_finish().  An empty list from receive leads to
  // consume(Nil) via process (presumably what a timeout yields -- confirm
  // against Mailbox.receive).
  @tailrec private def main_loop(buffer: List[Option[Request]]): Unit =
    proper_list(buffer).getOrElse(mailbox.receive(timeout = timeout)) match {
      case None :: _ => robust_finish()
      case msgs =>
        val (rest, cont) = process(msgs)
        if (cont) main_loop(rest) else robust_finish()
    }


  /* main methods */

  assert(is_active())

  // enqueue without acknowledgment; exceptions are reported via failure()
  def send(arg: A): Unit = request(new Request(arg))

  // enqueue with acknowledgment and wait for the result of this request
  def send_wait(arg: A): Unit = request(new Request(arg, acknowledge = true))

  // Mark the thread inactive, enqueue the shutdown marker (at most once), and
  // wait for the thread to terminate.
  def shutdown(): Unit = {
    synchronized { if (is_active()) { active = false; mailbox.send(None) } }
    thread.join()
  }
}