clarified source structure;
authorwenzelm
Tue, 22 Aug 2023 11:54:06 +0200
changeset 78564 8ba186dc9bc8
parent 78563 1789ecbaf28b
child 78565 05de3e068312
clarified source structure;
src/Pure/System/progress.scala
src/Pure/Thy/export.scala
--- a/src/Pure/System/progress.scala	Tue Aug 22 11:33:25 2023 +0200
+++ b/src/Pure/System/progress.scala	Tue Aug 22 11:54:06 2023 +0200
@@ -321,33 +321,29 @@
     }
     if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list)
 
+    def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = sync_database {
+      if (bulk_output.nonEmpty) {
+        for (out <- bulk_output) {
+          out match {
+            case message: Progress.Message =>
+              if (do_output(message)) base_progress.output(message)
+            case theory: Progress.Theory => base_progress.theory(theory)
+          }
+        }
+
+        val messages =
+          for ((out, i) <- bulk_output.zipWithIndex)
+            yield (_serial + i + 1) -> out.message
+
+        Progress.private_data.write_messages(db, _context, messages)
+        _serial = messages.last._1
+      }
+      bulk_output.map(_ => Exn.Res(()))
+    }
+
     _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
       bulk = _ => true, timeout = timeout,
-      consume = { bulk_output0 =>
-        val results =
-          for (bulk_output <- bulk_output0.grouped(200).toList) yield {
-            sync_database {
-              if (bulk_output.nonEmpty) {
-                for (out <- bulk_output) {
-                  out match {
-                    case message: Progress.Message =>
-                      if (do_output(message)) base_progress.output(message)
-                    case theory: Progress.Theory => base_progress.theory(theory)
-                  }
-                }
-
-                val messages =
-                  for ((out, i) <- bulk_output.zipWithIndex)
-                    yield (_serial + i + 1) -> out.message
-
-                Progress.private_data.write_messages(db, _context, messages)
-                _serial = messages.last._1
-              }
-              bulk_output.map(_ => Exn.Res(()))
-            }
-          }
-        (results.flatten, true)
-      })
+      consume = bulk_output => (bulk_output.grouped(200).toList.flatMap(consume), true))
   }
 
   def exit(close: Boolean = false): Unit = synchronized {
--- a/src/Pure/Thy/export.scala	Tue Aug 22 11:33:25 2023 +0200
+++ b/src/Pure/Thy/export.scala	Tue Aug 22 11:54:06 2023 +0200
@@ -253,49 +253,46 @@
   class Consumer private[Export](db: SQL.Database, cache: XML.Cache, progress: Progress) {
     private val errors = Synchronized[List[String]](Nil)
 
+    private def consume(args: List[(Entry, Boolean)]): List[Exn.Result[Unit]] = {
+      for ((entry, _) <- args) {
+        if (progress.stopped) entry.cancel() else entry.body.join
+      }
+      private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") {
+        var known = private_data.known_entries(db, args.map(p => p._1.entry_name))
+        val buffer = new mutable.ListBuffer[Option[Entry]]
+
+        for ((entry, strict) <- args) {
+          if (progress.stopped || known(entry.entry_name)) {
+            buffer += None
+            if (strict && known(entry.entry_name)) {
+              val msg = message("Duplicate export", entry.theory_name, entry.name)
+              errors.change(msg :: _)
+            }
+          }
+          else {
+            buffer += Some(entry)
+            known += entry.entry_name
+          }
+        }
+
+        val entries = buffer.toList
+        try {
+          private_data.write_entries(db, entries.flatten)
+          val ok = Exn.Res[Unit](())
+          entries.map(_ => ok)
+        }
+        catch {
+          case exn: Throwable =>
+            val err = Exn.Exn[Unit](exn)
+            entries.map(_ => err)
+        }
+      }
+    }
+
     private val consumer =
       Consumer_Thread.fork_bulk[(Entry, Boolean)](name = "export")(
         bulk = { case (entry, _) => entry.is_finished },
-        consume =
-          { (args0: List[(Entry, Boolean)]) =>
-            val results: List[List[Exn.Result[Unit]]] =
-              for (args <- args0.grouped(20).toList) yield {
-                for ((entry, _) <- args) {
-                  if (progress.stopped) entry.cancel() else entry.body.join
-                }
-                private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") {
-                  var known = private_data.known_entries(db, args.map(p => p._1.entry_name))
-                  val buffer = new mutable.ListBuffer[Option[Entry]]
-
-                  for ((entry, strict) <- args) {
-                    if (progress.stopped || known(entry.entry_name)) {
-                      buffer += None
-                      if (strict && known(entry.entry_name)) {
-                        val msg = message("Duplicate export", entry.theory_name, entry.name)
-                        errors.change(msg :: _)
-                      }
-                    }
-                    else {
-                      buffer += Some(entry)
-                      known += entry.entry_name
-                    }
-                  }
-
-                  val entries = buffer.toList
-                  try {
-                    private_data.write_entries(db, entries.flatten)
-                    val ok = Exn.Res[Unit](())
-                    entries.map(_ => ok)
-                  }
-                  catch {
-                    case exn: Throwable =>
-                      val err = Exn.Exn[Unit](exn)
-                      entries.map(_ => err)
-                  }
-                }
-              }
-            (results.flatten, true)
-          })
+        consume = args => (args.grouped(20).toList.flatMap(consume), true))
 
     def make_entry(session_name: String, args: Protocol.Export.Args, body: Bytes): Unit = {
       if (!progress.stopped && !body.is_empty) {