src/Pure/PIDE/session.scala
changeset 80302 d1c7da4ff0f5
parent 80301 f5055150b70b
child 80462 7a1f9e571046
equal deleted inserted replaced
80301:f5055150b70b 80302:d1c7da4ff0f5
    19   object Consumer {
    19   object Consumer {
    20     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    20     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    21       new Consumer[A](name, consume)
    21       new Consumer[A](name, consume)
    22   }
    22   }
    23   final class Consumer[-A] private(val name: String, val consume: A => Unit) {
    23   final class Consumer[-A] private(val name: String, val consume: A => Unit) {
    24     def failure(exn: Throwable): Unit =
    24     private def failure(exn: Throwable): Unit =
    25       Output.error_message(
    25       Output.error_message(
    26         "Session consumer failure: " + quote(name) + "\n" + Exn.print(exn))
    26         "Session consumer failure: " + quote(name) + "\n" + Exn.print(exn))
       
    27 
       
    28     def consume_robust(a: A): Unit =
       
    29       try { consume(a) }
       
    30       catch { case exn: Throwable => failure(exn) }
    27   }
    31   }
    28 
    32 
    29   class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) {
    33   class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) {
    30     private val consumers = Synchronized[List[Consumer[A]]](Nil)
    34     private val consumers = Synchronized[List[Consumer[A]]](Nil)
    31 
    35 
    32     def += (c: Consumer[A]): Unit = consumers.change(Library.update(c))
    36     def += (c: Consumer[A]): Unit = consumers.change(Library.update(c))
    33     def -= (c: Consumer[A]): Unit = consumers.change(Library.remove(c))
    37     def -= (c: Consumer[A]): Unit = consumers.change(Library.remove(c))
    34 
    38 
    35     def post(a: A): Unit = {
    39     def post(a: A): Unit = {
    36       for (c <- consumers.value.iterator) {
    40       for (c <- consumers.value.iterator) {
    37         dispatcher.send(() =>
    41         dispatcher.send(() => c.consume_robust(a))
    38           try { c.consume(a) }
       
    39           catch { case exn: Throwable => c.failure(exn) })
       
    40       }
    42       }
    41     }
    43     }
    42   }
    44   }
    43 
    45 
    44 
    46