replaced manager Actor by Consumer_Thread, which is lazy to defer its start to actual Handler init time;
authorwenzelm
Fri Apr 25 13:29:56 2014 +0200 (2014-04-25)
changeset 56718096139bcfadd
parent 56717 d96b10ec397c
child 56719 80eb2192516a
replaced manager Actor by Consumer_Thread, which is lazy to defer its start to actual Handler init time;
src/Pure/Concurrent/consumer_thread.scala
src/Pure/Tools/simplifier_trace.scala
     1.1 --- a/src/Pure/Concurrent/consumer_thread.scala	Fri Apr 25 12:59:33 2014 +0200
     1.2 +++ b/src/Pure/Concurrent/consumer_thread.scala	Fri Apr 25 13:29:56 2014 +0200
     1.3 @@ -33,7 +33,7 @@
     1.4    private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
     1.5  
     1.6    private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
     1.7 -  private def is_active: Boolean = active && thread.isAlive
     1.8 +  def is_active: Boolean = active && thread.isAlive
     1.9  
    1.10    private def failure(exn: Throwable): Unit =
    1.11      System.err.println(
     2.1 --- a/src/Pure/Tools/simplifier_trace.scala	Fri Apr 25 12:59:33 2014 +0200
     2.2 +++ b/src/Pure/Tools/simplifier_trace.scala	Fri Apr 25 13:29:56 2014 +0200
     2.3 @@ -7,7 +7,6 @@
     2.4  package isabelle
     2.5  
     2.6  
     2.7 -import scala.actors.Actor._
     2.8  import scala.annotation.tailrec
     2.9  import scala.collection.immutable.SortedMap
    2.10  
    2.11 @@ -102,14 +101,13 @@
    2.12    case object Event
    2.13  
    2.14  
    2.15 -  /* manager actor */
    2.16 +  /* manager thread */
    2.17  
    2.18    private case class Handle_Results(
    2.19 -    session: Session, id: Document_ID.Command, results: Command.Results)
    2.20 -  private case class Generate_Trace(results: Command.Results)
    2.21 +    session: Session, id: Document_ID.Command, results: Command.Results, slot: Promise[Context])
    2.22 +  private case class Generate_Trace(results: Command.Results, slot: Promise[Trace])
    2.23    private case class Cancel(serial: Long)
    2.24    private object Clear_Memory
    2.25 -  private object Stop
    2.26    case class Reply(session: Session, serial: Long, answer: Answer)
    2.27  
    2.28    case class Question(data: Item.Data, answers: List[Answer], default_answer: Answer)
    2.29 @@ -139,18 +137,27 @@
    2.30    }
    2.31  
    2.32    def handle_results(session: Session, id: Document_ID.Command, results: Command.Results): Context =
    2.33 -    (manager !? Handle_Results(session, id, results)).asInstanceOf[Context]
    2.34 +  {
    2.35 +    val slot = Future.promise[Context]
    2.36 +    manager.send(Handle_Results(session, id, results, slot))
    2.37 +    slot.join
    2.38 +  }
    2.39  
    2.40    def generate_trace(results: Command.Results): Trace =
    2.41 -    (manager !? Generate_Trace(results)).asInstanceOf[Trace]
    2.42 +  {
    2.43 +    val slot = Future.promise[Trace]
    2.44 +    manager.send(Generate_Trace(results, slot))
    2.45 +    slot.join
    2.46 +  }
    2.47  
    2.48    def clear_memory() =
    2.49 -    manager ! Clear_Memory
    2.50 +    manager.send(Clear_Memory)
    2.51  
    2.52    def send_reply(session: Session, serial: Long, answer: Answer) =
    2.53 -    manager ! Reply(session, serial, answer)
    2.54 +    manager.send(Reply(session, serial, answer))
    2.55  
    2.56 -  private val manager = actor {
    2.57 +  private lazy val manager: Consumer_Thread[Any] =
    2.58 +  {
    2.59      var contexts = Map.empty[Document_ID.Command, Context]
    2.60  
    2.61      var memory_children = Map.empty[Long, Set[Long]]
    2.62 @@ -177,123 +184,121 @@
    2.63          "Simplifier_Trace.reply", Properties.Value.Long(serial), answer.name)
    2.64      }
    2.65  
    2.66 -    loop {
    2.67 -      react {
    2.68 -        case Handle_Results(session, id, results) =>
    2.69 -          var new_context = contexts.getOrElse(id, Context())
    2.70 -          var new_serial = new_context.last_serial
    2.71 +    Consumer_Thread.fork[Any]("Simplifier_Trace.manager", daemon = true)(
    2.72 +      consume = (arg: Any) =>
    2.73 +      {
    2.74 +        arg match {
    2.75 +          case Handle_Results(session, id, results, slot) =>
    2.76 +            var new_context = contexts.getOrElse(id, Context())
    2.77 +            var new_serial = new_context.last_serial
    2.78  
    2.79 -          for ((serial, result) <- results.iterator if serial > new_context.last_serial)
    2.80 -          {
    2.81 -            result match {
    2.82 -              case Item(markup, data) =>
    2.83 -                memory_children +=
    2.84 -                  (data.parent -> (memory_children.getOrElse(data.parent, Set.empty) + serial))
    2.85 -
    2.86 -                markup match {
    2.87 +            for ((serial, result) <- results.iterator if serial > new_context.last_serial)
    2.88 +            {
    2.89 +              result match {
    2.90 +                case Item(markup, data) =>
    2.91 +                  memory_children +=
    2.92 +                    (data.parent -> (memory_children.getOrElse(data.parent, Set.empty) + serial))
    2.93  
    2.94 -                  case Markup.SIMP_TRACE_STEP =>
    2.95 -                    val index = Index.of_data(data)
    2.96 -                    memory.get(index) match {
    2.97 -                      case Some(answer) if data.memory =>
    2.98 -                        do_reply(session, serial, answer)
    2.99 -                      case _ =>
   2.100 -                        new_context += Question(data, Answer.step.all, Answer.step.default)
   2.101 -                    }
   2.102 +                  markup match {
   2.103  
   2.104 -                  case Markup.SIMP_TRACE_HINT =>
   2.105 -                    data.props match {
   2.106 -                      case Success(false) =>
   2.107 -                        results.get(data.parent) match {
   2.108 -                          case Some(Item(Markup.SIMP_TRACE_STEP, _)) =>
   2.109 -                            new_context +=
   2.110 -                              Question(data, Answer.hint_fail.all, Answer.hint_fail.default)
   2.111 -                          case _ =>
   2.112 -                            // unknown, better send a default reply
   2.113 -                            do_reply(session, data.serial, Answer.hint_fail.default)
   2.114 -                        }
   2.115 -                      case _ =>
   2.116 -                    }
   2.117 +                    case Markup.SIMP_TRACE_STEP =>
   2.118 +                      val index = Index.of_data(data)
   2.119 +                      memory.get(index) match {
   2.120 +                        case Some(answer) if data.memory =>
   2.121 +                          do_reply(session, serial, answer)
   2.122 +                        case _ =>
   2.123 +                          new_context += Question(data, Answer.step.all, Answer.step.default)
   2.124 +                      }
   2.125  
   2.126 -                  case Markup.SIMP_TRACE_IGNORE =>
   2.127 -                    // At this point, we know that the parent of this 'IGNORE' entry is a 'STEP'
   2.128 -                    // entry, and that that 'STEP' entry is about to be replayed. Hence, we need
   2.129 -                    // to selectively purge the replies which have been memorized, going down from
   2.130 -                    // the parent to all leaves.
   2.131 -
   2.132 -                    @tailrec
   2.133 -                    def purge(queue: Vector[Long]): Unit =
   2.134 -                      queue match {
   2.135 -                        case s +: rest =>
   2.136 -                          for (Item(Markup.SIMP_TRACE_STEP, data) <- results.get(s))
   2.137 -                            memory -= Index.of_data(data)
   2.138 -                          val children = memory_children.getOrElse(s, Set.empty)
   2.139 -                          memory_children -= s
   2.140 -                          purge(rest ++ children.toVector)
   2.141 +                    case Markup.SIMP_TRACE_HINT =>
   2.142 +                      data.props match {
   2.143 +                        case Success(false) =>
   2.144 +                          results.get(data.parent) match {
   2.145 +                            case Some(Item(Markup.SIMP_TRACE_STEP, _)) =>
   2.146 +                              new_context +=
   2.147 +                                Question(data, Answer.hint_fail.all, Answer.hint_fail.default)
   2.148 +                            case _ =>
   2.149 +                              // unknown, better send a default reply
   2.150 +                              do_reply(session, data.serial, Answer.hint_fail.default)
   2.151 +                          }
   2.152                          case _ =>
   2.153                        }
   2.154  
   2.155 -                    purge(Vector(data.parent))
   2.156 +                    case Markup.SIMP_TRACE_IGNORE =>
   2.157 +                      // At this point, we know that the parent of this 'IGNORE' entry is a 'STEP'
   2.158 +                      // entry, and that that 'STEP' entry is about to be replayed. Hence, we need
   2.159 +                      // to selectively purge the replies which have been memorized, going down from
   2.160 +                      // the parent to all leaves.
   2.161  
   2.162 -                  case _ =>
   2.163 -                }
   2.164 +                      @tailrec
   2.165 +                      def purge(queue: Vector[Long]): Unit =
   2.166 +                        queue match {
   2.167 +                          case s +: rest =>
   2.168 +                            for (Item(Markup.SIMP_TRACE_STEP, data) <- results.get(s))
   2.169 +                              memory -= Index.of_data(data)
   2.170 +                            val children = memory_children.getOrElse(s, Set.empty)
   2.171 +                            memory_children -= s
   2.172 +                            purge(rest ++ children.toVector)
   2.173 +                          case _ =>
   2.174 +                        }
   2.175  
   2.176 -              case _ =>
   2.177 +                      purge(Vector(data.parent))
   2.178 +
   2.179 +                    case _ =>
   2.180 +                  }
   2.181 +
   2.182 +                case _ =>
   2.183 +              }
   2.184 +
   2.185 +              new_serial = serial
   2.186              }
   2.187  
   2.188 -            new_serial = serial
   2.189 -          }
   2.190 +            new_context = new_context.with_serial(new_serial)
   2.191 +            contexts += (id -> new_context)
   2.192 +            slot.fulfill(new_context)
   2.193  
   2.194 -          new_context = new_context.with_serial(new_serial)
   2.195 -          contexts += (id -> new_context)
   2.196 -          reply(new_context)
   2.197 -
   2.198 -        case Generate_Trace(results) =>
   2.199 -          // Since there are potentially lots of trace messages, we do not cache them here again.
   2.200 -          // Instead, everytime the trace is being requested, we re-assemble it based on the
   2.201 -          // current results.
   2.202 +          case Generate_Trace(results, slot) =>
   2.203 +            // Since there are potentially lots of trace messages, we do not cache them here again.
   2.204 +            // Instead, everytime the trace is being requested, we re-assemble it based on the
   2.205 +            // current results.
   2.206  
   2.207 -          val items =
   2.208 -            (for { (_, Item(_, data)) <- results.iterator }
   2.209 -              yield data).toList
   2.210 +            val items =
   2.211 +              (for { (_, Item(_, data)) <- results.iterator }
   2.212 +                yield data).toList
   2.213  
   2.214 -          reply(Trace(items))
   2.215 +            slot.fulfill(Trace(items))
   2.216  
   2.217 -        case Cancel(serial) =>
   2.218 -          find_question(serial) match {
   2.219 -            case Some((id, _)) =>
   2.220 -              do_cancel(serial, id)
   2.221 -            case None =>
   2.222 -          }
   2.223 +          case Cancel(serial) =>
   2.224 +            find_question(serial) match {
   2.225 +              case Some((id, _)) =>
   2.226 +                do_cancel(serial, id)
   2.227 +              case None =>
   2.228 +            }
   2.229  
   2.230 -        case Clear_Memory =>
   2.231 -          memory_children = Map.empty
   2.232 -          memory = Map.empty
   2.233 -
   2.234 -        case Stop =>
   2.235 -          contexts = Map.empty
   2.236 -          exit("Simplifier_Trace: manager actor stopped")
   2.237 +          case Clear_Memory =>
   2.238 +            memory_children = Map.empty
   2.239 +            memory = Map.empty
   2.240  
   2.241 -        case Reply(session, serial, answer) =>
   2.242 -          find_question(serial) match {
   2.243 -            case Some((id, Question(data, _, _))) =>
   2.244 -              if (data.markup == Markup.SIMP_TRACE_STEP && data.memory)
   2.245 -              {
   2.246 -                val index = Index.of_data(data)
   2.247 -                memory += (index -> answer)
   2.248 -              }
   2.249 -              do_cancel(serial, id)
   2.250 -            case None =>
   2.251 -              System.err.println("send_reply: unknown serial " + serial)
   2.252 -          }
   2.253 +          case Reply(session, serial, answer) =>
   2.254 +            find_question(serial) match {
   2.255 +              case Some((id, Question(data, _, _))) =>
   2.256 +                if (data.markup == Markup.SIMP_TRACE_STEP && data.memory)
   2.257 +                {
   2.258 +                  val index = Index.of_data(data)
   2.259 +                  memory += (index -> answer)
   2.260 +                }
   2.261 +                do_cancel(serial, id)
   2.262 +              case None =>
   2.263 +                System.err.println("send_reply: unknown serial " + serial)
   2.264 +            }
   2.265  
   2.266 -          do_reply(session, serial, answer)
   2.267 -          session.trace_events.post(Event)
   2.268 -
   2.269 -        case bad =>
   2.270 -          System.err.println("context_manager: bad message " + bad)
   2.271 -      }
   2.272 -    }
   2.273 +            do_reply(session, serial, answer)
   2.274 +            session.trace_events.post(Event)
   2.275 +        }
   2.276 +        true
   2.277 +      },
   2.278 +      finish = () => contexts = Map.empty
   2.279 +    )
   2.280    }
   2.281  
   2.282  
   2.283 @@ -301,10 +306,12 @@
   2.284  
   2.285    class Handler extends Session.Protocol_Handler
   2.286    {
   2.287 +    assert(manager.is_active)
   2.288 +
   2.289      private def cancel(prover: Prover, msg: Prover.Protocol_Output): Boolean =
   2.290        msg.properties match {
   2.291          case Markup.Simp_Trace_Cancel(serial) =>
   2.292 -          manager ! Cancel(serial)
   2.293 +          manager.send(Cancel(serial))
   2.294            true
   2.295          case _ =>
   2.296            false
   2.297 @@ -312,8 +319,8 @@
   2.298  
   2.299      override def stop(prover: Prover) =
   2.300      {
   2.301 -      manager ! Clear_Memory
   2.302 -      manager ! Stop
   2.303 +      manager.send(Clear_Memory)
   2.304 +      manager.shutdown()
   2.305      }
   2.306  
   2.307      val functions = Map(Markup.SIMP_TRACE_CANCEL -> cancel _)