equal
deleted
inserted
replaced
18 |
18 |
19 object Consumer { |
19 object Consumer { |
20 def apply[A](name: String)(consume: A => Unit): Consumer[A] = |
20 def apply[A](name: String)(consume: A => Unit): Consumer[A] = |
21 new Consumer[A](name, consume) |
21 new Consumer[A](name, consume) |
22 } |
22 } |
23 final class Consumer[-A] private(val name: String, val consume: A => Unit) |
23 final class Consumer[-A] private(val name: String, val consume: A => Unit) { |
|
24 def failure(exn: Throwable): Unit = |
|
25 Output.error_message( |
|
26 "Session consumer failure: " + quote(name) + "\n" + Exn.print(exn)) |
|
27 } |
24 |
28 |
25 class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) { |
29 class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) { |
26 private val consumers = Synchronized[List[Consumer[A]]](Nil) |
30 private val consumers = Synchronized[List[Consumer[A]]](Nil) |
27 |
31 |
28 def += (c: Consumer[A]): Unit = consumers.change(Library.update(c)) |
32 def += (c: Consumer[A]): Unit = consumers.change(Library.update(c)) |
30 |
34 |
31 def post(a: A): Unit = { |
35 def post(a: A): Unit = { |
32 for (c <- consumers.value.iterator) { |
36 for (c <- consumers.value.iterator) { |
33 dispatcher.send(() => |
37 dispatcher.send(() => |
34 try { c.consume(a) } |
38 try { c.consume(a) } |
35 catch { |
39 catch { case exn: Throwable => c.failure(exn) }) |
36 case exn: Throwable => |
|
37 Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.print(exn)) |
|
38 }) |
|
39 } |
40 } |
40 } |
41 } |
41 } |
42 } |
42 |
43 |
43 |
44 |