--- a/src/Pure/System/progress.scala Sun Aug 20 21:05:56 2023 +0200
+++ b/src/Pure/System/progress.scala Sun Aug 20 22:24:24 2023 +0200
@@ -325,35 +325,39 @@
_consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
bulk = _ => true, timeout = timeout,
- consume = { bulk_output =>
- val serial = _serial
- val serial_db = Progress.private_data.read_messages_serial(db, _context)
+ consume = { bulk_output0 =>
+ val results =
+ for (bulk_output <- bulk_output0.grouped(200).toList) yield {
+ val serial = _serial
+ val serial_db = Progress.private_data.read_messages_serial(db, _context)
- _serial = _serial max serial_db
+ _serial = _serial max serial_db
- if (bulk_output.nonEmpty) {
- for (out <- bulk_output) {
- out match {
- case message: Progress.Message =>
- if (do_output(message)) base_progress.output(message)
- case theory: Progress.Theory => base_progress.theory(theory)
- }
- }
+ if (bulk_output.nonEmpty) {
+ for (out <- bulk_output) {
+ out match {
+ case message: Progress.Message =>
+ if (do_output(message)) base_progress.output(message)
+ case theory: Progress.Theory => base_progress.theory(theory)
+ }
+ }
- val messages =
- for ((out, i) <- bulk_output.zipWithIndex)
- yield (_serial + i + 1) -> out.message
+ val messages =
+ for ((out, i) <- bulk_output.zipWithIndex)
+ yield (_serial + i + 1) -> out.message
- Progress.private_data.write_messages(db, _context, messages)
+ Progress.private_data.write_messages(db, _context, messages)
- _serial = messages.last._1
- }
+ _serial = messages.last._1
+ }
- if (_serial != serial_db) {
- Progress.private_data.update_agent(db, _agent_uuid, _serial)
- }
+ if (_serial != serial_db) {
+ Progress.private_data.update_agent(db, _agent_uuid, _serial)
+ }
- (bulk_output.map(_ => Exn.Res(())), true)
+ bulk_output.map(_ => Exn.Res(()))
+ }
+ (results.flatten, true)
})
}
--- a/src/Pure/Thy/export.scala Sun Aug 20 21:05:56 2023 +0200
+++ b/src/Pure/Thy/export.scala Sun Aug 20 22:24:24 2023 +0200
@@ -261,46 +261,47 @@
Consumer_Thread.fork_bulk[(Entry, Boolean)](name = "export")(
bulk = { case (entry, _) => entry.is_finished },
consume =
- { (args: List[(Entry, Boolean)]) =>
- for ((entry, _) <- args) {
- if (progress.stopped) entry.cancel() else entry.body.join
- }
- private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") {
- var known = private_data.known_entries(db, args.map(p => p._1.entry_name))
- val buffer = new mutable.ListBuffer[Option[Entry]]
+ { (args0: List[(Entry, Boolean)]) =>
+ val results: List[List[Exn.Result[Unit]]] =
+ for (args <- args0.grouped(20).toList) yield {
+ for ((entry, _) <- args) {
+ if (progress.stopped) entry.cancel() else entry.body.join
+ }
+ private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") {
+ var known = private_data.known_entries(db, args.map(p => p._1.entry_name))
+ val buffer = new mutable.ListBuffer[Option[Entry]]
- for ((entry, strict) <- args) {
- if (progress.stopped) {
- buffer += None
- }
- else if (known(entry.entry_name)) {
- if (strict) {
- val msg = message("Duplicate export", entry.theory_name, entry.name)
- errors.change(msg :: _)
+ for ((entry, strict) <- args) {
+ if (progress.stopped) {
+ buffer += None
+ }
+ else if (known(entry.entry_name)) {
+ if (strict) {
+ val msg = message("Duplicate export", entry.theory_name, entry.name)
+ errors.change(msg :: _)
+ }
+ buffer += None
+ }
+ else {
+ known += entry.entry_name
+ buffer += Some(entry)
+ }
}
- buffer += None
- }
- else {
- known += entry.entry_name
- buffer += Some(entry)
+
+ val entries = buffer.toList
+ try {
+ private_data.write_entries(db, entries)
+ val ok = Exn.Res[Unit](())
+ entries.map(_ => ok)
+ }
+ catch {
+ case exn: Throwable =>
+ val err = Exn.Exn[Unit](exn)
+ entries.map(_ => err)
+ }
}
}
-
- val entries = buffer.toList
- val results =
- try {
- private_data.write_entries(db, entries)
- val ok = Exn.Res[Unit](())
- entries.map(_ => ok)
- }
- catch {
- case exn: Throwable =>
- val err = Exn.Exn[Unit](exn)
- entries.map(_ => err)
- }
-
- (results, true)
- }
+ (results.flatten, true)
})
def make_entry(session_name: String, args: Protocol.Export.Args, body: Bytes): Unit = {