src/Pure/PIDE/session.scala
changeset 75393 87ebf5a50283
parent 74731 161e84e6b40a
child 75440 39011d0d2128
equal deleted inserted replaced
75388:b3ca4a6ed74b 75393:87ebf5a50283
    11 import scala.collection.immutable.Queue
    11 import scala.collection.immutable.Queue
    12 import scala.collection.mutable
    12 import scala.collection.mutable
    13 import scala.annotation.tailrec
    13 import scala.annotation.tailrec
    14 
    14 
    15 
    15 
    16 object Session
    16 object Session {
    17 {
       
    18   /* outlets */
    17   /* outlets */
    19 
    18 
    20   object Consumer
    19   object Consumer {
    21   {
       
    22     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    20     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    23       new Consumer[A](name, consume)
    21       new Consumer[A](name, consume)
    24   }
    22   }
    25   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    23   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    26 
    24 
    27   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    25   class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) {
    28   {
       
    29     private val consumers = Synchronized[List[Consumer[A]]](Nil)
    26     private val consumers = Synchronized[List[Consumer[A]]](Nil)
    30 
    27 
    31     def += (c: Consumer[A]): Unit = consumers.change(Library.update(c))
    28     def += (c: Consumer[A]): Unit = consumers.change(Library.update(c))
    32     def -= (c: Consumer[A]): Unit = consumers.change(Library.remove(c))
    29     def -= (c: Consumer[A]): Unit = consumers.change(Library.remove(c))
    33 
    30 
    34     def post(a: A): Unit =
    31     def post(a: A): Unit = {
    35     {
       
    36       for (c <- consumers.value.iterator) {
    32       for (c <- consumers.value.iterator) {
    37         dispatcher.send(() =>
    33         dispatcher.send(() =>
    38           try { c.consume(a) }
    34           try { c.consume(a) }
    39           catch {
    35           catch {
    40             case exn: Throwable =>
    36             case exn: Throwable =>
    71   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    67   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    72   case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
    68   case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
    73   case class Commands_Changed(
    69   case class Commands_Changed(
    74     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    70     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    75 
    71 
    76   sealed abstract class Phase
    72   sealed abstract class Phase {
    77   {
       
    78     def print: String =
    73     def print: String =
    79       this match {
    74       this match {
    80         case Terminated(result) => if (result.ok) "finished" else "failed"
    75         case Terminated(result) => if (result.ok) "finished" else "failed"
    81         case _ => Word.lowercase(this.toString)
    76         case _ => Word.lowercase(this.toString)
    82       }
    77       }
    89   //}}}
    84   //}}}
    90 
    85 
    91 
    86 
    92   /* syslog */
    87   /* syslog */
    93 
    88 
    94   private[Session] class Syslog(limit: Int)
    89   private[Session] class Syslog(limit: Int) {
    95   {
       
    96     private var queue = Queue.empty[XML.Elem]
    90     private var queue = Queue.empty[XML.Elem]
    97     private var length = 0
    91     private var length = 0
    98 
    92 
    99     def += (msg: XML.Elem): Unit = synchronized {
    93     def += (msg: XML.Elem): Unit = synchronized {
   100       queue = queue.enqueue(msg)
    94       queue = queue.enqueue(msg)
   111 
   105 
   112   /* protocol handlers */
   106   /* protocol handlers */
   113 
   107 
   114   type Protocol_Function = Prover.Protocol_Output => Boolean
   108   type Protocol_Function = Prover.Protocol_Output => Boolean
   115 
   109 
   116   abstract class Protocol_Handler extends Isabelle_System.Service
   110   abstract class Protocol_Handler extends Isabelle_System.Service {
   117   {
       
   118     def init(session: Session): Unit = {}
   111     def init(session: Session): Unit = {}
   119     def exit(): Unit = {}
   112     def exit(): Unit = {}
   120     def functions: List[(String, Protocol_Function)] = Nil
   113     def functions: List[(String, Protocol_Function)] = Nil
   121     def prover_options(options: Options): Options = options
   114     def prover_options(options: Options): Options = options
   122   }
   115   }
   123 }
   116 }
   124 
   117 
   125 
   118 
   126 class Session(_session_options: => Options, val resources: Resources) extends Document.Session
   119 class Session(_session_options: => Options, val resources: Resources) extends Document.Session {
   127 {
       
   128   session =>
   120   session =>
   129 
   121 
   130   val cache: Term.Cache = Term.Cache.make()
   122   val cache: Term.Cache = Term.Cache.make()
   131 
   123 
   132   def build_blobs_info(name: Document.Node.Name): Command.Blobs_Info =
   124   def build_blobs_info(name: Document.Node.Name): Command.Blobs_Info =
   154   /* dispatcher */
   146   /* dispatcher */
   155 
   147 
   156   private val dispatcher =
   148   private val dispatcher =
   157     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   149     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   158 
   150 
   159   def assert_dispatcher[A](body: => A): A =
   151   def assert_dispatcher[A](body: => A): A = {
   160   {
       
   161     assert(dispatcher.check_thread())
   152     assert(dispatcher.check_thread())
   162     body
   153     body
   163   }
   154   }
   164 
   155 
   165   def require_dispatcher[A](body: => A): A =
   156   def require_dispatcher[A](body: => A): A = {
   166   {
       
   167     require(dispatcher.check_thread(), "not on dispatcher thread")
   157     require(dispatcher.check_thread(), "not on dispatcher thread")
   168     body
   158     body
   169   }
   159   }
   170 
   160 
   171   def send_dispatcher(body: => Unit): Unit =
   161   def send_dispatcher(body: => Unit): Unit = {
   172   {
       
   173     if (dispatcher.check_thread()) body
   162     if (dispatcher.check_thread()) body
   174     else dispatcher.send(() => body)
   163     else dispatcher.send(() => body)
   175   }
   164   }
   176 
   165 
   177   def send_wait_dispatcher(body: => Unit): Unit =
   166   def send_wait_dispatcher(body: => Unit): Unit = {
   178   {
       
   179     if (dispatcher.check_thread()) body
   167     if (dispatcher.check_thread()) body
   180     else dispatcher.send_wait(() => body)
   168     else dispatcher.send_wait(() => body)
   181   }
   169   }
   182 
   170 
   183 
   171 
   216   private case object Prune_History
   204   private case object Prune_History
   217 
   205 
   218 
   206 
   219   /* phase */
   207   /* phase */
   220 
   208 
   221   private def post_phase(new_phase: Session.Phase): Session.Phase =
   209   private def post_phase(new_phase: Session.Phase): Session.Phase = {
   222   {
       
   223     phase_changed.post(new_phase)
   210     phase_changed.post(new_phase)
   224     new_phase
   211     new_phase
   225   }
   212   }
   226   private val _phase = Synchronized[Session.Phase](Session.Inactive)
   213   private val _phase = Synchronized[Session.Phase](Session.Inactive)
   227   private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase))
   214   private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase))
   243     doc_blobs: Document.Blobs,
   230     doc_blobs: Document.Blobs,
   244     text_edits: List[Document.Edit_Text],
   231     text_edits: List[Document.Edit_Text],
   245     consolidate: List[Document.Node.Name],
   232     consolidate: List[Document.Node.Name],
   246     version_result: Promise[Document.Version])
   233     version_result: Promise[Document.Version])
   247 
   234 
   248   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   235   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true) {
   249   {
       
   250     case Text_Edits(previous, doc_blobs, text_edits, consolidate, version_result) =>
   236     case Text_Edits(previous, doc_blobs, text_edits, consolidate, version_result) =>
   251       val prev = previous.get_finished
   237       val prev = previous.get_finished
   252       val change =
   238       val change =
   253         Timing.timeit("parse_change", timing) {
   239         Timing.timeit("parse_change", timing) {
   254           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits, consolidate)
   240           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits, consolidate)
   259   }
   245   }
   260 
   246 
   261 
   247 
   262   /* buffered changes */
   248   /* buffered changes */
   263 
   249 
   264   private object change_buffer
   250   private object change_buffer {
   265   {
       
   266     private var assignment: Boolean = false
   251     private var assignment: Boolean = false
   267     private var nodes: Set[Document.Node.Name] = Set.empty
   252     private var nodes: Set[Document.Node.Name] = Set.empty
   268     private var commands: Set[Command] = Set.empty
   253     private var commands: Set[Command] = Set.empty
   269 
   254 
   270     def flush(): Unit = synchronized {
   255     def flush(): Unit = synchronized {
   289           commands += command
   274           commands += command
   290         }
   275         }
   291         delay_flush.invoke()
   276         delay_flush.invoke()
   292       }
   277       }
   293 
   278 
   294     def shutdown(): Unit =
   279     def shutdown(): Unit = {
   295     {
       
   296       delay_flush.revoke()
   280       delay_flush.revoke()
   297       flush()
   281       flush()
   298     }
   282     }
   299   }
   283   }
   300 
   284 
   301 
   285 
   302   /* postponed changes */
   286   /* postponed changes */
   303 
   287 
   304   private object postponed_changes
   288   private object postponed_changes {
   305   {
       
   306     private var postponed: List[Session.Change] = Nil
   289     private var postponed: List[Session.Change] = Nil
   307 
   290 
   308     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   291     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   309 
   292 
   310     def flush(state: Document.State): List[Session.Change] = synchronized {
   293     def flush(state: Document.State): List[Session.Change] = synchronized {
   315   }
   298   }
   316 
   299 
   317 
   300 
   318   /* node consolidation */
   301   /* node consolidation */
   319 
   302 
   320   private object consolidation
   303   private object consolidation {
   321   {
       
   322     private val delay =
   304     private val delay =
   323       Delay.first(consolidate_delay) { manager.send(Consolidate_Execution) }
   305       Delay.first(consolidate_delay) { manager.send(Consolidate_Execution) }
   324 
   306 
   325     private val init_state: Option[Set[Document.Node.Name]] = Some(Set.empty)
   307     private val init_state: Option[Set[Document.Node.Name]] = Some(Set.empty)
   326     private val state = Synchronized(init_state)
   308     private val state = Synchronized(init_state)
   327 
   309 
   328     def exit(): Unit =
   310     def exit(): Unit = {
   329     {
       
   330       delay.revoke()
   311       delay.revoke()
   331       state.change(_ => None)
   312       state.change(_ => None)
   332     }
   313     }
   333 
   314 
   334     def update(new_nodes: Set[Document.Node.Name] = Set.empty): Unit =
   315     def update(new_nodes: Set[Document.Node.Name] = Set.empty): Unit = {
   335     {
       
   336       val active =
   316       val active =
   337         state.change_result(st =>
   317         state.change_result(st =>
   338           (st.isDefined, st.map(nodes => if (nodes.isEmpty) new_nodes else nodes ++ new_nodes)))
   318           (st.isDefined, st.map(nodes => if (nodes.isEmpty) new_nodes else nodes ++ new_nodes)))
   339       if (active) delay.invoke()
   319       if (active) delay.invoke()
   340     }
   320     }
   344   }
   324   }
   345 
   325 
   346 
   326 
   347   /* prover process */
   327   /* prover process */
   348 
   328 
   349   private object prover
   329   private object prover {
   350   {
       
   351     private val variable = Synchronized[Option[Prover]](None)
   330     private val variable = Synchronized[Option[Prover]](None)
   352 
   331 
   353     def defined: Boolean = variable.value.isDefined
   332     def defined: Boolean = variable.value.isDefined
   354     def get: Prover = variable.value.get
   333     def get: Prover = variable.value.get
   355     def set(p: Prover): Unit = variable.change(_ => Some(p))
   334     def set(p: Prover): Unit = variable.change(_ => Some(p))
   389   /* manager thread */
   368   /* manager thread */
   390 
   369 
   391   private val delay_prune =
   370   private val delay_prune =
   392     Delay.first(prune_delay) { manager.send(Prune_History) }
   371     Delay.first(prune_delay) { manager.send(Prune_History) }
   393 
   372 
   394   private val manager: Consumer_Thread[Any] =
   373   private val manager: Consumer_Thread[Any] = {
   395   {
       
   396     /* global state */
   374     /* global state */
   397     val global_state = Synchronized(Document.State.init)
   375     val global_state = Synchronized(Document.State.init)
   398 
   376 
   399 
   377 
   400     /* raw edits */
   378     /* raw edits */
   401 
   379 
   402     def handle_raw_edits(
   380     def handle_raw_edits(
   403       doc_blobs: Document.Blobs = Document.Blobs.empty,
   381       doc_blobs: Document.Blobs = Document.Blobs.empty,
   404       edits: List[Document.Edit_Text] = Nil,
   382       edits: List[Document.Edit_Text] = Nil,
   405       consolidate: List[Document.Node.Name] = Nil): Unit =
   383       consolidate: List[Document.Node.Name] = Nil): Unit = {
   406     //{{{
   384     //{{{
   407     {
       
   408       require(prover.defined, "prover process not defined (handle_raw_edits)")
   385       require(prover.defined, "prover process not defined (handle_raw_edits)")
   409 
   386 
   410       if (edits.nonEmpty) prover.get.discontinue_execution()
   387       if (edits.nonEmpty) prover.get.discontinue_execution()
   411 
   388 
   412       val previous = global_state.value.history.tip.version
   389       val previous = global_state.value.history.tip.version
   413       val version = Future.promise[Document.Version]
   390       val version = Future.promise[Document.Version]
   414       global_state.change(_.continue_history(previous, edits, version))
   391       global_state.change(_.continue_history(previous, edits, version))
   415 
   392 
   416       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   393       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   417       change_parser.send(Text_Edits(previous, doc_blobs, edits, consolidate, version))
   394       change_parser.send(Text_Edits(previous, doc_blobs, edits, consolidate, version))
   418     }
       
   419     //}}}
   395     //}}}
       
   396     }
   420 
   397 
   421 
   398 
   422     /* resulting changes */
   399     /* resulting changes */
   423 
   400 
   424     def handle_change(change: Session.Change): Unit =
   401     def handle_change(change: Session.Change): Unit = {
   425     //{{{
   402     //{{{
   426     {
       
   427       require(prover.defined, "prover process not defined (handle_change)")
   403       require(prover.defined, "prover process not defined (handle_change)")
   428 
   404 
   429       // define commands
   405       // define commands
   430       {
   406       {
   431         val id_commands = new mutable.ListBuffer[Command]
   407         val id_commands = new mutable.ListBuffer[Command]
   432         def id_command(command: Command): Unit =
   408         def id_command(command: Command): Unit = {
   433         {
       
   434           for {
   409           for {
   435             (name, digest) <- command.blobs_defined
   410             (name, digest) <- command.blobs_defined
   436             if !global_state.value.defined_blob(digest)
   411             if !global_state.value.defined_blob(digest)
   437           } {
   412           } {
   438             change.version.nodes(name).get_blob match {
   413             change.version.nodes(name).get_blob match {
   460       val assignment = global_state.value.the_assignment(change.previous).check_finished
   435       val assignment = global_state.value.the_assignment(change.previous).check_finished
   461       global_state.change(_.define_version(change.version, assignment))
   436       global_state.change(_.define_version(change.version, assignment))
   462 
   437 
   463       prover.get.update(change.previous.id, change.version.id, change.doc_edits, change.consolidate)
   438       prover.get.update(change.previous.id, change.version.id, change.doc_edits, change.consolidate)
   464       resources.commit(change)
   439       resources.commit(change)
   465     }
       
   466     //}}}
   440     //}}}
       
   441     }
   467 
   442 
   468 
   443 
   469     /* prover output */
   444     /* prover output */
   470 
   445 
   471     def handle_output(output: Prover.Output): Unit =
   446     def handle_output(output: Prover.Output): Unit = {
   472     //{{{
   447     //{{{
   473     {
   448       def bad_output(): Unit = {
   474       def bad_output(): Unit =
       
   475       {
       
   476         if (verbose)
   449         if (verbose)
   477           Output.warning("Ignoring bad prover output: " + output.message.toString)
   450           Output.warning("Ignoring bad prover output: " + output.message.toString)
   478       }
   451       }
   479 
   452 
   480       def change_command(f: Document.State => (Command.State, Document.State)): Unit =
   453       def change_command(f: Document.State => (Command.State, Document.State)): Unit = {
   481       {
       
   482         try {
   454         try {
   483           val st = global_state.change_result(f)
   455           val st = global_state.change_result(f)
   484           if (!st.command.span.is_theory) {
   456           if (!st.command.span.is_theory) {
   485             change_buffer.invoke(false, Nil, List(st.command))
   457             change_buffer.invoke(false, Nil, List(st.command))
   486           }
   458           }
   589 
   561 
   590             case _ =>
   562             case _ =>
   591               raw_output_messages.post(output)
   563               raw_output_messages.post(output)
   592           }
   564           }
   593         }
   565         }
   594     }
       
   595     //}}}
   566     //}}}
       
   567     }
   596 
   568 
   597 
   569 
   598     /* main thread */
   570     /* main thread */
   599 
   571 
   600     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   572     Consumer_Thread.fork[Any]("Session.manager", daemon = true) {
   601     {
       
   602       case arg: Any =>
   573       case arg: Any =>
   603         //{{{
   574         //{{{
   604         arg match {
   575         arg match {
   605           case output: Prover.Output =>
   576           case output: Prover.Output =>
   606             if (output.is_syslog) {
   577             if (output.is_syslog) {
   695   }
   666   }
   696 
   667 
   697 
   668 
   698   /* main operations */
   669   /* main operations */
   699 
   670 
   700   def get_state(): Document.State =
   671   def get_state(): Document.State = {
   701   {
       
   702     if (manager.is_active()) {
   672     if (manager.is_active()) {
   703       val promise = Future.promise[Document.State]
   673       val promise = Future.promise[Document.State]
   704       manager.send_wait(Get_State(promise))
   674       manager.send_wait(Get_State(promise))
   705       promise.join
   675       promise.join
   706     }
   676     }
   713 
   683 
   714   def recent_syntax(name: Document.Node.Name): Outer_Syntax =
   684   def recent_syntax(name: Document.Node.Name): Outer_Syntax =
   715     get_state().recent_finished.version.get_finished.nodes(name).syntax getOrElse
   685     get_state().recent_finished.version.get_finished.nodes(name).syntax getOrElse
   716     resources.session_base.overall_syntax
   686     resources.session_base.overall_syntax
   717 
   687 
   718   @tailrec final def await_stable_snapshot(): Document.Snapshot =
   688   @tailrec final def await_stable_snapshot(): Document.Snapshot = {
   719   {
       
   720     val snapshot = this.snapshot()
   689     val snapshot = this.snapshot()
   721     if (snapshot.is_outdated) {
   690     if (snapshot.is_outdated) {
   722       output_delay.sleep()
   691       output_delay.sleep()
   723       await_stable_snapshot()
   692       await_stable_snapshot()
   724     }
   693     }
   725     else snapshot
   694     else snapshot
   726   }
   695   }
   727 
   696 
   728   def start(start_prover: Prover.Receiver => Prover): Unit =
   697   def start(start_prover: Prover.Receiver => Prover): Unit = {
   729   {
       
   730     file_formats
   698     file_formats
   731     _phase.change(
   699     _phase.change(
   732       {
   700       {
   733         case Session.Inactive =>
   701         case Session.Inactive =>
   734           manager.send(Start(start_prover))
   702           manager.send(Start(start_prover))
   735           post_phase(Session.Startup)
   703           post_phase(Session.Startup)
   736         case phase => error("Cannot start prover in phase " + quote(phase.print))
   704         case phase => error("Cannot start prover in phase " + quote(phase.print))
   737       })
   705       })
   738   }
   706   }
   739 
   707 
   740   def stop(): Process_Result =
   708   def stop(): Process_Result = {
   741   {
       
   742     val was_ready =
   709     val was_ready =
   743       _phase.guarded_access(
   710       _phase.guarded_access(
   744         {
   711         {
   745           case Session.Startup | Session.Shutdown => None
   712           case Session.Startup | Session.Shutdown => None
   746           case Session.Terminated(_) => Some((false, phase))
   713           case Session.Terminated(_) => Some((false, phase))