# HG changeset patch # User wenzelm # Date 1692698046 -7200 # Node ID 8ba186dc9bc87dd64fc6ebdb302b339614edc829 # Parent 1789ecbaf28b22cef527c178b619c6083be9f19f clarified source structure; diff -r 1789ecbaf28b -r 8ba186dc9bc8 src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala Tue Aug 22 11:33:25 2023 +0200 +++ b/src/Pure/System/progress.scala Tue Aug 22 11:54:06 2023 +0200 @@ -321,33 +321,29 @@ } if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list) + def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = sync_database { + if (bulk_output.nonEmpty) { + for (out <- bulk_output) { + out match { + case message: Progress.Message => + if (do_output(message)) base_progress.output(message) + case theory: Progress.Theory => base_progress.theory(theory) + } + } + + val messages = + for ((out, i) <- bulk_output.zipWithIndex) + yield (_serial + i + 1) -> out.message + + Progress.private_data.write_messages(db, _context, messages) + _serial = messages.last._1 + } + bulk_output.map(_ => Exn.Res(())) + } + _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")( bulk = _ => true, timeout = timeout, - consume = { bulk_output0 => - val results = - for (bulk_output <- bulk_output0.grouped(200).toList) yield { - sync_database { - if (bulk_output.nonEmpty) { - for (out <- bulk_output) { - out match { - case message: Progress.Message => - if (do_output(message)) base_progress.output(message) - case theory: Progress.Theory => base_progress.theory(theory) - } - } - - val messages = - for ((out, i) <- bulk_output.zipWithIndex) - yield (_serial + i + 1) -> out.message - - Progress.private_data.write_messages(db, _context, messages) - _serial = messages.last._1 - } - bulk_output.map(_ => Exn.Res(())) - } - } - (results.flatten, true) - }) + consume = bulk_output => (bulk_output.grouped(200).toList.flatMap(consume), true)) } def exit(close: Boolean = false): Unit = synchronized { diff -r 1789ecbaf28b -r 8ba186dc9bc8 src/Pure/Thy/export.scala --- a/src/Pure/Thy/export.scala Tue Aug 22 11:33:25 2023 +0200 +++ b/src/Pure/Thy/export.scala Tue Aug 22 11:54:06 2023 +0200 @@ -253,49 +253,46 @@ class Consumer private[Export](db: SQL.Database, cache: XML.Cache, progress: Progress) { private val errors = Synchronized[List[String]](Nil) + private def consume(args: List[(Entry, Boolean)]): List[Exn.Result[Unit]] = { + for ((entry, _) <- args) { + if (progress.stopped) entry.cancel() else entry.body.join + } + private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") { + var known = private_data.known_entries(db, args.map(p => p._1.entry_name)) + val buffer = new mutable.ListBuffer[Option[Entry]] + + for ((entry, strict) <- args) { + if (progress.stopped || known(entry.entry_name)) { + buffer += None + if (strict && known(entry.entry_name)) { + val msg = message("Duplicate export", entry.theory_name, entry.name) + errors.change(msg :: _) + } + } + else { + buffer += Some(entry) + known += entry.entry_name + } + } + + val entries = buffer.toList + try { + private_data.write_entries(db, entries.flatten) + val ok = Exn.Res[Unit](()) + entries.map(_ => ok) + } + catch { + case exn: Throwable => + val err = Exn.Exn[Unit](exn) + entries.map(_ => err) + } + } + } + private val consumer = Consumer_Thread.fork_bulk[(Entry, Boolean)](name = "export")( bulk = { case (entry, _) => entry.is_finished }, - consume = - { (args0: List[(Entry, Boolean)]) => - val results: List[List[Exn.Result[Unit]]] = - for (args <- args0.grouped(20).toList) yield { - for ((entry, _) <- args) { - if (progress.stopped) entry.cancel() else entry.body.join - } - private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") { - var known = private_data.known_entries(db, args.map(p => p._1.entry_name)) - val buffer = new mutable.ListBuffer[Option[Entry]] - - for ((entry, strict) <- args) { - if (progress.stopped || known(entry.entry_name)) { - buffer += None - if (strict && known(entry.entry_name)) { - val msg = message("Duplicate export", entry.theory_name, entry.name) - errors.change(msg :: _) - } - } - else { - buffer += Some(entry) - known += entry.entry_name - } - } - - val entries = buffer.toList - try { - private_data.write_entries(db, entries.flatten) - val ok = Exn.Res[Unit](()) - entries.map(_ => ok) - } - catch { - case exn: Throwable => - val err = Exn.Exn[Unit](exn) - entries.map(_ => err) - } - } - } - (results.flatten, true) - }) + consume = args => (args.grouped(20).toList.flatMap(consume), true)) def make_entry(session_name: String, args: Protocol.Export.Args, body: Bytes): Unit = { if (!progress.stopped && !body.is_empty) {