limit size and complexity of bulk transactions;
authorwenzelm
Sun, 20 Aug 2023 22:24:24 +0200
changeset 78542 4ffc1933f5d9
parent 78541 d95497dcd9bc
child 78543 cfdb586adbbd
limit size and complexity of bulk transactions;
src/Pure/System/progress.scala
src/Pure/Thy/export.scala
--- a/src/Pure/System/progress.scala	Sun Aug 20 21:05:56 2023 +0200
+++ b/src/Pure/System/progress.scala	Sun Aug 20 22:24:24 2023 +0200
@@ -325,35 +325,39 @@
 
     _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
       bulk = _ => true, timeout = timeout,
-      consume = { bulk_output =>
-        val serial = _serial
-        val serial_db = Progress.private_data.read_messages_serial(db, _context)
+      consume = { bulk_output0 =>
+        val results =
+          for (bulk_output <- bulk_output0.grouped(200).toList) yield {
+            val serial = _serial
+            val serial_db = Progress.private_data.read_messages_serial(db, _context)
 
-        _serial = _serial max serial_db
+            _serial = _serial max serial_db
 
-        if (bulk_output.nonEmpty) {
-          for (out <- bulk_output) {
-            out match {
-              case message: Progress.Message =>
-                if (do_output(message)) base_progress.output(message)
-              case theory: Progress.Theory => base_progress.theory(theory)
-            }
-          }
+            if (bulk_output.nonEmpty) {
+              for (out <- bulk_output) {
+                out match {
+                  case message: Progress.Message =>
+                    if (do_output(message)) base_progress.output(message)
+                  case theory: Progress.Theory => base_progress.theory(theory)
+                }
+              }
 
-          val messages =
-            for ((out, i) <- bulk_output.zipWithIndex)
-              yield (_serial + i + 1) -> out.message
+              val messages =
+                for ((out, i) <- bulk_output.zipWithIndex)
+                  yield (_serial + i + 1) -> out.message
 
-          Progress.private_data.write_messages(db, _context, messages)
+              Progress.private_data.write_messages(db, _context, messages)
 
-          _serial = messages.last._1
-        }
+              _serial = messages.last._1
+            }
 
-        if (_serial != serial_db) {
-          Progress.private_data.update_agent(db, _agent_uuid, _serial)
-        }
+            if (_serial != serial_db) {
+              Progress.private_data.update_agent(db, _agent_uuid, _serial)
+            }
 
-        (bulk_output.map(_ => Exn.Res(())), true)
+            bulk_output.map(_ => Exn.Res(()))
+          }
+        (results.flatten, true)
       })
   }
 
--- a/src/Pure/Thy/export.scala	Sun Aug 20 21:05:56 2023 +0200
+++ b/src/Pure/Thy/export.scala	Sun Aug 20 22:24:24 2023 +0200
@@ -261,46 +261,47 @@
       Consumer_Thread.fork_bulk[(Entry, Boolean)](name = "export")(
         bulk = { case (entry, _) => entry.is_finished },
         consume =
-          { (args: List[(Entry, Boolean)]) =>
-            for ((entry, _) <- args) {
-              if (progress.stopped) entry.cancel() else entry.body.join
-            }
-            private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") {
-              var known = private_data.known_entries(db, args.map(p => p._1.entry_name))
-              val buffer = new mutable.ListBuffer[Option[Entry]]
+          { (args0: List[(Entry, Boolean)]) =>
+            val results: List[List[Exn.Result[Unit]]] =
+              for (args <- args0.grouped(20).toList) yield {
+                for ((entry, _) <- args) {
+                  if (progress.stopped) entry.cancel() else entry.body.join
+                }
+                private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") {
+                  var known = private_data.known_entries(db, args.map(p => p._1.entry_name))
+                  val buffer = new mutable.ListBuffer[Option[Entry]]
 
-              for ((entry, strict) <- args) {
-                if (progress.stopped) {
-                  buffer += None
-                }
-                else if (known(entry.entry_name)) {
-                  if (strict) {
-                    val msg = message("Duplicate export", entry.theory_name, entry.name)
-                    errors.change(msg :: _)
+                  for ((entry, strict) <- args) {
+                    if (progress.stopped) {
+                      buffer += None
+                    }
+                    else if (known(entry.entry_name)) {
+                      if (strict) {
+                        val msg = message("Duplicate export", entry.theory_name, entry.name)
+                        errors.change(msg :: _)
+                      }
+                      buffer += None
+                    }
+                    else {
+                      known += entry.entry_name
+                      buffer += Some(entry)
+                    }
                   }
-                  buffer += None
-                }
-                else {
-                  known += entry.entry_name
-                  buffer += Some(entry)
+
+                  val entries = buffer.toList
+                  try {
+                    private_data.write_entries(db, entries)
+                    val ok = Exn.Res[Unit](())
+                    entries.map(_ => ok)
+                  }
+                  catch {
+                    case exn: Throwable =>
+                      val err = Exn.Exn[Unit](exn)
+                      entries.map(_ => err)
+                  }
                 }
               }
-
-              val entries = buffer.toList
-              val results =
-                try {
-                  private_data.write_entries(db, entries)
-                  val ok = Exn.Res[Unit](())
-                  entries.map(_ => ok)
-                }
-                catch {
-                  case exn: Throwable =>
-                    val err = Exn.Exn[Unit](exn)
-                    entries.map(_ => err)
-                }
-
-              (results, true)
-            }
+            (results.flatten, true)
           })
 
     def make_entry(session_name: String, args: Protocol.Export.Args, body: Bytes): Unit = {