# HG changeset patch # User wenzelm # Date 1692563064 -7200 # Node ID 4ffc1933f5d906ec14f12ea7cd3890e61c7a8d17 # Parent d95497dcd9bc38745a39f8b66963e0027cffc601 limit size and complexity of bulk transactions; diff -r d95497dcd9bc -r 4ffc1933f5d9 src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala Sun Aug 20 21:05:56 2023 +0200 +++ b/src/Pure/System/progress.scala Sun Aug 20 22:24:24 2023 +0200 @@ -325,35 +325,39 @@ _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")( bulk = _ => true, timeout = timeout, - consume = { bulk_output => - val serial = _serial - val serial_db = Progress.private_data.read_messages_serial(db, _context) + consume = { bulk_output0 => + val results = + for (bulk_output <- bulk_output0.grouped(200).toList) yield { + val serial = _serial + val serial_db = Progress.private_data.read_messages_serial(db, _context) - _serial = _serial max serial_db + _serial = _serial max serial_db - if (bulk_output.nonEmpty) { - for (out <- bulk_output) { - out match { - case message: Progress.Message => - if (do_output(message)) base_progress.output(message) - case theory: Progress.Theory => base_progress.theory(theory) - } - } + if (bulk_output.nonEmpty) { + for (out <- bulk_output) { + out match { + case message: Progress.Message => + if (do_output(message)) base_progress.output(message) + case theory: Progress.Theory => base_progress.theory(theory) + } + } - val messages = - for ((out, i) <- bulk_output.zipWithIndex) - yield (_serial + i + 1) -> out.message + val messages = + for ((out, i) <- bulk_output.zipWithIndex) + yield (_serial + i + 1) -> out.message - Progress.private_data.write_messages(db, _context, messages) + Progress.private_data.write_messages(db, _context, messages) - _serial = messages.last._1 - } + _serial = messages.last._1 + } - if (_serial != serial_db) { - Progress.private_data.update_agent(db, _agent_uuid, _serial) - } + if (_serial != serial_db) { + Progress.private_data.update_agent(db, _agent_uuid, _serial) + } - (bulk_output.map(_ => Exn.Res(())), true) + bulk_output.map(_ => Exn.Res(())) + } + (results.flatten, true) }) } diff -r d95497dcd9bc -r 4ffc1933f5d9 src/Pure/Thy/export.scala --- a/src/Pure/Thy/export.scala Sun Aug 20 21:05:56 2023 +0200 +++ b/src/Pure/Thy/export.scala Sun Aug 20 22:24:24 2023 +0200 @@ -261,46 +261,47 @@ Consumer_Thread.fork_bulk[(Entry, Boolean)](name = "export")( bulk = { case (entry, _) => entry.is_finished }, consume = - { (args: List[(Entry, Boolean)]) => - for ((entry, _) <- args) { - if (progress.stopped) entry.cancel() else entry.body.join - } - private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") { - var known = private_data.known_entries(db, args.map(p => p._1.entry_name)) - val buffer = new mutable.ListBuffer[Option[Entry]] + { (args0: List[(Entry, Boolean)]) => + val results: List[List[Exn.Result[Unit]]] = + for (args <- args0.grouped(20).toList) yield { + for ((entry, _) <- args) { + if (progress.stopped) entry.cancel() else entry.body.join + } + private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") { + var known = private_data.known_entries(db, args.map(p => p._1.entry_name)) + val buffer = new mutable.ListBuffer[Option[Entry]] - for ((entry, strict) <- args) { - if (progress.stopped) { - buffer += None - } - else if (known(entry.entry_name)) { - if (strict) { - val msg = message("Duplicate export", entry.theory_name, entry.name) - errors.change(msg :: _) + for ((entry, strict) <- args) { + if (progress.stopped) { + buffer += None + } + else if (known(entry.entry_name)) { + if (strict) { + val msg = message("Duplicate export", entry.theory_name, entry.name) + errors.change(msg :: _) + } + buffer += None + } + else { + known += entry.entry_name + buffer += Some(entry) + } } - buffer += None - } - else { - known += entry.entry_name - buffer += Some(entry) + + val entries = buffer.toList + try { + private_data.write_entries(db, entries) + val ok = Exn.Res[Unit](()) + entries.map(_ => ok) + } + catch { + case exn: Throwable => + val err = Exn.Exn[Unit](exn) + entries.map(_ => err) + } } } - - val entries = buffer.toList - val results = - try { - private_data.write_entries(db, entries) - val ok = Exn.Res[Unit](()) - entries.map(_ => ok) - } - catch { - case exn: Throwable => - val err = Exn.Exn[Unit](exn) - entries.map(_ => err) - } - - (results, true) - } + (results.flatten, true) }) def make_entry(session_name: String, args: Protocol.Export.Args, body: Bytes): Unit = {