replaced manager Actor by Consumer_Thread, which is lazy to defer its start to actual Handler init time;
authorwenzelm
Fri, 25 Apr 2014 13:29:56 +0200
changeset 56718 096139bcfadd
parent 56717 d96b10ec397c
child 56719 80eb2192516a
replaced manager Actor by Consumer_Thread, which is lazy to defer its start to actual Handler init time;
src/Pure/Concurrent/consumer_thread.scala
src/Pure/Tools/simplifier_trace.scala
--- a/src/Pure/Concurrent/consumer_thread.scala	Fri Apr 25 12:59:33 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Fri Apr 25 13:29:56 2014 +0200
@@ -33,7 +33,7 @@
   private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
 
   private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
-  private def is_active: Boolean = active && thread.isAlive
+  def is_active: Boolean = active && thread.isAlive
 
   private def failure(exn: Throwable): Unit =
     System.err.println(
--- a/src/Pure/Tools/simplifier_trace.scala	Fri Apr 25 12:59:33 2014 +0200
+++ b/src/Pure/Tools/simplifier_trace.scala	Fri Apr 25 13:29:56 2014 +0200
@@ -7,7 +7,6 @@
 package isabelle
 
 
-import scala.actors.Actor._
 import scala.annotation.tailrec
 import scala.collection.immutable.SortedMap
 
@@ -102,14 +101,13 @@
   case object Event
 
 
-  /* manager actor */
+  /* manager thread */
 
   private case class Handle_Results(
-    session: Session, id: Document_ID.Command, results: Command.Results)
-  private case class Generate_Trace(results: Command.Results)
+    session: Session, id: Document_ID.Command, results: Command.Results, slot: Promise[Context])
+  private case class Generate_Trace(results: Command.Results, slot: Promise[Trace])
   private case class Cancel(serial: Long)
   private object Clear_Memory
-  private object Stop
   case class Reply(session: Session, serial: Long, answer: Answer)
 
   case class Question(data: Item.Data, answers: List[Answer], default_answer: Answer)
@@ -139,18 +137,27 @@
   }
 
   def handle_results(session: Session, id: Document_ID.Command, results: Command.Results): Context =
-    (manager !? Handle_Results(session, id, results)).asInstanceOf[Context]
+  {
+    val slot = Future.promise[Context]
+    manager.send(Handle_Results(session, id, results, slot))
+    slot.join
+  }
 
   def generate_trace(results: Command.Results): Trace =
-    (manager !? Generate_Trace(results)).asInstanceOf[Trace]
+  {
+    val slot = Future.promise[Trace]
+    manager.send(Generate_Trace(results, slot))
+    slot.join
+  }
 
   def clear_memory() =
-    manager ! Clear_Memory
+    manager.send(Clear_Memory)
 
   def send_reply(session: Session, serial: Long, answer: Answer) =
-    manager ! Reply(session, serial, answer)
+    manager.send(Reply(session, serial, answer))
 
-  private val manager = actor {
+  private lazy val manager: Consumer_Thread[Any] =
+  {
     var contexts = Map.empty[Document_ID.Command, Context]
 
     var memory_children = Map.empty[Long, Set[Long]]
@@ -177,123 +184,121 @@
         "Simplifier_Trace.reply", Properties.Value.Long(serial), answer.name)
     }
 
