| author | wenzelm | 
| Mon, 01 May 2017 09:52:11 +0200 | |
| changeset 65654 | 0fbaa9286331 | 
| parent 64370 | 865b39487b5d | 
| child 66094 | 24658c9d7c78 | 
| permissions | -rw-r--r-- | 
| 
56695
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
1  | 
/* Title: Pure/Concurrent/consumer_thread.scala  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
2  | 
Author: Makarius  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
3  | 
|
| 
56702
 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 
wenzelm 
parents: 
56701 
diff
changeset
 | 
4  | 
Consumer thread with unbounded queueing of requests, and optional  | 
| 
 
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
 
wenzelm 
parents: 
56701 
diff
changeset
 | 
5  | 
acknowledgment.  | 
| 
56695
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
6  | 
*/  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
7  | 
|
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
8  | 
package isabelle  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
9  | 
|
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
10  | 
|
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
11  | 
import scala.annotation.tailrec  | 
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
12  | 
|
| 
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
13  | 
|
| 56696 | 14  | 
/** Companion of class `Consumer_Thread`: public factory plus the private
  * message types exchanged with the consumer thread.
  */
object Consumer_Thread {
  /* internal messages */

  // Slot through which the outcome of one request is reported back to the
  // sender: `None` until a result has been stored, then `Some(result)`.
  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]

  // A queued request: the argument for `consume`, paired with an optional
  // acknowledgment slot.
  private type Request[A] = Tuple2[A, Option[Ack]]


  /* factory */

  /** Create a new consumer thread.
    *
    * @param name    name passed on to the underlying thread
    * @param daemon  daemon flag passed on to the underlying thread
    * @param consume function applied to each request argument; its Boolean
    *                result decides whether the consumer loop continues
    * @param finish  function invoked when the consumer loop ends
    */
  def fork[A](
    name: String = "",
    daemon: Boolean = false
  )(
    consume: A => Boolean,
    finish: () => Unit = () => ()
  ): Consumer_Thread[A] = {
    val consumer = new Consumer_Thread[A](name, daemon, consume, finish)
    consumer
  }
}
| 
56695
 
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
 
wenzelm 
parents:  
diff
changeset
 | 
27  | 
|
| 56698 | 28  | 
/** Consumer thread with unbounded queueing of requests, and optional
  * acknowledgment.
  *
  * Requests are put into a mailbox and handed one at a time to `consume` on a
  * dedicated thread. The loop ends when `consume` returns `false` or when
  * `shutdown()` has queued its end marker; `finish` is invoked at that point.
  *
  * @param name    name passed to `Standard_Thread.fork`
  * @param daemon  daemon flag passed to `Standard_Thread.fork`
  * @param consume handler applied to each request argument; `true` continues
  *                the loop, `false` ends it
  * @param finish  invoked once when the loop ends; exceptions it throws are
  *                reported via `failure`
  */
final class Consumer_Thread[A] private(
  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
  // Written under the object lock in shutdown(), but read by is_active, which
  // is public and may be called without holding that lock: @volatile makes
  // the update visible to such readers.
  @volatile private var active = true

  // Queue of pending messages: Some(request), or None as the end marker sent
  // by shutdown().
  private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]

  // The consumer thread is forked during construction; `mailbox` is
  // initialized before it, so the loop can receive immediately.
  private val thread = Standard_Thread.fork(name, daemon) { main_loop(Nil) }

  /** True while `shutdown()` has not been requested and the thread is alive. */
  def is_active: Boolean = active && thread.isAlive

  // Report an exception as error message that mentions this thread's name.
  private def failure(exn: Throwable): Unit =
    Output.error_message(
      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))

  // Run `finish`, reporting (not propagating) anything it throws.
  private def robust_finish(): Unit =
    try { finish() } catch { case exn: Throwable => failure(exn) }

  // Process the given batch of messages; when it is exhausted, fetch the next
  // batch from the mailbox.
  @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
    msgs match {
      case Nil => main_loop(mailbox.receive(None))
      case Some((arg, ack)) :: rest =>
        val result = Exn.capture { consume(arg) }
        val continue =
          result match {
            case Exn.Res(cont) => cont
            case Exn.Exn(exn) =>
              // Without an acknowledgment slot nobody receives the exception,
              // so it is reported here; either way the loop keeps running.
              if (ack.isEmpty) failure(exn)
              true
          }
        // Publish the outcome (value or exception) to the acknowledgment slot.
        ack.foreach(a => a.change(_ => Some(result)))
        // NOTE(review): when `consume` returns false, the requests left in
        // `rest` are dropped without their acknowledgment being set --
        // confirm that callers of send_wait cannot be affected by this.
        if (continue) main_loop(rest) else robust_finish()
      case None :: _ => robust_finish()
    }

  assert(is_active)


  /* main methods */

  // Enqueue a request. With an acknowledgment slot, additionally take the
  // result out of the slot once it is defined and pass it to Exn.release
  // (presumably this waits for the consumer and re-raises a captured
  // exception -- verify against Synchronized.guarded_access / Exn.release).
  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit =
  {
    // Same lock as shutdown(): the activity check and the send happen
    // atomically with respect to shutdown's end marker.
    synchronized {
      if (is_active) mailbox.send(Some((x, ack)))
      else error("Consumer thread not active: " + quote(thread.getName))
    }
    ack.foreach(a =>
      Exn.release(
        a.guarded_access({
          case None => None
          case res @ Some(result) => Some((result, res))
        })))
  }

  /** Enqueue `arg` without acknowledgment. Fails via `error` if the consumer
    * is not active.
    */
  def send(arg: A): Unit = request(arg, None)

  /** Enqueue `arg` with an acknowledgment slot, see `request`. Fails via
    * `error` if the consumer is not active.
    */
  def send_wait(arg: A): Unit = request(arg, Some(Synchronized(None)))

  /** Mark the consumer inactive, queue the end marker (only if it was still
    * active), then wait for the thread to terminate.
    */
  def shutdown(): Unit =
  {
    synchronized { if (is_active) { active = false; mailbox.send(None) } }
    thread.join()
  }
}