src/Pure/Concurrent/consumer_thread.scala
author wenzelm
Tue, 31 Oct 2023 16:11:26 +0100
changeset 78865 a0199212046a
parent 78864 2024a2298d7a
child 80300 152d6c58adb3
permissions -rw-r--r--
support for mailbox limit;
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     1
/*  Title:      Pure/Concurrent/consumer_thread.scala
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     2
    Author:     Makarius
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     3
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
     4
Consumer thread with unbounded queueing of requests, and optional
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
     5
acknowledgment.
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     6
*/
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     7
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     8
package isabelle
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     9
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    10
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    11
import scala.annotation.tailrec
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    12
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    13
75393
87ebf5a50283 clarified formatting, for the sake of scala3;
wenzelm
parents: 74254
diff changeset
    14
object Consumer_Thread {
  // Create a consumer thread that may handle several pending requests at once.
  //
  //  name, daemon: passed on to the forked thread.
  //  bulk:    tested on the argument of pending requests; the leading run of
  //           requests that satisfy it is handed to `consume` as one list,
  //           otherwise only the first pending request is handed over.
  //  consume: receives the arguments of the selected requests and returns the
  //           results for them, together with a flag that tells whether the
  //           thread should continue.
  //  timeout: passed to the mailbox when waiting for new messages.
  //           NOTE(review): presumably an expired timeout yields an empty
  //           message list and thus consume(Nil) -- confirm against Mailbox.
  //  limit:   passed to the mailbox. NOTE(review): presumably 0 means "no
  //           limit" -- confirm against Mailbox.
  //  finish:  invoked once when the main loop of the thread ends.
  def fork_bulk[A](name: String = "", daemon: Boolean = false)(
      bulk: A => Boolean,
      consume: List[A] => (List[Exn.Result[Unit]], Boolean),
      timeout: Option[Time] = None,
      limit: Int = 0,
      finish: () => Unit = () => ()): Consumer_Thread[A] =
    new Consumer_Thread[A](name, daemon, bulk, consume, timeout, limit, finish)

