more parallelism via consumer thread: with mailbox limit to avoid resource problems;
author wenzelm
date Tue, 31 Oct 2023 16:15:59 +0100
changeset 78866 1bd52b048f8e
parent 78865 a0199212046a
child 78867 b02f8fb6b1b6
more parallelism via consumer thread: with mailbox limit to avoid resource problems;
src/Pure/Admin/build_log.scala
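
The change replaces the former sequential loop by a producer/consumer split: the main thread parses log files and sends them to a Consumer_Thread whose mailbox is bounded (limit = 1), so at most one parsed log file waits in memory while the database transaction for the previous one is still running. The sketch below is not Isabelle's Consumer_Thread API but a minimal plain-Scala analogue of the same bounded-mailbox pattern, using java.util.concurrent.ArrayBlockingQueue; the object name BoundedConsumerDemo and the Stop sentinel are made up for this illustration.

// Sketch only: a bounded queue stands in for the consumer thread's mailbox.
// put() blocks the producer when the queue is full, which is what keeps
// memory use flat when parsing outpaces database updates.
import java.util.concurrent.ArrayBlockingQueue

object BoundedConsumerDemo {
  private case object Stop   // hypothetical end-of-stream marker

  def main(args: Array[String]): Unit = {
    val mailbox = new ArrayBlockingQueue[Any](1)   // limit = 1, as in the changeset

    val consumer = new Thread(() => {
      var running = true
      while (running) {
        mailbox.take() match {
          case Stop => running = false
          case item => println("consumed " + item)  // stands in for status.foreach(_.update(...))
        }
      }
    })
    consumer.start()

    // producer side: log-file parsing would happen here, concurrently with consumption
    for (i <- 1 to 5) mailbox.put("log_file_" + i)  // blocks while the consumer is behind
    mailbox.put(Stop)                               // analogous to consumer.shutdown()
    consumer.join()
  }
}
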
--- a/src/Pure/Admin/build_log.scala	Tue Oct 31 16:11:26 2023 +0100
+++ b/src/Pure/Admin/build_log.scala	Tue Oct 31 16:15:59 2023 +0100
@@ -1019,10 +1019,9 @@
     ): Multi_Map[String, String] = {
       init_database(db)
 
-      var errors1 = errors
-      def add_error(name: String, exn: Throwable): Unit = {
-        errors1 = errors1.insert(name, Exn.print(exn))
-      }
+      val errors_result = Synchronized(errors)
+      def add_error(name: String, exn: Throwable): Unit =
+        errors_result.change(_.insert(name, Exn.print(exn)))
 
       val files_domain = {
         val names = files.map(Log_File.plain_name).toSet
@@ -1030,18 +1029,17 @@
       }
 
       abstract class Table_Status(table: SQL.Table) {
-        private var known: Set[String] =
-          read_domain(db, table, private_data.log_name,
-            restriction = files_domain)
+        private val known =
+          Synchronized(read_domain(db, table, private_data.log_name, restriction = files_domain))
 
-        def required(file: JFile): Boolean = !known(Log_File.plain_name(file))
-        def required(log_file: Log_File): Boolean = !known(log_file.name)
+        def required(file: JFile): Boolean = !(known.value)(Log_File.plain_name(file))
+        def required(log_file: Log_File): Boolean = !(known.value)(log_file.name)
 
         def update_db(db: SQL.Database, log_file: Log_File): Unit
         def update(log_file: Log_File): Unit = {
           if (required(log_file)) {
             update_db(db, log_file)
-            known += log_file.name
+            known.change(_ + log_file.name)
           }
         }
       }
@@ -1072,20 +1070,31 @@
           }
         ) ::: ml_statistics_status
 
-      for (file <- files.iterator if status.exists(_.required(file))) {
-        val log_name = Log_File.plain_name(file)
-        progress.echo("Log " + quote(log_name), verbose = true)
-        Exn.result { Log_File(file) } match {
-          case Exn.Res(log_file) =>
+      val consumer =
+        Consumer_Thread.fork[Log_File]("build_log_database")(
+          consume = { log_file =>
             private_data.transaction_lock(db, label = "build_log_database") {
               try { status.foreach(_.update(log_file)) }
-              catch { case exn: Throwable => add_error(log_name, exn) }
+              catch { case exn: Throwable => add_error(log_file.name, exn) }
             }
-          case Exn.Exn(exn) => add_error(log_name, exn)
+            true
+          },
+          limit = 1
+        )
+
+      try {
+        for (file <- files.iterator if status.exists(_.required(file))) {
+          val log_name = Log_File.plain_name(file)
+          progress.echo("Log " + quote(log_name), verbose = true)
+          Exn.result { Log_File(file) } match {
+            case Exn.Res(log_file) => consumer.send(log_file)
+            case Exn.Exn(exn) => add_error(log_name, exn)
+          }
         }
       }
+      finally { consumer.shutdown() }
 
-      errors1
+      errors_result.value
     }
 
     def read_meta_info(db: SQL.Database, log_name: String): Option[Meta_Info] = {
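
For context on the first two hunks: the mutable errors1 map and the known name sets become Synchronized values (Isabelle's lock-protected state holder), so the producer loop and the consumer thread can both call add_error, required and update without racing. A minimal plain-Scala analogue of such a holder is sketched below; the class name SyncVar is invented here and this is not the actual Isabelle implementation.

// Sketch only: reads and functional updates both happen under the object's monitor,
// so concurrent change(...) calls cannot lose updates.
final class SyncVar[A](init: A) {
  private var current: A = init
  def value: A = synchronized { current }
  def change(f: A => A): Unit = synchronized { current = f(current) }
}

object SyncVarDemo {
  def main(args: Array[String]): Unit = {
    val known = new SyncVar[Set[String]](Set.empty)
    val threads =
      for (i <- 1 to 4) yield new Thread(() => known.change(_ + ("log_" + i)))
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(known.value)   // all four names present, no lost updates
  }
}
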