author | traytel |
Tue, 03 Mar 2015 19:08:04 +0100 | |
changeset 59580 | cbc38731d42f |
parent 57417 | 29fe9bac501b |
child 61556 | 0d4ee4168e41 |
permissions | -rw-r--r-- |
56695
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
1 |
/* Title: Pure/Concurrent/consumer_thread.scala |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
2 |
Module: PIDE |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
3 |
Author: Makarius |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
4 |
|
56702
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents:
56701
diff
changeset
|
5 |
Consumer thread with unbounded queueing of requests, and optional |
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents:
56701
diff
changeset
|
6 |
acknowledgment. |
56695
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
7 |
*/ |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
8 |
|
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
9 |
package isabelle |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
10 |
|
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
11 |
|
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
12 |
import scala.annotation.tailrec |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
13 |
|
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
14 |
|
56696 | 15 |
object Consumer_Thread
{
  /* fork */

  // Create a consumer thread for the given thread name and daemon flag.
  // "consume" is applied to each queued argument; its Boolean result tells
  // the main loop whether to keep going.  "finish" is run once when the
  // main loop ends (by default it does nothing).
  def fork[A](name: String = "", daemon: Boolean = false)(
      consume: A => Boolean,
      finish: () => Unit = () => ()): Consumer_Thread[A] =
  {
    val consumer_thread = new Consumer_Thread[A](name, daemon, consume, finish)
    consumer_thread
  }


  /* internal messages */

  // acknowledgment cell: holds None until the consumer stores Some(result)
  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]

  // queued argument, paired with an optional acknowledgment cell
  private type Request[A] = (A, Option[Ack])
}
56695
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
28 |
|
56698 | 29 |
final class Consumer_Thread[A] private(
  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
  /* state */

  // Written under the instance lock in shutdown(), but read by the public
  // is_active without holding that lock: volatile makes the update visible
  // to such unsynchronized readers.
  @volatile private var active = true

  // Queue of pending messages: Some(request) for regular work, None as the
  // stop marker sent by shutdown().
  private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]

  // The worker is forked during construction and starts in main_loop with no
  // pending messages.
  private val thread = Simple_Thread.fork(name, daemon) { main_loop(Nil) }

  // True as long as shutdown() has not been requested and the worker thread
  // is still alive (the worker also ends when consume returns false).
  def is_active: Boolean = active && thread.isAlive


  /* failure handling */

  // Report an exception of the consumer thread as error message.
  private def failure(exn: Throwable): Unit =
    Output.error_message(
      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))

  // Run the finish callback; any exception is reported via failure() instead
  // of being propagated, so termination of the worker is never disrupted.
  private def robust_finish(): Unit =
    try { finish() } catch { case exn: Throwable => failure(exn) }


  /* main loop */

  // Process the given batch of messages in order, fetching a new batch from
  // the mailbox whenever the current one is exhausted.
  //
  // NOTE(review): when consume returns false, or a stop marker is seen, the
  // remaining messages of the batch are dropped.  Requests with
  // acknowledgment among them never receive a result, so their senders
  // presumably keep waiting in request() -- confirm whether this is intended.
  @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
    msgs match {
      // NOTE(review): receive(None) presumably waits without timeout --
      // confirm against Mailbox
      case Nil => main_loop(mailbox.receive(None))
      case Some((arg, ack)) :: rest =>
        val result = Exn.capture { consume(arg) }
        val continue =
          result match {
            case Exn.Res(cont) => cont
            case Exn.Exn(exn) =>
              // without acknowledgment nobody else gets to see the exception,
              // so it is reported here; with acknowledgment it is handed over
              // to the sender as part of the result
              if (ack.isEmpty) failure(exn)
              true
          }
        ack.foreach(a => a.change(_ => Some(result)))
        if (continue) main_loop(rest) else robust_finish()
      case None :: _ => robust_finish()
    }

  assert(is_active)


  /* main methods */

  // Enqueue a request; fails with an error if the consumer thread is not
  // active.  With acknowledgment cell, wait until the worker has stored its
  // result there and pass that result to Exn.release (which presumably
  // re-raises a captured exception in the calling thread -- confirm against
  // Exn).
  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit =
  {
    // same lock as in shutdown(): no regular request can be enqueued after
    // the stop marker
    synchronized {
      if (is_active) mailbox.send(Some((x, ack)))
      else error("Consumer thread not active: " + quote(thread.getName))
    }
    ack.foreach(a =>
      Exn.release(
        a.guarded_access({
          case None => None
          case res @ Some(r) => Some((r, res))
        })))
  }

  // Enqueue the argument without acknowledgment.
  def send(arg: A): Unit = { request(arg, None) }

  // Enqueue the argument with a fresh acknowledgment cell, see request().
  def send_wait(arg: A): Unit = { request(arg, Some(Synchronized(None))) }

  // Request termination via stop marker (at most once, while still active),
  // then wait for the worker thread to end.
  def shutdown(): Unit =
  {
    synchronized { if (is_active) { active = false; mailbox.send(None) } }
    thread.join()
  }
}