proper sync_database for Database_Progress consumer;
authorwenzelm
Mon, 21 Aug 2023 11:56:07 +0200
changeset 78550 d8cc0f625b52
parent 78549 1c5cbece77d2
child 78551 d0c9d277620e
proper sync_database for Database_Progress consumer; more efficient (but less reactive) Database_Progress.stopped;
src/Pure/System/progress.scala
--- a/src/Pure/System/progress.scala	Mon Aug 21 11:43:29 2023 +0200
+++ b/src/Pure/System/progress.scala	Mon Aug 21 11:56:07 2023 +0200
@@ -328,29 +328,25 @@
       consume = { bulk_output0 =>
         val results =
           for (bulk_output <- bulk_output0.grouped(200).toList) yield {
-            val serial_db = Progress.private_data.read_messages_serial(db, _context)
-            _serial = _serial max serial_db
-
-            if (bulk_output.nonEmpty) {
-              for (out <- bulk_output) {
-                out match {
-                  case message: Progress.Message =>
-                    if (do_output(message)) base_progress.output(message)
-                  case theory: Progress.Theory => base_progress.theory(theory)
+            sync_database {
+              if (bulk_output.nonEmpty) {
+                for (out <- bulk_output) {
+                  out match {
+                    case message: Progress.Message =>
+                      if (do_output(message)) base_progress.output(message)
+                    case theory: Progress.Theory => base_progress.theory(theory)
+                  }
                 }
-              }
 
-              val messages =
-                for ((out, i) <- bulk_output.zipWithIndex)
-                  yield (_serial + i + 1) -> out.message
+                val messages =
+                  for ((out, i) <- bulk_output.zipWithIndex)
+                    yield (_serial + i + 1) -> out.message
 
-              Progress.private_data.write_messages(db, _context, messages)
-              _serial = messages.last._1
+                Progress.private_data.write_messages(db, _context, messages)
+                _serial = messages.last._1
+              }
+              bulk_output.map(_ => Exn.Res(()))
             }
-
-            if (_serial != serial_db) Progress.private_data.update_agent(db, _agent_uuid, _serial)
-
-            bulk_output.map(_ => Exn.Res(()))
           }
         (results.flatten, true)
       })
@@ -397,9 +393,11 @@
         _serial = _serial max Progress.private_data.read_messages_serial(db, _context)
       }
 
+      val res = body
+
       if (_serial != serial0) Progress.private_data.update_agent(db, _agent_uuid, _serial)
 
-      body
+      res
     }
   }
 
@@ -414,8 +412,8 @@
   override def verbose: Boolean = base_progress.verbose
 
   override def stop(): Unit = sync_context { base_progress.stop(); sync() }
-  override def stopped: Boolean = sync_database { base_progress.stopped }
-  override def stopped_local: Boolean = sync_database { base_progress.stopped && !_stopped_db }
+  override def stopped: Boolean = sync_context { base_progress.stopped }
+  override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db }
 
   override def toString: String = super.toString + ": database " + db