# HG changeset patch # User wenzelm # Date 1398425396 -7200 # Node ID 096139bcfaddc60bfdc06cc3b5a7fc3efddceaa4 # Parent d96b10ec397caa0f84d749b8b843ab718940e1fa replaced manager Actor by Consumer_Thread, which is lazy to defer its start to actual Handler init time; diff -r d96b10ec397c -r 096139bcfadd src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Fri Apr 25 12:59:33 2014 +0200 +++ b/src/Pure/Concurrent/consumer_thread.scala Fri Apr 25 13:29:56 2014 +0200 @@ -33,7 +33,7 @@ private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]] private val thread = Simple_Thread.fork(name, daemon) { main_loop() } - private def is_active: Boolean = active && thread.isAlive + def is_active: Boolean = active && thread.isAlive private def failure(exn: Throwable): Unit = System.err.println( diff -r d96b10ec397c -r 096139bcfadd src/Pure/Tools/simplifier_trace.scala --- a/src/Pure/Tools/simplifier_trace.scala Fri Apr 25 12:59:33 2014 +0200 +++ b/src/Pure/Tools/simplifier_trace.scala Fri Apr 25 13:29:56 2014 +0200 @@ -7,7 +7,6 @@ package isabelle -import scala.actors.Actor._ import scala.annotation.tailrec import scala.collection.immutable.SortedMap @@ -102,14 +101,13 @@ case object Event - /* manager actor */ + /* manager thread */ private case class Handle_Results( - session: Session, id: Document_ID.Command, results: Command.Results) - private case class Generate_Trace(results: Command.Results) + session: Session, id: Document_ID.Command, results: Command.Results, slot: Promise[Context]) + private case class Generate_Trace(results: Command.Results, slot: Promise[Trace]) private case class Cancel(serial: Long) private object Clear_Memory - private object Stop case class Reply(session: Session, serial: Long, answer: Answer) case class Question(data: Item.Data, answers: List[Answer], default_answer: Answer) @@ -139,18 +137,27 @@ } def handle_results(session: Session, id: Document_ID.Command, results: Command.Results): Context = - (manager !? Handle_Results(session, id, results)).asInstanceOf[Context] + { + val slot = Future.promise[Context] + manager.send(Handle_Results(session, id, results, slot)) + slot.join + } def generate_trace(results: Command.Results): Trace = - (manager !? Generate_Trace(results)).asInstanceOf[Trace] + { + val slot = Future.promise[Trace] + manager.send(Generate_Trace(results, slot)) + slot.join + } def clear_memory() = - manager ! Clear_Memory + manager.send(Clear_Memory) def send_reply(session: Session, serial: Long, answer: Answer) = - manager ! Reply(session, serial, answer) + manager.send(Reply(session, serial, answer)) - private val manager = actor { + private lazy val manager: Consumer_Thread[Any] = + { var contexts = Map.empty[Document_ID.Command, Context] var memory_children = Map.empty[Long, Set[Long]] @@ -177,123 +184,121 @@ "Simplifier_Trace.reply", Properties.Value.Long(serial), answer.name) } - loop { - react { - case Handle_Results(session, id, results) => - var new_context = contexts.getOrElse(id, Context()) - var new_serial = new_context.last_serial + Consumer_Thread.fork[Any]("Simplifier_Trace.manager", daemon = true)( + consume = (arg: Any) => + { + arg match { + case Handle_Results(session, id, results, slot) => + var new_context = contexts.getOrElse(id, Context()) + var new_serial = new_context.last_serial - for ((serial, result) <- results.iterator if serial > new_context.last_serial) - { - result match { - case Item(markup, data) => - memory_children += - (data.parent -> (memory_children.getOrElse(data.parent, Set.empty) + serial)) - - markup match { + for ((serial, result) <- results.iterator if serial > new_context.last_serial) + { + result match { + case Item(markup, data) => + memory_children += + (data.parent -> (memory_children.getOrElse(data.parent, Set.empty) + serial)) - case Markup.SIMP_TRACE_STEP => - val index = Index.of_data(data) - memory.get(index) match { - case Some(answer) if data.memory => - do_reply(session, serial, answer) - case _ => - new_context += Question(data, Answer.step.all, Answer.step.default) - } + markup match { - case Markup.SIMP_TRACE_HINT => - data.props match { - case Success(false) => - results.get(data.parent) match { - case Some(Item(Markup.SIMP_TRACE_STEP, _)) => - new_context += - Question(data, Answer.hint_fail.all, Answer.hint_fail.default) - case _ => - // unknown, better send a default reply - do_reply(session, data.serial, Answer.hint_fail.default) - } - case _ => - } + case Markup.SIMP_TRACE_STEP => + val index = Index.of_data(data) + memory.get(index) match { + case Some(answer) if data.memory => + do_reply(session, serial, answer) + case _ => + new_context += Question(data, Answer.step.all, Answer.step.default) + } - case Markup.SIMP_TRACE_IGNORE => - // At this point, we know that the parent of this 'IGNORE' entry is a 'STEP' - // entry, and that that 'STEP' entry is about to be replayed. Hence, we need - // to selectively purge the replies which have been memorized, going down from - // the parent to all leaves. - - @tailrec - def purge(queue: Vector[Long]): Unit = - queue match { - case s +: rest => - for (Item(Markup.SIMP_TRACE_STEP, data) <- results.get(s)) - memory -= Index.of_data(data) - val children = memory_children.getOrElse(s, Set.empty) - memory_children -= s - purge(rest ++ children.toVector) + case Markup.SIMP_TRACE_HINT => + data.props match { + case Success(false) => + results.get(data.parent) match { + case Some(Item(Markup.SIMP_TRACE_STEP, _)) => + new_context += + Question(data, Answer.hint_fail.all, Answer.hint_fail.default) + case _ => + // unknown, better send a default reply + do_reply(session, data.serial, Answer.hint_fail.default) + } case _ => } - purge(Vector(data.parent)) + case Markup.SIMP_TRACE_IGNORE => + // At this point, we know that the parent of this 'IGNORE' entry is a 'STEP' + // entry, and that that 'STEP' entry is about to be replayed. Hence, we need + // to selectively purge the replies which have been memorized, going down from + // the parent to all leaves. - case _ => - } + @tailrec + def purge(queue: Vector[Long]): Unit = + queue match { + case s +: rest => + for (Item(Markup.SIMP_TRACE_STEP, data) <- results.get(s)) + memory -= Index.of_data(data) + val children = memory_children.getOrElse(s, Set.empty) + memory_children -= s + purge(rest ++ children.toVector) + case _ => + } - case _ => + purge(Vector(data.parent)) + + case _ => + } + + case _ => + } + + new_serial = serial } - new_serial = serial - } + new_context = new_context.with_serial(new_serial) + contexts += (id -> new_context) + slot.fulfill(new_context) - new_context = new_context.with_serial(new_serial) - contexts += (id -> new_context) - reply(new_context) - - case Generate_Trace(results) => - // Since there are potentially lots of trace messages, we do not cache them here again. - // Instead, everytime the trace is being requested, we re-assemble it based on the - // current results. + case Generate_Trace(results, slot) => + // Since there are potentially lots of trace messages, we do not cache them here again. + // Instead, everytime the trace is being requested, we re-assemble it based on the + // current results. - val items = - (for { (_, Item(_, data)) <- results.iterator } - yield data).toList + val items = + (for { (_, Item(_, data)) <- results.iterator } + yield data).toList - reply(Trace(items)) + slot.fulfill(Trace(items)) - case Cancel(serial) => - find_question(serial) match { - case Some((id, _)) => - do_cancel(serial, id) - case None => - } + case Cancel(serial) => + find_question(serial) match { + case Some((id, _)) => + do_cancel(serial, id) + case None => + } - case Clear_Memory => - memory_children = Map.empty - memory = Map.empty - - case Stop => - contexts = Map.empty - exit("Simplifier_Trace: manager actor stopped") + case Clear_Memory => + memory_children = Map.empty + memory = Map.empty - case Reply(session, serial, answer) => - find_question(serial) match { - case Some((id, Question(data, _, _))) => - if (data.markup == Markup.SIMP_TRACE_STEP && data.memory) - { - val index = Index.of_data(data) - memory += (index -> answer) - } - do_cancel(serial, id) - case None => - System.err.println("send_reply: unknown serial " + serial) - } + case Reply(session, serial, answer) => + find_question(serial) match { + case Some((id, Question(data, _, _))) => + if (data.markup == Markup.SIMP_TRACE_STEP && data.memory) + { + val index = Index.of_data(data) + memory += (index -> answer) + } + do_cancel(serial, id) + case None => + System.err.println("send_reply: unknown serial " + serial) + } - do_reply(session, serial, answer) - session.trace_events.post(Event) - - case bad => - System.err.println("context_manager: bad message " + bad) - } - } + do_reply(session, serial, answer) + session.trace_events.post(Event) + } + true + }, + finish = () => contexts = Map.empty + ) } @@ -301,10 +306,12 @@ class Handler extends Session.Protocol_Handler { + assert(manager.is_active) + private def cancel(prover: Prover, msg: Prover.Protocol_Output): Boolean = msg.properties match { case Markup.Simp_Trace_Cancel(serial) => - manager ! Cancel(serial) + manager.send(Cancel(serial)) true case _ => false @@ -312,8 +319,8 @@ override def stop(prover: Prover) = { - manager ! Clear_Memory - manager ! Stop + manager.send(Clear_Memory) + manager.shutdown() } val functions = Map(Markup.SIMP_TRACE_CANCEL -> cancel _)