more parallelism via consumer thread: with mailbox limit to avoid ressource problems;
--- a/src/Pure/Admin/build_log.scala Tue Oct 31 16:11:26 2023 +0100
+++ b/src/Pure/Admin/build_log.scala Tue Oct 31 16:15:59 2023 +0100
@@ -1019,10 +1019,9 @@
): Multi_Map[String, String] = {
init_database(db)
- var errors1 = errors
- def add_error(name: String, exn: Throwable): Unit = {
- errors1 = errors1.insert(name, Exn.print(exn))
- }
+ val errors_result = Synchronized(errors)
+ def add_error(name: String, exn: Throwable): Unit =
+ errors_result.change(_.insert(name, Exn.print(exn)))
val files_domain = {
val names = files.map(Log_File.plain_name).toSet
@@ -1030,18 +1029,17 @@
}
abstract class Table_Status(table: SQL.Table) {
- private var known: Set[String] =
- read_domain(db, table, private_data.log_name,
- restriction = files_domain)
+ private val known =
+ Synchronized(read_domain(db, table, private_data.log_name, restriction = files_domain))
- def required(file: JFile): Boolean = !known(Log_File.plain_name(file))
- def required(log_file: Log_File): Boolean = !known(log_file.name)
+ def required(file: JFile): Boolean = !(known.value)(Log_File.plain_name(file))
+ def required(log_file: Log_File): Boolean = !(known.value)(log_file.name)
def update_db(db: SQL.Database, log_file: Log_File): Unit
def update(log_file: Log_File): Unit = {
if (required(log_file)) {
update_db(db, log_file)
- known += log_file.name
+ known.change(_ + log_file.name)
}
}
}
@@ -1072,20 +1070,31 @@
}
) ::: ml_statistics_status
- for (file <- files.iterator if status.exists(_.required(file))) {
- val log_name = Log_File.plain_name(file)
- progress.echo("Log " + quote(log_name), verbose = true)
- Exn.result { Log_File(file) } match {
- case Exn.Res(log_file) =>
+ val consumer =
+ Consumer_Thread.fork[Log_File]("build_log_database")(
+ consume = { log_file =>
private_data.transaction_lock(db, label = "build_log_database") {
try { status.foreach(_.update(log_file)) }
- catch { case exn: Throwable => add_error(log_name, exn) }
+ catch { case exn: Throwable => add_error(log_file.name, exn) }
}
- case Exn.Exn(exn) => add_error(log_name, exn)
+ true
+ },
+ limit = 1
+ )
+
+ try {
+ for (file <- files.iterator if status.exists(_.required(file))) {
+ val log_name = Log_File.plain_name(file)
+ progress.echo("Log " + quote(log_name), verbose = true)
+ Exn.result { Log_File(file) } match {
+ case Exn.Res(log_file) => consumer.send(log_file)
+ case Exn.Exn(exn) => add_error(log_name, exn)
+ }
}
}
+ finally { consumer.shutdown() }
- errors1
+ errors_result.value
}
def read_meta_info(db: SQL.Database, log_name: String): Option[Meta_Info] = {