/** Companion factory for [[Consumer]]; the class constructor is private, so instances are built here. */
object Consumer {
  /**
   * Create a consumer from a name and a callback.
   *
   * @param name    name stored on the resulting consumer
   * @param consume callback stored on the resulting consumer
   * @tparam A      type of values accepted by the callback
   * @return a new `Consumer[A]` wrapping `name` and `consume`
   */
  def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    new Consumer[A](name, consume)
}
/**
 * A named callback over values of type `A` (contravariant in `A`).
 *
 * The constructor is private; instances are created through the companion `apply`.
 *
 * @param name    name of this consumer, included (quoted) in failure messages
 * @param consume the raw callback; it may throw, see `consume_robust` for the guarded variant
 */
final class Consumer[-A] private(val name: String, val consume: A => Unit) {
  // Report an exception raised by the callback through Output.error_message,
  // tagged with the quoted consumer name and the printed exception.
  private def failure(exn: Throwable): Unit =
    Output.error_message(
      "Session consumer failure: " + quote(name) + "\n" + Exn.print(exn))

  /**
   * Apply `consume` to `a`, turning any exception into an error message.
   *
   * Every `Throwable` raised by `consume` is caught and passed to `failure`
   * instead of propagating to the caller. The catch is intentionally kept as
   * broad as in the original code; narrowing it would change which errors
   * reach the calling thread.
   *
   * @param a the value handed to the callback
   */
  def consume_robust(a: A): Unit =
    try { consume(a) }
    catch { case exn: Throwable => failure(exn) }
}
28 |
32 |
/**
 * A collection of [[Consumer]]s for values of type `A`, with delivery routed
 * through a dispatcher.
 *
 * @param dispatcher receives one `() => Unit` thunk per consumer for every posted value
 * @tparam A type of the posted values
 */
class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) {
  // Current consumer list, held in a Synchronized container and only modified via `change`.
  private val consumers = Synchronized[List[Consumer[A]]](Nil)

  /** Register `c` by applying `Library.update(c)` to the consumer list
    * (presumably inserts it if not yet present -- confirm against Library). */
  def += (c: Consumer[A]): Unit = consumers.change(Library.update(c))

  /** Unregister `c` by applying `Library.remove(c)` to the consumer list. */
  def -= (c: Consumer[A]): Unit = consumers.change(Library.remove(c))

  /**
   * Hand `a` to every consumer in the list as read at the time of the call.
   *
   * For each consumer a thunk is sent to `dispatcher`; the thunk invokes
   * `consume_robust`, so an exception from one consumer is reported by that
   * consumer rather than escaping from the thunk.
   *
   * @param a the value to deliver
   */
  def post(a: A): Unit = {
    for (c <- consumers.value.iterator) {
      dispatcher.send(() => c.consume_robust(a))
    }
  }
}
43 |
45 |
44 |
46 |