-    loop {
-      react {
-        case Handle_Results(session, id, results) =>
-          var new_context = contexts.getOrElse(id, Context())
-          var new_serial = new_context.last_serial
+    Consumer_Thread.fork[Any]("Simplifier_Trace.manager", daemon = true)(
+      consume = (arg: Any) =>
+      {
+        arg match {
+          case Handle_Results(session, id, results, slot) =>
+            var new_context = contexts.getOrElse(id, Context())
+            var new_serial = new_context.last_serial
 
-          for ((serial, result) <- results.iterator if serial > new_context.last_serial)
-          {
-            result match {
-              case Item(markup, data) =>
-                memory_children +=
-                  (data.parent -> (memory_children.getOrElse(data.parent, Set.empty) + serial))
-
-                markup match {
+            for ((serial, result) <- results.iterator if serial > new_context.last_serial)
+            {
+              result match {
+                case Item(markup, data) =>
+                  memory_children +=
+                    (data.parent -> (memory_children.getOrElse(data.parent, Set.empty) + serial))
 
-                  case Markup.SIMP_TRACE_STEP =>
-                    val index = Index.of_data(data)
-                    memory.get(index) match {
-                      case Some(answer) if data.memory =>
-                        do_reply(session, serial, answer)
-                      case _ =>
-                        new_context += Question(data, Answer.step.all, Answer.step.default)
-                    }
+                  markup match {
 
-                  case Markup.SIMP_TRACE_HINT =>
-                    data.props match {
-                      case Success(false) =>
-                        results.get(data.parent) match {
-                          case Some(Item(Markup.SIMP_TRACE_STEP, _)) =>
-                            new_context +=
-                              Question(data, Answer.hint_fail.all, Answer.hint_fail.default)
-                          case _ =>
-                            // unknown, better send a default reply
-                            do_reply(session, data.serial, Answer.hint_fail.default)
-                        }
-                      case _ =>
-                    }
+                    case Markup.SIMP_TRACE_STEP =>
+                      val index = Index.of_data(data)
+                      memory.get(index) match {
+                        case Some(answer) if data.memory =>
+                          do_reply(session, serial, answer)
+                        case _ =>
+                          new_context += Question(data, Answer.step.all, Answer.step.default)
+                      }
 
-                  case Markup.SIMP_TRACE_IGNORE =>
-                    // At this point, we know that the parent of this 'IGNORE' entry is a 'STEP'
-                    // entry, and that that 'STEP' entry is about to be replayed. Hence, we need
-                    // to selectively purge the replies which have been memorized, going down from
-                    // the parent to all leaves.
-
-                    @tailrec
-                    def purge(queue: Vector[Long]): Unit =
-                      queue match {
-                        case s +: rest =>
-                          for (Item(Markup.SIMP_TRACE_STEP, data) <- results.get(s))
-                            memory -= Index.of_data(data)
-                          val children = memory_children.getOrElse(s, Set.empty)
-                          memory_children -= s
-                          purge(rest ++ children.toVector)
+                    case Markup.SIMP_TRACE_HINT =>
+                      data.props match {
+                        case Success(false) =>
+                          results.get(data.parent) match {
+                            case Some(Item(Markup.SIMP_TRACE_STEP, _)) =>
+                              new_context +=
+                                Question(data, Answer.hint_fail.all, Answer.hint_fail.default)
+                            case _ =>
+                              // unknown, better send a default reply
+                              do_reply(session, data.serial, Answer.hint_fail.default)
+                          }
                         case _ =>
                       }
 
-                    purge(Vector(data.parent))
+                    case Markup.SIMP_TRACE_IGNORE =>
+                      // At this point, we know that the parent of this 'IGNORE' entry is a 'STEP'
+                      // entry, and that that 'STEP' entry is about to be replayed. Hence, we need
+                      // to selectively purge the replies which have been memorized, going down from
+                      // the parent to all leaves.
 
-                  case _ =>
-                }
+                      @tailrec
+                      def purge(queue: Vector[Long]): Unit =
+                        queue match {
+                          case s +: rest =>
+                            for (Item(Markup.SIMP_TRACE_STEP, data) <- results.get(s))
+                              memory -= Index.of_data(data)
+                            val children = memory_children.getOrElse(s, Set.empty)
+                            memory_children -= s
+                            purge(rest ++ children.toVector)
+                          case _ =>
+                        }
 
-              case _ =>
+                      purge(Vector(data.parent))
+
+                    case _ =>
+                  }
+
+                case _ =>
+              }
+
+              new_serial = serial
             }
 
-            new_serial = serial
-          }
+            new_context = new_context.with_serial(new_serial)
+            contexts += (id -> new_context)
+            slot.fulfill(new_context)
 
-          new_context = new_context.with_serial(new_serial)
-          contexts += (id -> new_context)
-          reply(new_context)
-
-        case Generate_Trace(results) =>
-          // Since there are potentially lots of trace messages, we do not cache them here again.
-          // Instead, everytime the trace is being requested, we re-assemble it based on the
-          // current results.
+          case Generate_Trace(results, slot) =>
+            // Since there are potentially lots of trace messages, we do not cache them here again.
+            // Instead, everytime the trace is being requested, we re-assemble it based on the
+            // current results.
 
-          val items =
-            (for { (_, Item(_, data)) <- results.iterator }
-              yield data).toList
+            val items =
+              (for { (_, Item(_, data)) <- results.iterator }
+                yield data).toList
 
-          reply(Trace(items))
+            slot.fulfill(Trace(items))
 
-        case Cancel(serial) =>
-          find_question(serial) match {
-            case Some((id, _)) =>
-              do_cancel(serial, id)
-            case None =>
-          }
+          case Cancel(serial) =>
+            find_question(serial) match {
+              case Some((id, _)) =>
+                do_cancel(serial, id)
+              case None =>
+            }
 
-        case Clear_Memory =>
-          memory_children = Map.empty
-          memory = Map.empty
-
-        case Stop =>
-          contexts = Map.empty
-          exit("Simplifier_Trace: manager actor stopped")
+          case Clear_Memory =>
+            memory_children = Map.empty
+            memory = Map.empty
 
-        case Reply(session, serial, answer) =>
-          find_question(serial) match {
-            case Some((id, Question(data, _, _))) =>
-              if (data.markup == Markup.SIMP_TRACE_STEP && data.memory)
-              {
-                val index = Index.of_data(data)
-                memory += (index -> answer)
-              }
-              do_cancel(serial, id)
-            case None =>
-              System.err.println("send_reply: unknown serial " + serial)
-          }
+          case Reply(session, serial, answer) =>
+            find_question(serial) match {
+              case Some((id, Question(data, _, _))) =>
+                if (data.markup == Markup.SIMP_TRACE_STEP && data.memory)
+                {
+                  val index = Index.of_data(data)
+                  memory += (index -> answer)
+                }
+                do_cancel(serial, id)
+              case None =>
+                System.err.println("send_reply: unknown serial " + serial)
+            }
 
-          do_reply(session, serial, answer)
-          session.trace_events.post(Event)
-
-        case bad =>
-          System.err.println("context_manager: bad message " + bad)
-      }
-    }
+            do_reply(session, serial, answer)
+            session.trace_events.post(Event)
+        }
+        true
+      },
+      finish = () => contexts = Map.empty
+    )
   }
 
 
@@ -301,10 +306,12 @@
 
   class Handler extends Session.Protocol_Handler
   {
+    assert(manager.is_active)
+
     private def cancel(prover: Prover, msg: Prover.Protocol_Output): Boolean =
       msg.properties match {
         case Markup.Simp_Trace_Cancel(serial) =>
-          manager ! Cancel(serial)
+          manager.send(Cancel(serial))
           true
         case _ =>
           false
@@ -312,8 +319,8 @@
 
     override def stop(prover: Prover) =
     {
-      manager ! Clear_Memory
-      manager ! Stop
+      manager.send(Clear_Memory)
+      manager.shutdown()
     }
 
     val functions = Map(Markup.SIMP_TRACE_CANCEL -> cancel _)