--- a/src/Pure/System/progress.scala Tue Aug 22 11:33:25 2023 +0200
+++ b/src/Pure/System/progress.scala Tue Aug 22 11:54:06 2023 +0200
@@ -321,33 +321,29 @@
}
if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list)
+ def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = sync_database {
+ if (bulk_output.nonEmpty) {
+ for (out <- bulk_output) {
+ out match {
+ case message: Progress.Message =>
+ if (do_output(message)) base_progress.output(message)
+ case theory: Progress.Theory => base_progress.theory(theory)
+ }
+ }
+
+ val messages =
+ for ((out, i) <- bulk_output.zipWithIndex)
+ yield (_serial + i + 1) -> out.message
+
+ Progress.private_data.write_messages(db, _context, messages)
+ _serial = messages.last._1
+ }
+ bulk_output.map(_ => Exn.Res(()))
+ }
+
_consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
bulk = _ => true, timeout = timeout,
- consume = { bulk_output0 =>
- val results =
- for (bulk_output <- bulk_output0.grouped(200).toList) yield {
- sync_database {
- if (bulk_output.nonEmpty) {
- for (out <- bulk_output) {
- out match {
- case message: Progress.Message =>
- if (do_output(message)) base_progress.output(message)
- case theory: Progress.Theory => base_progress.theory(theory)
- }
- }
-
- val messages =
- for ((out, i) <- bulk_output.zipWithIndex)
- yield (_serial + i + 1) -> out.message
-
- Progress.private_data.write_messages(db, _context, messages)
- _serial = messages.last._1
- }
- bulk_output.map(_ => Exn.Res(()))
- }
- }
- (results.flatten, true)
- })
+ consume = bulk_output => (bulk_output.grouped(200).toList.flatMap(consume), true))
}
def exit(close: Boolean = false): Unit = synchronized {
--- a/src/Pure/Thy/export.scala Tue Aug 22 11:33:25 2023 +0200
+++ b/src/Pure/Thy/export.scala Tue Aug 22 11:54:06 2023 +0200
@@ -253,49 +253,46 @@
class Consumer private[Export](db: SQL.Database, cache: XML.Cache, progress: Progress) {
private val errors = Synchronized[List[String]](Nil)
+ private def consume(args: List[(Entry, Boolean)]): List[Exn.Result[Unit]] = {
+ for ((entry, _) <- args) {
+ if (progress.stopped) entry.cancel() else entry.body.join
+ }
+ private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") {
+ var known = private_data.known_entries(db, args.map(p => p._1.entry_name))
+ val buffer = new mutable.ListBuffer[Option[Entry]]
+
+ for ((entry, strict) <- args) {
+ if (progress.stopped || known(entry.entry_name)) {
+ buffer += None
+ if (strict && known(entry.entry_name)) {
+ val msg = message("Duplicate export", entry.theory_name, entry.name)
+ errors.change(msg :: _)
+ }
+ }
+ else {
+ buffer += Some(entry)
+ known += entry.entry_name
+ }
+ }
+
+ val entries = buffer.toList
+ try {
+ private_data.write_entries(db, entries.flatten)
+ val ok = Exn.Res[Unit](())
+ entries.map(_ => ok)
+ }
+ catch {
+ case exn: Throwable =>
+ val err = Exn.Exn[Unit](exn)
+ entries.map(_ => err)
+ }
+ }
+ }
+
private val consumer =
Consumer_Thread.fork_bulk[(Entry, Boolean)](name = "export")(
bulk = { case (entry, _) => entry.is_finished },
- consume =
- { (args0: List[(Entry, Boolean)]) =>
- val results: List[List[Exn.Result[Unit]]] =
- for (args <- args0.grouped(20).toList) yield {
- for ((entry, _) <- args) {
- if (progress.stopped) entry.cancel() else entry.body.join
- }
- private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") {
- var known = private_data.known_entries(db, args.map(p => p._1.entry_name))
- val buffer = new mutable.ListBuffer[Option[Entry]]
-
- for ((entry, strict) <- args) {
- if (progress.stopped || known(entry.entry_name)) {
- buffer += None
- if (strict && known(entry.entry_name)) {
- val msg = message("Duplicate export", entry.theory_name, entry.name)
- errors.change(msg :: _)
- }
- }
- else {
- buffer += Some(entry)
- known += entry.entry_name
- }
- }
-
- val entries = buffer.toList
- try {
- private_data.write_entries(db, entries.flatten)
- val ok = Exn.Res[Unit](())
- entries.map(_ => ok)
- }
- catch {
- case exn: Throwable =>
- val err = Exn.Exn[Unit](exn)
- entries.map(_ => err)
- }
- }
- }
- (results.flatten, true)
- })
+ consume = args => (args.grouped(20).toList.flatMap(consume), true))
def make_entry(session_name: String, args: Protocol.Export.Args, body: Bytes): Unit = {
if (!progress.stopped && !body.is_empty) {