src/Pure/System/event_bus.scala
author wenzelm
Sun Aug 29 19:48:35 2010 +0200 (2010-08-29)
changeset 38849 2f198d107aef
parent 38428 c13c95c97e89
child 43406 40c67d894be4
permissions -rw-r--r--
session_actor: await state assigment of previous change before signalling current change, and avoid crash in overrun situations;
     1 /*  Title:      Pure/System/event_bus.scala
     2     Author:     Makarius
     3 
     4 Generic event bus with multiple receiving actors.
     5 */
     6 
     7 package isabelle
     8 
     9 import scala.actors.Actor, Actor._
    10 import scala.collection.mutable.ListBuffer
    11 
    12 
    13 class Event_Bus[Event]
    14 {
    15   /* receivers */
    16 
    17   private val receivers = new ListBuffer[Actor]
    18 
    19   def += (r: Actor) { synchronized { receivers += r } }
    20   def + (r: Actor): Event_Bus[Event] = { this += r; this }
    21 
    22   def += (f: Event => Unit) {
    23     this += actor { loop { react { case x: Event => f(x) } } }
    24   }
    25 
    26   def + (f: Event => Unit): Event_Bus[Event] = { this += f; this }
    27 
    28   def -= (r: Actor) { synchronized { receivers -= r } }
    29   def - (r: Actor) = { this -= r; this }
    30 
    31 
    32   /* event invocation */
    33 
    34   def event(x: Event) { synchronized { receivers.foreach(_ ! x) } }
    35 
    36 
    37   /* await global condition -- triggered via bus events */
    38 
    39   def await(cond: => Boolean)
    40   {
    41     case object Wait
    42     val a = new Actor {
    43       def act {
    44         if (cond) react { case Wait => reply(()); exit(Wait) }
    45         else {
    46           loop {
    47             react {
    48               case trigger if trigger != Wait =>
    49                 if (cond) { react { case Wait => reply(()); exit(Wait) } }
    50             }
    51           }
    52         }
    53       }
    54     }
    55     this += a
    56     a.start
    57     a !? Wait
    58     this -= a
    59   }
    60 }