src/Pure/PIDE/session.scala
changeset 56719 80eb2192516a
parent 56715 52125652e82a
child 56733 f7700146678d
equal deleted inserted replaced
56718:096139bcfadd 56719:80eb2192516a
   180   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
   180   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
   181   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   181   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   182 
   182 
   183 
   183 
   184 
   184 
   185   /** buffered changes: to be dispatched to clients **/
       
   186 
       
   187   private case class Received_Change(assignment: Boolean, commands: List[Command])
       
   188 
       
   189   private val change_buffer: Consumer_Thread[Received_Change] =
       
   190   {
       
   191     object changed
       
   192     {
       
   193       private var assignment: Boolean = false
       
   194       private var nodes: Set[Document.Node.Name] = Set.empty
       
   195       private var commands: Set[Command] = Set.empty
       
   196 
       
   197       def flush(): Unit = synchronized {
       
   198         if (assignment || !nodes.isEmpty || !commands.isEmpty)
       
   199           commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
       
   200         assignment = false
       
   201         nodes = Set.empty
       
   202         commands = Set.empty
       
   203       }
       
   204 
       
   205       def invoke(change: Received_Change): Unit = synchronized {
       
   206         assignment |= change.assignment
       
   207         for (command <- change.commands) {
       
   208           nodes += command.node_name
       
   209           commands += command
       
   210         }
       
   211       }
       
   212     }
       
   213 
       
   214     val timer = new Timer("change_buffer", true)
       
   215     timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
       
   216 
       
   217     Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)(
       
   218       consume = { case change => changed.invoke(change); true },
       
   219       finish = () => { timer.cancel(); changed.flush() }
       
   220     )
       
   221   }
       
   222 
       
   223 
       
   224 
       
   225   /** pipelined change parsing **/
   185   /** pipelined change parsing **/
   226 
   186 
   227   private case class Text_Edits(
   187   private case class Text_Edits(
   228     previous: Future[Document.Version],
   188     previous: Future[Document.Version],
   229     doc_blobs: Document.Blobs,
   189     doc_blobs: Document.Blobs,
   301   private case object Stop
   261   private case object Stop
   302   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   262   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   303   private case class Protocol_Command(name: String, args: List[String])
   263   private case class Protocol_Command(name: String, args: List[String])
   304   private case class Messages(msgs: List[Prover.Message])
   264   private case class Messages(msgs: List[Prover.Message])
   305   private case class Update_Options(options: Options)
   265   private case class Update_Options(options: Options)
       
   266 
       
   267 
       
   268   /* buffered changes */
       
   269 
       
   270   private object change_buffer
       
   271   {
       
   272     private var assignment: Boolean = false
       
   273     private var nodes: Set[Document.Node.Name] = Set.empty
       
   274     private var commands: Set[Command] = Set.empty
       
   275 
       
   276     def flush(): Unit = synchronized {
       
   277       if (assignment || !nodes.isEmpty || !commands.isEmpty)
       
   278         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
       
   279       assignment = false
       
   280       nodes = Set.empty
       
   281       commands = Set.empty
       
   282     }
       
   283 
       
   284     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
       
   285       assignment |= assign
       
   286       for (command <- cmds) {
       
   287         nodes += command.node_name
       
   288         commands += command
       
   289       }
       
   290     }
       
   291 
       
   292     private val timer = new Timer("change_buffer", true)
       
   293     timer.schedule(new TimerTask { def run = flush() }, output_delay.ms, output_delay.ms)
       
   294 
       
   295     def shutdown()
       
   296     {
       
   297       timer.cancel()
       
   298       flush()
       
   299     }
       
   300   }
   306 
   301 
   307 
   302 
   308   /* buffered prover messages */
   303   /* buffered prover messages */
   309 
   304 
   310   private object receiver
   305   private object receiver
   441 
   436 
   442       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   437       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   443       {
   438       {
   444         try {
   439         try {
   445           val st = global_state.change_result(_.accumulate(state_id, message))
   440           val st = global_state.change_result(_.accumulate(state_id, message))
   446           change_buffer.send(Received_Change(false, List(st.command)))
   441           change_buffer.invoke(false, List(st.command))
   447         }
   442         }
   448         catch {
   443         catch {
   449           case _: Document.State.Fail => bad_output()
   444           case _: Document.State.Fail => bad_output()
   450         }
   445         }
   451       }
   446       }
   465               case Markup.Assign_Update =>
   460               case Markup.Assign_Update =>
   466                 msg.text match {
   461                 msg.text match {
   467                   case Protocol.Assign_Update(id, update) =>
   462                   case Protocol.Assign_Update(id, update) =>
   468                     try {
   463                     try {
   469                       val cmds = global_state.change_result(_.assign(id, update))
   464                       val cmds = global_state.change_result(_.assign(id, update))
   470                       change_buffer.send(Received_Change(true, cmds))
   465                       change_buffer.invoke(true, cmds)
   471                     }
   466                     }
   472                     catch { case _: Document.State.Fail => bad_output() }
   467                     catch { case _: Document.State.Fail => bad_output() }
   473                     postponed_changes.flush()
   468                     postponed_changes.flush()
   474                   case _ => bad_output()
   469                   case _ => bad_output()
   475                 }
   470                 }