  // Create a consumer thread that handles exactly one request at a time.
  //
  //  consume: handles a single argument; its Boolean result is the
  //           "continue" flag of the thread. An exception raised by it is
  //           captured as the result of that request, and the thread continues.
  //  limit, finish: as for fork_bulk.
  def fork[A](name: String = "", daemon: Boolean = false)(
      consume: A => Boolean,
      limit: Int = 0,
      finish: () => Unit = () => ()
    ): Consumer_Thread[A] = {
    // No request is ever eligible for batching, so each batch is a singleton.
    val never_bulk: A => Boolean = _ => false

    // Adapt the single-argument consumer to the list-based bulk interface.
    def consume_one(batch: List[A]): (List[Exn.Result[Unit]], Boolean) = {
      assert(batch.length == 1)
      val outcome = Exn.capture { consume(batch.head) }
      outcome match {
        case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
        case Exn.Res(continue) => (List(Exn.Res(())), continue)
      }
    }

    fork_bulk(name = name, daemon = daemon)(
      never_bulk, consume_one, limit = limit, finish = finish)
  }
}
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    40
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    41
// Thread that consumes requests of type A from a mailbox, one batch at a
// time, until it is shut down or `consume` asks to stop. Instances are
// created via the factories of the companion object.
final class Consumer_Thread[A] private(
  name: String, daemon: Boolean,
  bulk: A => Boolean,
  consume: List[A] => (List[Exn.Result[Unit]], Boolean),
  timeout: Option[Time] = None,
  limit: Int,
  finish: () => Unit
) {
  /* thread */

  // Cleared by shutdown(); read and written within `synchronized` there and
  // in request().
  private var active = true

  // Pending messages: Some(request) for work, None as the shutdown marker.
  private val mailbox = Mailbox[Option[Request]](limit = limit)

  // The consumer thread is forked during construction.
  private val thread =
    Isabelle_Thread.fork(name = name, daemon = daemon) {
      main_loop(Nil)
    }

  // True while shutdown() has not been requested and the thread is alive.
  def is_active(): Boolean = active && thread.isAlive

  // True if the caller is running on the consumer thread itself.
  def check_thread(): Boolean = Thread.currentThread == thread

  // Report an exception that has no waiting requester to receive it.
  private def failure(exn: Throwable): Unit = {
    val msg = "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn)
    Output.error_message(msg)
  }

  // Run the user-provided `finish`; any exception is reported, not propagated.
  private def robust_finish(): Unit =
    try {
      finish()
    }
    catch {
      case exn: Throwable => failure(exn)
    }


  /* requests */

  // A request argument, optionally with a slot that receives its result.
  private class Request(val arg: A, acknowledge: Boolean = false) {
    // Result slot: present only for acknowledged requests, initially empty.
    val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
      if (acknowledge) Some(Synchronized(None)) else None

    // For acknowledged requests: wait until the slot holds a result, then
    // pass it to Exn.release (presumably re-raising a captured exception --
    // confirm against Exn). The slot content is left unchanged. No-op
    // otherwise.
    def await(): Unit =
      ack.foreach(slot =>
        Exn.release(
          slot.guarded_access({
            case None => None
            case res => Some((res.get, res))
          })))
  }

  // Enqueue a request while holding the monitor of this object (the same one
  // used by shutdown), then wait for its acknowledgment, if any, outside of it.
  private def request(req: Request): Unit = {
    synchronized {
      if (!is_active()) error("Consumer thread not active: " + quote(thread.getName))
      else mailbox.send(Some(req))
    }
    req.await()
  }

  // Consume the next batch from the given messages; return the remaining
  // messages and the "continue" flag produced by `consume`.
  private def process(msgs: List[Option[Request]]): (List[Option[Request]], Boolean) = {
    // Leading run of requests that qualify for bulk processing ...
    val bulk_prefix =
      msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg))
    // ... or else just the first message (nothing if msgs is empty).
    val selected = proper_list(bulk_prefix).getOrElse(msgs.take(1))
    val batch = selected.map(_.get)
    val remaining = msgs.drop(batch.length)

    val (outcomes, continue) = consume(batch.map(_.arg))

    // Pair requests with results by position; surplus entries on either side
    // are ignored.
    batch.zip(outcomes).foreach { case (req, res) =>
      req.ack match {
        case Some(slot) => slot.change(_ => Some(res))
        case None =>
          // Nobody is waiting for this result: only failures are reported.
          res match {
            case Exn.Exn(exn) => failure(exn)
            case Exn.Res(_) =>
          }
      }
    }

    (remaining, continue)
  }

  // Main loop: work off the local buffer first, otherwise wait on the mailbox.
  @tailrec private def main_loop(buffer: List[Option[Request]]): Unit = {
    val msgs = proper_list(buffer).getOrElse(mailbox.receive(timeout = timeout))
    msgs match {
      // Shutdown marker at the front: stop here.
      case None :: _ => robust_finish()
      case _ =>
        val (remaining, continue) = process(msgs)
        if (continue) main_loop(remaining) else robust_finish()
    }
  }


  /* main methods */

  // Sanity check at the end of construction: the forked thread must be running.
  assert(is_active())

  // Enqueue a request without acknowledgment.
  def send(arg: A): Unit =
    request(new Request(arg))

  // Enqueue a request and wait until its result has been stored.
  def send_wait(arg: A): Unit =
    request(new Request(arg, acknowledge = true))

  // Mark the thread inactive and enqueue the shutdown marker (only if still
  // active), then wait for the thread to terminate.
  def shutdown(): Unit = {
    synchronized {
      if (is_active()) {
        active = false
        mailbox.send(None)
      }
    }
    thread.join()
  }
}