author | wenzelm |
Fri, 25 Apr 2014 13:29:56 +0200 | |
changeset 56718 | 096139bcfadd |
parent 56708 | d39148de6eee |
child 56782 | 433cf57550fa |
permissions | -rw-r--r-- |
56695
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
1 |
/* Title: Pure/Concurrent/consumer_thread.scala |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
2 |
Module: PIDE |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
3 |
Author: Makarius |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
4 |
|
56702
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents:
56701
diff
changeset
|
5 |
Consumer thread with unbounded queueing of requests, and optional |
f96ad2b19c38
support for requests with explicit acknowledgment (and exception propagation);
wenzelm
parents:
56701
diff
changeset
|
6 |
acknowledgment. |
56695
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
7 |
*/ |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
8 |
|
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
9 |
package isabelle |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
10 |
|
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
11 |
|
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
12 |
import scala.annotation.tailrec |
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
13 |
|
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
14 |
|
56696 | 15 |
object Consumer_Thread
{
  /* internal messages */

  // acknowledgment slot: starts empty, later holds the captured result of "consume"
  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]

  // request argument, paired with an optional acknowledgment slot
  private type Request[A] = (A, Option[Ack])


  /* fork */

  // Create a consumer thread for the given "consume" function: its Boolean
  // result tells the main loop whether to continue; "finish" runs once when
  // the loop ends.
  def fork[A](name: String = "", daemon: Boolean = false)(
      consume: A => Boolean,
      finish: () => Unit = () => ()): Consumer_Thread[A] =
  {
    val consumer_thread = new Consumer_Thread[A](name, daemon, consume, finish)
    consumer_thread
  }
}
56695
963732291084
consumer thread with unbounded queueing of requests (similar to Message_Channel in ML);
wenzelm
parents:
diff
changeset
|
28 |
|
56698 | 29 |
final class Consumer_Thread[A] private(
  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
  /* state and thread */

  // reset by shutdown(), while holding the lock of this object
  private var active = true

  // pending requests; the message None asks the main loop to stop
  private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]

  // forked at construction time, running main_loop()
  private val thread = Simple_Thread.fork(name, daemon) { main_loop() }

  // true while shutdown() has not been issued and the thread is still alive
  def is_active: Boolean = active && thread.isAlive

  assert(is_active)


  /* failure reporting */

  // print the exception on stderr, together with the thread name
  private def failure(exn: Throwable): Unit =
  {
    val header = "Consumer thread failure: " + quote(thread.getName)
    System.err.println(header + "\n" + Exn.message(exn))
  }

  // run "finish", reporting any exception instead of propagating it
  private def robust_finish(): Unit =
    try { finish() }
    catch { case exn: Throwable => failure(exn) }


  /* main loop */

  // Consume requests one by one, until "consume" returns false or the stop
  // message None arrives; then run "finish".
  @tailrec private def main_loop(): Unit =
    mbox.receive match {
      case None => robust_finish()
      case Some((arg, ack)) =>
        val outcome = Exn.capture { consume(arg) }
        val proceed =
          outcome match {
            case Exn.Res(more) => more
            case Exn.Exn(exn) =>
              // without acknowledgment slot nobody receives the exception:
              // report it here; the loop continues in either case
              if (ack.isEmpty) failure(exn)
              true
          }
        // publish the captured result to the acknowledgment slot, if any
        for (a <- ack) a.change(_ => Some(outcome))
        if (proceed) main_loop() else robust_finish()
    }


  /* main methods */

  // Enqueue a request, or fail if the thread is not active. With an
  // acknowledgment slot, the result stored by the main loop is taken from it
  // and passed to Exn.release (presumably this waits for the result and
  // re-raises a captured exception -- see Synchronized and Exn).
  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit =
  {
    synchronized {
      if (!is_active) error("Consumer thread not active: " + quote(thread.getName))
      else mbox.send(Some((x, ack)))
    }
    for (a <- ack) {
      Exn.release(
        a.guarded_access({
          case None => None
          case res => Some((res.get, res))
        }))
    }
  }

  // request without acknowledgment
  def send(arg: A): Unit = request(arg, None)

  // request with a fresh, initially empty acknowledgment slot
  def send_wait(arg: A): Unit = request(arg, Some(Synchronized(None)))

  // Mark the consumer as inactive and send the stop message (only if still
  // active), then wait for the thread to terminate.
  def shutdown(): Unit =
  {
    synchronized {
      if (is_active) {
        active = false
        mbox.send(None)
      }
    }
    thread.join
  }
}