src/Pure/Concurrent/consumer_thread.scala
author wenzelm
Thu, 24 Apr 2014 16:19:11 +0200
changeset 56702 f96ad2b19c38
parent 56701 ac5b66fa2a56
child 56708 d39148de6eee
permissions -rw-r--r--
support for requests with explicit acknowledgment (and exception propagation);
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     1
/*  Title:      Pure/Concurrent/consumer_thread.scala
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     2
    Module:     PIDE
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     3
    Author:     Makarius
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     4
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
     5
Consumer thread with unbounded queueing of requests, and optional
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
     6
acknowledgment.
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     7
*/
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     8
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
     9
package isabelle
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    10
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    11
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    12
import scala.annotation.tailrec
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    13
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    14
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    15
object Consumer_Thread
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    16
{
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    17
  def fork[A](name: String = "", daemon: Boolean = false)(
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    18
      consume: A => Boolean,
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    19
      finish: () => Unit = () => ()): Consumer_Thread[A] =
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    20
    new Consumer_Thread[A](name, daemon, consume, finish)
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    21
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    22
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    23
  /* internal messages */
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    24
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    25
  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    26
  private type Request[A] = (A, Option[Ack])
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    27
}
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    28
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    29
final class Consumer_Thread[A] private(
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    30
  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    31
{
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    32
  private var active = true
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    33
  private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    34
56701
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    35
  private def failure(exn: Throwable): Unit =
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    36
    System.err.println("Consumer thread failure:\n" + Exn.message(exn))
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    37
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    38
  private def robust_finish(): Unit =
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    39
    try { finish() } catch { case exn: Throwable => failure(exn) }
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    40
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    41
  @tailrec private def loop(): Unit =
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    42
    mbox.receive match {
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    43
      case Some((arg, ack)) =>
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    44
        val result = Exn.capture { consume(arg) }
56701
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    45
        val continue =
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    46
          result match {
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    47
            case Exn.Res(cont) => cont
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    48
            case Exn.Exn(exn) =>
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    49
              if (!ack.isDefined) failure(exn)
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    50
              true
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    51
          }
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    52
        ack.foreach(a => a.change(_ => Some(result)))
56701
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    53
        if (continue) loop() else robust_finish()
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    54
      case None => robust_finish()
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    55
    }
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    56
  private val thread = Simple_Thread.fork(name, daemon) { loop() }
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    57
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    58
  private def is_active: Boolean = active && thread.isAlive
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    59
56701
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    60
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    61
  /* main methods */
ac5b66fa2a56 more robust thread: continue after failure;
wenzelm
parents: 56698
diff changeset
    62
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    63
  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit = synchronized {
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    64
    if (is_active) mbox.send(Some((x, ack)))
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    65
    else error("Consumer thread not active")
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    66
  }
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    67
56702
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    68
  def send(arg: A) { request(arg, None) }
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    69
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    70
  def send_wait(arg: A) {
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    71
    val ack: Consumer_Thread.Ack = Synchronized(None)
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    72
    request(arg, Some(ack))
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    73
    val result = ack.guarded_access({ case None => None case res => Some((res.get, res)) })
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    74
    Exn.release(result)
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    75
  }
f96ad2b19c38 support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents: 56701
diff changeset
    76
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    77
  def shutdown(): Unit =
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    78
  {
56698
e0655270d3f3 allow more control of main loop;
wenzelm
parents: 56696
diff changeset
    79
    synchronized { if (is_active) { active = false; mbox.send(None) } }
56696
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    80
    thread.join
ff782c5450bf more robust shutdown;
wenzelm
parents: 56695
diff changeset
    81
  }
56695
963732291084 consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff changeset
    82
}