src/Pure/Concurrent/consumer_thread.scala
author wenzelm
Fri, 16 Jun 2017 15:59:27 +0200
changeset 66094 24658c9d7c78
parent 64370 865b39487b5d
child 71142 d6688677a784
permissions -rw-r--r--
more general dispatcher operations;
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     1
/*  Title:      Pure/Concurrent/consumer_thread.scala
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     2
    Author:     Makarius
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     3
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
     4
Consumer thread with unbounded queueing of requests, and optional
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
     5
acknowledgment.
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     6
*/
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     7
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     8
package isabelle
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     9
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    10
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    11
import scala.annotation.tailrec
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    12
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    13
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    14
object Consumer_Thread
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    15
{
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    16
  def fork[A](name: String = "", daemon: Boolean = false)(
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    17
      consume: A => Boolean,
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    18
      finish: () => Unit = () => ()): Consumer_Thread[A] =
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    19
    new Consumer_Thread[A](name, daemon, consume, finish)
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    20
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    21
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    22
  /* internal messages */
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    23
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    24
  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    25
  private type Request[A] = (A, Option[Ack])
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    26
}
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    27
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    28
final class Consumer_Thread[A] private(
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    29
  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    30
{
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    31
  private var active = true
57417
29fe9bac501b more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents: 56782
diff changeset
    32
  private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    33
61556
0d4ee4168e41 clarified modules;
wenzelm
parents: 57417
diff changeset
    34
  private val thread = Standard_Thread.fork(name, daemon) { main_loop(Nil) }
56718
096139bcfadd replaced manager Actor by Consumer_Thread, which is lazy to defer its start to actual Handler init time;
wenzelm
parents: 56708
diff changeset
    35
  def is_active: Boolean = active && thread.isAlive
66094
24658c9d7c78 more general dispatcher operations;
wenzelm
parents: 64370
diff changeset
    36
  def check_thread: Boolean = Thread.currentThread == thread
56708
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    37
56701
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    38
  private def failure(exn: Throwable): Unit =
56782
433cf57550fa more systematic Isabelle output, like in classic Isabelle/ML (without markup);
wenzelm
parents: 56718
diff changeset
    39
    Output.error_message(
56708
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    40
      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))
56701
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    41
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    42
  private def robust_finish(): Unit =
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    43
    try { finish() } catch { case exn: Throwable => failure(exn) }
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    44
57417
29fe9bac501b more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents: 56782
diff changeset
    45
  @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
29fe9bac501b more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents: 56782
diff changeset
    46
    msgs match {
29fe9bac501b more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents: 56782
diff changeset
    47
      case Nil => main_loop(mailbox.receive(None))
29fe9bac501b more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents: 56782
diff changeset
    48
      case Some((arg, ack)) :: rest =>
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    49
        val result = Exn.capture { consume(arg) }
56701
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    50
        val continue =
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    51
          result match {
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    52
            case Exn.Res(cont) => cont
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    53
            case Exn.Exn(exn) =>
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    54
              if (!ack.isDefined) failure(exn)
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    55
              true
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    56
          }
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    57
        ack.foreach(a => a.change(_ => Some(result)))
57417
29fe9bac501b more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents: 56782
diff changeset
    58
        if (continue) main_loop(rest) else robust_finish()
29fe9bac501b more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents: 56782
diff changeset
    59
      case None :: _ => robust_finish()
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    60
    }
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    61
56708
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    62
  assert(is_active)
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    63
56701
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    64
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    65
  /* main methods */
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    66
56708
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    67
  private def request(x: A, ack: Option[Consumer_Thread.Ack])
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    68
  {
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    69
    synchronized {
57417
29fe9bac501b more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents: 56782
diff changeset
    70
      if (is_active) mailbox.send(Some((x, ack)))
56708
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    71
      else error("Consumer thread not active: " + quote(thread.getName))
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    72
    }
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    73
    ack.foreach(a =>
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    74
      Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    75
  }
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    76
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    77
  def send(arg: A) { request(arg, None) }
56708
d39148de6eee misc tuning;
wenzelm
parents: 56702
diff changeset
    78
  def send_wait(arg: A) { request(arg, Some(Synchronized(None))) }
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    79
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    80
  def shutdown(): Unit =
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    81
  {
57417
29fe9bac501b more tight Mailbox: single list is sufficient for single receiver, reverse outside critical section;
wenzelm
parents: 56782
diff changeset
    82
    synchronized { if (is_active) { active = false; mailbox.send(None) } }
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    83
    thread.join
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    84
  }
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    85
}