--- a/etc/build.props Fri Oct 27 18:27:06 2023 +0200
+++ b/etc/build.props Sat Oct 28 17:35:26 2023 +0200
@@ -153,6 +153,7 @@
src/Pure/PIDE/yxml.scala \
src/Pure/ROOT.scala \
src/Pure/System/bash.scala \
+ src/Pure/System/benchmark.scala \
src/Pure/System/classpath.scala \
src/Pure/System/command_line.scala \
src/Pure/System/components.scala \
@@ -195,6 +196,7 @@
src/Pure/Tools/build_cluster.scala \
src/Pure/Tools/build_job.scala \
src/Pure/Tools/build_process.scala \
+ src/Pure/Tools/build_schedule.scala \
src/Pure/Tools/check_keywords.scala \
src/Pure/Tools/debugger.scala \
src/Pure/Tools/doc.scala \
@@ -317,6 +319,7 @@
isabelle.Bash$Handler \
isabelle.Bibtex$File_Format \
isabelle.Build$Default_Engine \
+ isabelle.Build_Schedule$Engine \
isabelle.Document_Build$Build_Engine \
isabelle.Document_Build$LIPIcs_LuaLaTeX_Engine \
isabelle.Document_Build$LIPIcs_PDFLaTeX_Engine \
--- a/src/Pure/Admin/build_log.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/Admin/build_log.scala Sat Oct 28 17:35:26 2023 +0200
@@ -400,6 +400,7 @@
sealed case class Session_Entry(
chapter: String = "",
groups: List[String] = Nil,
+ hostname: Option[String] = None,
threads: Option[Int] = None,
timing: Timing = Timing.zero,
ml_timing: Timing = Timing.zero,
@@ -443,7 +444,7 @@
val Session_Timing =
new Regex("""^Timing (\S+) \((\d+) threads, (\d+\.\d+)s elapsed time, (\d+\.\d+)s cpu time, (\d+\.\d+)s GC time.*$""")
val Session_Started1 = new Regex("""^(?:Running|Building) (\S+) \.\.\.$""")
- val Session_Started2 = new Regex("""^(?:Running|Building) (\S+) \(?on \S+\)? \.\.\.$""")
+ val Session_Started2 = new Regex("""^(?:Running|Building) (\S+) \(?on ([^\s/]+)/?(\d+)\+?(\S+)\)? \.\.\.$""")
val Sources = new Regex("""^Sources (\S+) (\S{""" + SHA1.digest_length + """})$""")
val Heap = new Regex("""^Heap (\S+) \((\d+) bytes\)$""")
@@ -460,6 +461,7 @@
var chapter = Map.empty[String, String]
var groups = Map.empty[String, List[String]]
+ var hostnames = Map.empty[String, String]
var threads = Map.empty[String, Int]
var timing = Map.empty[String, Timing]
var ml_timing = Map.empty[String, Timing]
@@ -489,8 +491,9 @@
case Session_Started1(name) =>
started += name
- case Session_Started2(name) =>
+ case Session_Started2(name, hostname, numa_node, rel_cpus) =>
started += name
+ hostnames += (name -> hostname)
case Session_Finished1(name,
Value.Int(e1), Value.Int(e2), Value.Int(e3),
@@ -555,6 +558,7 @@
Session_Entry(
chapter = chapter.getOrElse(name, ""),
groups = groups.getOrElse(name, Nil),
+ hostname = hostnames.get(name),
threads = threads.get(name),
timing = timing.getOrElse(name, Timing.zero),
ml_timing = ml_timing.getOrElse(name, Timing.zero),
@@ -628,8 +632,13 @@
/* SQL data model */
- object Data extends SQL.Data("isabelle_build_log") {
- override def tables: SQL.Tables = ???
+ object private_data extends SQL.Data("isabelle_build_log") {
+ override def tables: SQL.Tables =
+ SQL.Tables(
+ meta_info_table,
+ sessions_table,
+ theories_table,
+ ml_statistics_table)
/* main content */
@@ -639,6 +648,7 @@
val theory_name = SQL.Column.string("theory_name").make_primary_key
val chapter = SQL.Column.string("chapter")
val groups = SQL.Column.string("groups")
+ val hostname = SQL.Column.string("hostname")
val threads = SQL.Column.int("threads")
val timing_elapsed = SQL.Column.long("timing_elapsed")
val timing_cpu = SQL.Column.long("timing_cpu")
@@ -663,7 +673,7 @@
val sessions_table =
make_table(
- List(log_name, session_name, chapter, groups, threads, timing_elapsed, timing_cpu,
+ List(log_name, session_name, chapter, groups, hostname, threads, timing_elapsed, timing_cpu,
timing_gc, timing_factor, ml_timing_elapsed, ml_timing_cpu, ml_timing_gc, ml_timing_factor,
heap_size, status, errors, sources),
name = "sessions")
@@ -678,19 +688,6 @@
make_table(List(log_name, session_name, ml_statistics), name = "ml_statistics")
- /* AFP versions */
-
- val isabelle_afp_versions_table: SQL.Table = {
- val version1 = Prop.isabelle_version
- val version2 = Prop.afp_version
- make_table(List(version1.make_primary_key, version2),
- body =
- SQL.select(List(version1, version2), distinct = true) + meta_info_table +
- SQL.where_and(version1.defined, version2.defined),
- name = "isabelle_afp_versions")
- }
-
-
/* earliest pull date for repository version (PostgreSQL queries) */
def pull_date(afp: Boolean = false): SQL.Column =
@@ -842,6 +839,15 @@
ssh_user = options.string("build_log_ssh_user"),
synchronous_commit = options.string("build_log_database_synchronous_commit"))
+ def init_database(db: SQL.Database, minimal: Boolean = false): Unit =
+ private_data.transaction_lock(db, create = true, label = "build_log_init") {
+ if (!minimal) {
+ db.create_view(private_data.pull_date_table())
+ db.create_view(private_data.pull_date_table(afp = true))
+ }
+ db.create_view(private_data.universal_table)
+ }
+
def snapshot_database(
db: PostgreSQL.Database,
sqlite_database: Path,
@@ -855,15 +861,15 @@
db.transaction {
db2.transaction {
// main content
- db2.create_table(Data.meta_info_table)
- db2.create_table(Data.sessions_table)
- db2.create_table(Data.theories_table)
- db2.create_table(Data.ml_statistics_table)
+ db2.create_table(private_data.meta_info_table)
+ db2.create_table(private_data.sessions_table)
+ db2.create_table(private_data.theories_table)
+ db2.create_table(private_data.ml_statistics_table)
val recent_log_names =
db.execute_query_statement(
- Data.select_recent_log_names(days),
- List.from[String], res => res.string(Data.log_name))
+ private_data.select_recent_log_names(days),
+ List.from[String], res => res.string(private_data.log_name))
for (log_name <- recent_log_names) {
read_meta_info(db, log_name).foreach(meta_info =>
@@ -880,11 +886,11 @@
// pull_date
for (afp <- List(false, true)) {
val afp_rev = if (afp) Some("") else None
- val table = Data.pull_date_table(afp)
+ val table = private_data.pull_date_table(afp)
db2.create_table(table)
db2.using_statement(table.insert()) { stmt2 =>
db.using_statement(
- Data.recent_pull_date_table(days, afp_rev = afp_rev).query) { stmt =>
+ private_data.recent_pull_date_table(days, afp_rev = afp_rev).query) { stmt =>
using(stmt.execute_query()) { res =>
while (res.next()) {
for ((c, i) <- table.columns.zipWithIndex) {
@@ -898,7 +904,7 @@
}
// full view
- db2.create_view(Data.universal_table)
+ db2.create_view(private_data.universal_table)
}
}
db2.vacuum()
@@ -911,90 +917,93 @@
Set.from[String], res => res.string(column))
def update_meta_info(db: SQL.Database, log_name: String, meta_info: Meta_Info): Unit =
- db.using_statement(db.insert_permissive(Data.meta_info_table)) { stmt =>
- stmt.string(1) = log_name
- for ((c, i) <- Data.meta_info_table.columns.tail.zipWithIndex) {
- if (c.T == SQL.Type.Date) stmt.date(i + 2) = meta_info.get_date(c)
- else stmt.string(i + 2) = meta_info.get(c)
+ db.execute_statement(db.insert_permissive(private_data.meta_info_table),
+ { stmt =>
+ stmt.string(1) = log_name
+ for ((c, i) <- private_data.meta_info_table.columns.tail.zipWithIndex) {
+ if (c.T == SQL.Type.Date) stmt.date(i + 2) = meta_info.get_date(c)
+ else stmt.string(i + 2) = meta_info.get(c)
+ }
}
- stmt.execute()
- }
+ )
- def update_sessions(db: SQL.Database, log_name: String, build_info: Build_Info): Unit =
- db.using_statement(db.insert_permissive(Data.sessions_table)) { stmt =>
- val sessions =
- if (build_info.sessions.isEmpty) Build_Info.sessions_dummy
- else build_info.sessions
- for ((session_name, session) <- sessions) {
+ def update_sessions(db: SQL.Database, log_name: String, build_info: Build_Info): Unit = {
+ val sessions =
+ if (build_info.sessions.isEmpty) Build_Info.sessions_dummy
+ else build_info.sessions
+ db.execute_batch_statement(db.insert_permissive(private_data.sessions_table),
+ for ((session_name, session) <- sessions) yield { (stmt: SQL.Statement) =>
stmt.string(1) = log_name
stmt.string(2) = session_name
stmt.string(3) = proper_string(session.chapter)
stmt.string(4) = session.proper_groups
- stmt.int(5) = session.threads
- stmt.long(6) = session.timing.elapsed.proper_ms
- stmt.long(7) = session.timing.cpu.proper_ms
- stmt.long(8) = session.timing.gc.proper_ms
- stmt.double(9) = session.timing.factor
- stmt.long(10) = session.ml_timing.elapsed.proper_ms
- stmt.long(11) = session.ml_timing.cpu.proper_ms
- stmt.long(12) = session.ml_timing.gc.proper_ms
- stmt.double(13) = session.ml_timing.factor
- stmt.long(14) = session.heap_size.map(_.bytes)
- stmt.string(15) = session.status.map(_.toString)
- stmt.bytes(16) = compress_errors(session.errors, cache = cache.compress)
- stmt.string(17) = session.sources
- stmt.execute()
+ stmt.string(5) = session.hostname
+ stmt.int(6) = session.threads
+ stmt.long(7) = session.timing.elapsed.proper_ms
+ stmt.long(8) = session.timing.cpu.proper_ms
+ stmt.long(9) = session.timing.gc.proper_ms
+ stmt.double(10) = session.timing.factor
+ stmt.long(11) = session.ml_timing.elapsed.proper_ms
+ stmt.long(12) = session.ml_timing.cpu.proper_ms
+ stmt.long(13) = session.ml_timing.gc.proper_ms
+ stmt.double(14) = session.ml_timing.factor
+ stmt.long(15) = session.heap_size.map(_.bytes)
+ stmt.string(16) = session.status.map(_.toString)
+ stmt.bytes(17) = compress_errors(session.errors, cache = cache.compress)
+ stmt.string(18) = session.sources
}
- }
+ )
+ }
- def update_theories(db: SQL.Database, log_name: String, build_info: Build_Info): Unit =
- db.using_statement(db.insert_permissive(Data.theories_table)) { stmt =>
- val sessions =
- if (build_info.sessions.forall({ case (_, session) => session.theory_timings.isEmpty }))
- Build_Info.sessions_dummy
- else build_info.sessions
+ def update_theories(db: SQL.Database, log_name: String, build_info: Build_Info): Unit = {
+ val sessions =
+ if (build_info.sessions.forall({ case (_, session) => session.theory_timings.isEmpty }))
+ Build_Info.sessions_dummy
+ else build_info.sessions
+ db.execute_batch_statement(db.insert_permissive(private_data.theories_table),
for {
(session_name, session) <- sessions
(theory_name, timing) <- session.theory_timings
- } {
+ } yield { (stmt: SQL.Statement) =>
stmt.string(1) = log_name
stmt.string(2) = session_name
stmt.string(3) = theory_name
stmt.long(4) = timing.elapsed.ms
stmt.long(5) = timing.cpu.ms
stmt.long(6) = timing.gc.ms
- stmt.execute()
}
- }
+ )
+ }
- def update_ml_statistics(db: SQL.Database, log_name: String, build_info: Build_Info): Unit =
- db.using_statement(db.insert_permissive(Data.ml_statistics_table)) { stmt =>
- val ml_stats: List[(String, Option[Bytes])] =
- Par_List.map[(String, Session_Entry), (String, Option[Bytes])](
- { case (a, b) => (a, Properties.compress(b.ml_statistics, cache = cache.compress).proper) },
- build_info.sessions.iterator.filter(p => p._2.ml_statistics.nonEmpty).toList)
- val entries = if (ml_stats.nonEmpty) ml_stats else List("" -> None)
- for ((session_name, ml_statistics) <- entries) {
+ def update_ml_statistics(db: SQL.Database, log_name: String, build_info: Build_Info): Unit = {
+ val ml_stats: List[(String, Option[Bytes])] =
+ Par_List.map[(String, Session_Entry), (String, Option[Bytes])](
+ { case (a, b) => (a, Properties.compress(b.ml_statistics, cache = cache.compress).proper) },
+ build_info.sessions.iterator.filter(p => p._2.ml_statistics.nonEmpty).toList)
+ val entries = if (ml_stats.nonEmpty) ml_stats else List("" -> None)
+ db.execute_batch_statement(db.insert_permissive(private_data.ml_statistics_table),
+ for ((session_name, ml_statistics) <- entries) yield { (stmt: SQL.Statement) =>
stmt.string(1) = log_name
stmt.string(2) = session_name
stmt.bytes(3) = ml_statistics
- stmt.execute()
}
- }
+ )
+ }
def write_info(db: SQL.Database, files: List[JFile],
ml_statistics: Boolean = false,
progress: Progress = new Progress,
errors: Multi_Map[String, String] = Multi_Map.empty
): Multi_Map[String, String] = {
+ init_database(db)
+
var errors1 = errors
def add_error(name: String, exn: Throwable): Unit = {
errors1 = errors1.insert(name, Exn.print(exn))
}
abstract class Table_Status(table: SQL.Table) {
- db.create_table(table)
- private var known: Set[String] = domain(db, table, Data.log_name)
+ private var known: Set[String] = domain(db, table, private_data.log_name)
def required(file: JFile): Boolean = !known(Log_File.plain_name(file))
def required(log_file: Log_File): Boolean = !known(log_file.name)
@@ -1009,19 +1018,19 @@
}
val status =
List(
- new Table_Status(Data.meta_info_table) {
+ new Table_Status(private_data.meta_info_table) {
override def update_db(db: SQL.Database, log_file: Log_File): Unit =
update_meta_info(db, log_file.name, log_file.parse_meta_info())
},
- new Table_Status(Data.sessions_table) {
+ new Table_Status(private_data.sessions_table) {
override def update_db(db: SQL.Database, log_file: Log_File): Unit =
update_sessions(db, log_file.name, log_file.parse_build_info())
},
- new Table_Status(Data.theories_table) {
+ new Table_Status(private_data.theories_table) {
override def update_db(db: SQL.Database, log_file: Log_File): Unit =
update_theories(db, log_file.name, log_file.parse_build_info())
},
- new Table_Status(Data.ml_statistics_table) {
+ new Table_Status(private_data.ml_statistics_table) {
override def update_db(db: SQL.Database, log_file: Log_File): Unit =
if (ml_statistics) {
update_ml_statistics(db, log_file.name,
@@ -1029,15 +1038,15 @@
}
})
- val file_groups =
+ val file_groups_iterator =
files.filter(file => status.exists(_.required(file))).
grouped(options.int("build_log_transaction_size") max 1)
- for (file_group <- file_groups) {
+ for (file_group <- file_groups_iterator) {
val log_files =
Par_List.map[JFile, Exn.Result[Log_File]](
file => Exn.result { Log_File(file) }, file_group)
- db.transaction {
+ private_data.transaction_lock(db, label = "build_log_database") {
for (case Exn.Res(log_file) <- log_files) {
progress.echo("Log " + quote(log_file.name), verbose = true)
try { status.foreach(_.update(log_file)) }
@@ -1049,18 +1058,14 @@
}
}
- db.create_view(Data.pull_date_table())
- db.create_view(Data.pull_date_table(afp = true))
- db.create_view(Data.universal_table)
-
errors1
}
def read_meta_info(db: SQL.Database, log_name: String): Option[Meta_Info] = {
- val table = Data.meta_info_table
+ val table = private_data.meta_info_table
val columns = table.columns.tail
db.execute_query_statementO[Meta_Info](
- table.select(columns, sql = Data.log_name.where_equal(log_name)),
+ table.select(columns, sql = private_data.log_name.where_equal(log_name)),
{ res =>
val results =
columns.map(c => c.name ->
@@ -1082,56 +1087,57 @@
session_names: List[String] = Nil,
ml_statistics: Boolean = false
): Build_Info = {
- val table1 = Data.sessions_table
- val table2 = Data.ml_statistics_table
+ val table1 = private_data.sessions_table
+ val table2 = private_data.ml_statistics_table
val columns1 = table1.columns.tail.map(_.apply(table1))
val (columns, from) =
if (ml_statistics) {
- val columns = columns1 ::: List(Data.ml_statistics(table2))
+ val columns = columns1 ::: List(private_data.ml_statistics(table2))
val join =
table1.ident + SQL.join_outer + table2.ident + " ON " +
SQL.and(
- Data.log_name(table1).ident + " = " + Data.log_name(table2).ident,
- Data.session_name(table1).ident + " = " + Data.session_name(table2).ident)
+ private_data.log_name(table1).ident + " = " + private_data.log_name(table2).ident,
+ private_data.session_name(table1).ident + " = " + private_data.session_name(table2).ident)
(columns, SQL.enclose(join))
}
else (columns1, table1.ident)
val where =
SQL.where_and(
- Data.log_name(table1).equal(log_name),
- Data.session_name(table1).ident + " <> ''",
- if_proper(session_names, Data.session_name(table1).member(session_names)))
+ private_data.log_name(table1).equal(log_name),
+ private_data.session_name(table1).ident + " <> ''",
+ if_proper(session_names, private_data.session_name(table1).member(session_names)))
val sessions =
db.execute_query_statement(
SQL.select(columns, sql = from + where),
Map.from[String, Session_Entry],
{ res =>
- val session_name = res.string(Data.session_name)
+ val session_name = res.string(private_data.session_name)
val session_entry =
Session_Entry(
- chapter = res.string(Data.chapter),
- groups = split_lines(res.string(Data.groups)),
- threads = res.get_int(Data.threads),
+ chapter = res.string(private_data.chapter),
+ groups = split_lines(res.string(private_data.groups)),
+ hostname = res.get_string(private_data.hostname),
+ threads = res.get_int(private_data.threads),
timing =
res.timing(
- Data.timing_elapsed,
- Data.timing_cpu,
- Data.timing_gc),
+ private_data.timing_elapsed,
+ private_data.timing_cpu,
+ private_data.timing_gc),
ml_timing =
res.timing(
- Data.ml_timing_elapsed,
- Data.ml_timing_cpu,
- Data.ml_timing_gc),
- sources = res.get_string(Data.sources),
- heap_size = res.get_long(Data.heap_size).map(Space.bytes),
- status = res.get_string(Data.status).map(Session_Status.valueOf),
- errors = uncompress_errors(res.bytes(Data.errors), cache = cache),
+ private_data.ml_timing_elapsed,
+ private_data.ml_timing_cpu,
+ private_data.ml_timing_gc),
+ sources = res.get_string(private_data.sources),
+ heap_size = res.get_long(private_data.heap_size).map(Space.bytes),
+ status = res.get_string(private_data.status).map(Session_Status.valueOf),
+ errors = uncompress_errors(res.bytes(private_data.errors), cache = cache),
ml_statistics =
if (ml_statistics) {
- Properties.uncompress(res.bytes(Data.ml_statistics), cache = cache)
+ Properties.uncompress(res.bytes(private_data.ml_statistics), cache = cache)
}
else Nil)
session_name -> session_entry
--- a/src/Pure/Admin/build_status.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/Admin/build_status.scala Sat Oct 28 17:35:26 2023 +0200
@@ -38,36 +38,36 @@
): PostgreSQL.Source = {
val columns =
List(
- Build_Log.Data.pull_date(afp = false),
- Build_Log.Data.pull_date(afp = true),
+ Build_Log.private_data.pull_date(afp = false),
+ Build_Log.private_data.pull_date(afp = true),
Build_Log.Prop.build_host,
Build_Log.Prop.isabelle_version,
Build_Log.Prop.afp_version,
Build_Log.Settings.ISABELLE_BUILD_OPTIONS,
Build_Log.Settings.ML_PLATFORM,
- Build_Log.Data.session_name,
- Build_Log.Data.chapter,
- Build_Log.Data.groups,
- Build_Log.Data.threads,
- Build_Log.Data.timing_elapsed,
- Build_Log.Data.timing_cpu,
- Build_Log.Data.timing_gc,
- Build_Log.Data.ml_timing_elapsed,
- Build_Log.Data.ml_timing_cpu,
- Build_Log.Data.ml_timing_gc,
- Build_Log.Data.heap_size,
- Build_Log.Data.status,
- Build_Log.Data.errors) :::
- (if (ml_statistics) List(Build_Log.Data.ml_statistics) else Nil)
+ Build_Log.private_data.session_name,
+ Build_Log.private_data.chapter,
+ Build_Log.private_data.groups,
+ Build_Log.private_data.threads,
+ Build_Log.private_data.timing_elapsed,
+ Build_Log.private_data.timing_cpu,
+ Build_Log.private_data.timing_gc,
+ Build_Log.private_data.ml_timing_elapsed,
+ Build_Log.private_data.ml_timing_cpu,
+ Build_Log.private_data.ml_timing_gc,
+ Build_Log.private_data.heap_size,
+ Build_Log.private_data.status,
+ Build_Log.private_data.errors) :::
+ (if (ml_statistics) List(Build_Log.private_data.ml_statistics) else Nil)
- Build_Log.Data.universal_table.select(columns, distinct = true, sql =
+ Build_Log.private_data.universal_table.select(columns, distinct = true, sql =
SQL.where_and(
- Build_Log.Data.pull_date(afp).ident + " > " + Build_Log.Data.recent_time(days(options)),
- Build_Log.Data.status.member(
+ Build_Log.private_data.pull_date(afp).ident + " > " + Build_Log.private_data.recent_time(days(options)),
+ Build_Log.private_data.status.member(
List(
Build_Log.Session_Status.finished.toString,
Build_Log.Session_Status.failed.toString)),
- if_proper(only_sessions, Build_Log.Data.session_name.member(only_sessions)),
+ if_proper(only_sessions, Build_Log.private_data.session_name.member(only_sessions)),
if_proper(sql, SQL.enclose(sql))))
}
}
@@ -261,16 +261,16 @@
db.using_statement(sql) { stmt =>
using(stmt.execute_query()) { res =>
while (res.next()) {
- val session_name = res.string(Build_Log.Data.session_name)
- val chapter = res.string(Build_Log.Data.chapter)
- val groups = split_lines(res.string(Build_Log.Data.groups))
+ val session_name = res.string(Build_Log.private_data.session_name)
+ val chapter = res.string(Build_Log.private_data.chapter)
+ val groups = split_lines(res.string(Build_Log.private_data.groups))
val threads = {
val threads1 =
res.string(Build_Log.Settings.ISABELLE_BUILD_OPTIONS) match {
case Threads_Option(Value.Int(i)) => i
case _ => 1
}
- val threads2 = res.get_int(Build_Log.Data.threads).getOrElse(1)
+ val threads2 = res.get_int(Build_Log.private_data.threads).getOrElse(1)
threads1 max threads2
}
val ml_platform = res.string(Build_Log.Settings.ML_PLATFORM)
@@ -292,7 +292,7 @@
val ml_stats =
ML_Statistics(
if (ml_statistics) {
- Properties.uncompress(res.bytes(Build_Log.Data.ml_statistics), cache = store.cache)
+ Properties.uncompress(res.bytes(Build_Log.private_data.ml_statistics), cache = store.cache)
}
else Nil,
domain = ml_statistics_domain,
@@ -301,32 +301,32 @@
val entry =
Entry(
chapter = chapter,
- pull_date = res.date(Build_Log.Data.pull_date(afp = false)),
+ pull_date = res.date(Build_Log.private_data.pull_date(afp = false)),
afp_pull_date =
- if (afp) res.get_date(Build_Log.Data.pull_date(afp = true)) else None,
+ if (afp) res.get_date(Build_Log.private_data.pull_date(afp = true)) else None,
isabelle_version = isabelle_version,
afp_version = afp_version,
timing =
res.timing(
- Build_Log.Data.timing_elapsed,
- Build_Log.Data.timing_cpu,
- Build_Log.Data.timing_gc),
+ Build_Log.private_data.timing_elapsed,
+ Build_Log.private_data.timing_cpu,
+ Build_Log.private_data.timing_gc),
ml_timing =
res.timing(
- Build_Log.Data.ml_timing_elapsed,
- Build_Log.Data.ml_timing_cpu,
- Build_Log.Data.ml_timing_gc),
+ Build_Log.private_data.ml_timing_elapsed,
+ Build_Log.private_data.ml_timing_cpu,
+ Build_Log.private_data.ml_timing_gc),
maximum_code = Space.B(ml_stats.maximum(ML_Statistics.CODE_SIZE)),
average_code = Space.B(ml_stats.average(ML_Statistics.CODE_SIZE)),
maximum_stack = Space.B(ml_stats.maximum(ML_Statistics.STACK_SIZE)),
average_stack = Space.B(ml_stats.average(ML_Statistics.STACK_SIZE)),
maximum_heap = Space.B(ml_stats.maximum(ML_Statistics.HEAP_SIZE)),
average_heap = Space.B(ml_stats.average(ML_Statistics.HEAP_SIZE)),
- stored_heap = Space.bytes(res.long(Build_Log.Data.heap_size)),
- status = Build_Log.Session_Status.valueOf(res.string(Build_Log.Data.status)),
+ stored_heap = Space.bytes(res.long(Build_Log.private_data.heap_size)),
+ status = Build_Log.Session_Status.valueOf(res.string(Build_Log.private_data.status)),
errors =
Build_Log.uncompress_errors(
- res.bytes(Build_Log.Data.errors), cache = store.cache))
+ res.bytes(Build_Log.private_data.errors), cache = store.cache))
val sessions = data_entries.getOrElse(data_name, Map.empty)
val session =
--- a/src/Pure/Admin/isabelle_cronjob.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/Admin/isabelle_cronjob.scala Sat Oct 28 17:35:26 2023 +0200
@@ -118,14 +118,14 @@
val afp = afp_rev.isDefined
db.execute_query_statement(
- Build_Log.Data.select_recent_versions(
+ Build_Log.private_data.select_recent_versions(
days = days, rev = rev, afp_rev = afp_rev, sql = SQL.where(sql)),
List.from[Item],
{ res =>
- val known = res.bool(Build_Log.Data.known)
+ val known = res.bool(Build_Log.private_data.known)
val isabelle_version = res.string(Build_Log.Prop.isabelle_version)
val afp_version = if (afp) proper_string(res.string(Build_Log.Prop.afp_version)) else None
- val pull_date = res.date(Build_Log.Data.pull_date(afp))
+ val pull_date = res.date(Build_Log.private_data.pull_date(afp))
Item(known, isabelle_version, afp_version, pull_date)
})
}
--- a/src/Pure/Concurrent/isabelle_thread.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/Concurrent/isabelle_thread.scala Sat Oct 28 17:35:26 2023 +0200
@@ -74,7 +74,7 @@
def max_threads(): Int = {
val m = Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0
- if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8
+ if (m > 0) m else (Host.num_cpus() max 1) min 8
}
lazy val pool: ThreadPoolExecutor = {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/System/benchmark.scala Sat Oct 28 17:35:26 2023 +0200
@@ -0,0 +1,70 @@
+/* Title: Pure/System/benchmark.scala
+ Author: Fabian Huch, TU Muenchen
+
+Host platform benchmarks for performance estimation.
+*/
+
+package isabelle
+
+
+object Benchmark {
+
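+ // Bash command line for running this benchmark on a cluster host; illustrative result:
+ // ".../bin/isabelle benchmark -o build_hostname=host1" (plus any further host options)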
+ def benchmark_command(
+ host: Build_Cluster.Host,
+ ssh: SSH.System = SSH.Local,
+ isabelle_home: Path = Path.current,
+ ): String = {
+ val options = Options.Spec("build_hostname", Some(host.name)) :: host.options
+ ssh.bash_path(isabelle_home + Path.explode("bin/isabelle")) + " benchmark" +
+ options.map(opt => " -o " + Bash.string(opt.print)).mkString
+ }
+
+ def benchmark(options: Options, progress: Progress = new Progress()): Unit = {
+ val hostname = options.string("build_hostname")
+ val store = Store(options)
+
+ using(store.open_server()) { server =>
+ val db = store.open_build_database(path = Host.private_data.database, server = server)
+
+ progress.echo("Starting benchmark...")
+ val start = Time.now()
+
+ // TODO proper benchmark
+ def fib(n: Long): Long = if (n < 2) 1 else fib(n - 2) + fib(n - 1)
+ val result = fib(42)
+
+ val stop = Time.now()
+ val timing = stop - start
+
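+ // higher score means faster host; "1 + timing.ms" guards against division by zero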
+ val score = Time.seconds(100).ms.toDouble / (1 + timing.ms)
+ progress.echo(
+ "Finished benchmark in " + timing.message + ". Score: " + String.format("%.2f", score))
+
+ Host.write_info(db, Host.Info.gather(hostname, score = Some(score)))
+ }
+ }
+
+ val isabelle_tool = Isabelle_Tool("benchmark", "run system benchmark",
+ Scala_Project.here,
+ { args =>
+ var options = Options.init()
+
+ val getopts = Getopts("""
+Usage: isabelle benchmark [OPTIONS]
+
+ Options are:
+ -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME)
+
+ Run a system benchmark.
+""",
+ "o:" -> (arg => options = options + arg))
+
+ val more_args = getopts(args)
+ if (more_args.nonEmpty) getopts.usage()
+
+ val progress = new Console_Progress()
+
+ benchmark(options, progress = progress)
+ })
+}
\ No newline at end of file
--- a/src/Pure/System/host.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/System/host.scala Sat Oct 28 17:35:26 2023 +0200
@@ -12,50 +12,104 @@
object Host {
- /* process policy via numactl tool */
+
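+ // compressed range notation for CPU/NUMA identifiers, e.g.
+ // Range(List(0, 1, 2, 5)) == "0-2,5" and Range.unapply("0-2,5") == Some(List(0, 1, 2, 5))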
+ object Range {
+ val Single = """^(\d+)$""".r
+ val Multiple = """^(\d+)-(\d+)$""".r
+
+ def apply(range: List[Int]): String =
+ range match {
+ case Nil => ""
+ case x :: xs =>
+ def elem(start: Int, stop: Int): String =
+ if (start == stop) start.toString else start.toString + "-" + stop.toString
+
+ val (elems, (r0, rn)) =
+ xs.foldLeft((List.empty[String], (x, x))) {
+ case ((rs, (r0, rn)), x) =>
+ if (rn + 1 == x) (rs, (r0, x)) else (rs :+ elem(r0, rn), (x, x))
+ }
+
+ (elems :+ elem(r0, rn)).mkString(",")
+ }
- def numactl(node: Int): String = "numactl -m" + node + " -N" + node
- def numactl_ok(node: Int): Boolean = Isabelle_System.bash(numactl(node) + " true").ok
+ def unapply(s: String): Option[List[Int]] =
+ space_explode(',', s).foldRight(Option(List.empty[Int])) {
+ case (Single(Value.Int(i)), Some(elems)) => Some(i :: elems)
+ case (Multiple(Value.Int(i), Value.Int(j)), Some(elems)) => Some((i to j).toList ::: elems)
+ case _ => None
+ }
- def process_policy_options(options: Options, numa_node: Option[Int]): Options =
+ def from(s: String): List[Int] =
+ s match {
+ case Range(r) => r
+ case _ => Nil
+ }
+ }
+
+
+ /* process policy via numactl and taskset tools */
+
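+ // e.g. taskset(List(2, 3)) == "taskset --cpu-list 2-3"
+ // and numactl(0, List(2, 3)) == "numactl -m0 -N0 -C+2-3"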
+ def taskset(cpus: List[Int]): String = "taskset --cpu-list " + Range(cpus)
+ def taskset_ok(cpus: List[Int]): Boolean = Isabelle_System.bash(taskset(cpus) + " true").ok
+
+ def numactl(node: Int, rel_cpus: List[Int] = Nil): String =
+ "numactl -m" + node + " -N" + node + if_proper(rel_cpus, " -C+" + Range(rel_cpus))
+ def numactl_ok(node: Int, rel_cpus: List[Int] = Nil): Boolean =
+ Isabelle_System.bash(numactl(node, rel_cpus) + " true").ok
+
+ def numa_options(options: Options, numa_node: Option[Int]): Options =
numa_node match {
case None => options
case Some(node) =>
options.string("process_policy") = if (numactl_ok(node)) numactl(node) else ""
}
-
- /* allocated resources */
+ def node_options(options: Options, node: Node_Info): Options = {
+ val threads_options =
+ if (node.rel_cpus.isEmpty) options else options.int("threads") = node.rel_cpus.length
- object Node_Info { def none: Node_Info = Node_Info("", None) }
-
- sealed case class Node_Info(hostname: String, numa_node: Option[Int]) {
- override def toString: String =
- hostname + if_proper(numa_node, "/" + numa_node.get.toString)
+ node.numa_node match {
+ case None if node.rel_cpus.isEmpty =>
+ threads_options
+ case Some(numa_node) =>
+ threads_options.string("process_policy") =
+ if (numactl_ok(numa_node, node.rel_cpus)) numactl(numa_node, node.rel_cpus) else ""
+ case _ =>
+ threads_options.string("process_policy") =
+ if (taskset_ok(node.rel_cpus)) taskset(node.rel_cpus) else ""
+ }
}
- /* available NUMA nodes */
+ /* allocated resources */
+
+ object Node_Info { def none: Node_Info = Node_Info("", None, Nil) }
+
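+ // printed e.g. as "host1/0+4-7": hostname, optional NUMA node, optional relative CPUs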
+ sealed case class Node_Info(hostname: String, numa_node: Option[Int], rel_cpus: List[Int]) {
+ override def toString: String =
+ hostname +
+ if_proper(numa_node, "/" + numa_node.get.toString) +
+ if_proper(rel_cpus, "+" + Range(rel_cpus))
+ }
+
+
+ /* statically available resources */
private val numa_info_linux: Path = Path.explode("/sys/devices/system/node/online")
- def numa_nodes(enabled: Boolean = true, ssh: SSH.System = SSH.Local): List[Int] = {
- val Single = """^(\d+)$""".r
- val Multiple = """^(\d+)-(\d+)$""".r
+ def parse_numa_info(numa_info: String): List[Int] =
+ numa_info match {
+ case Range(nodes) => nodes
+ case s => error("Cannot parse CPU NUMA node specification: " + quote(s))
+ }
- def parse(s: String): List[Int] =
- s match {
- case Single(Value.Int(i)) => List(i)
- case Multiple(Value.Int(i), Value.Int(j)) => (i to j).toList
- case _ => error("Cannot parse CPU NUMA node specification: " + quote(s))
- }
-
+ def numa_nodes(enabled: Boolean = true, ssh: SSH.System = SSH.Local): List[Int] = {
val numa_info = if (ssh.isabelle_platform.is_linux) Some(numa_info_linux) else None
for {
path <- numa_info.toList
if enabled && ssh.is_file(path)
- s <- space_explode(',', ssh.read(path).trim)
- n <- parse(s)
+ n <- parse_numa_info(ssh.read(path).trim)
} yield n
}
@@ -68,6 +122,28 @@
}
catch { case ERROR(_) => None }
+ def num_cpus(ssh: SSH.System = SSH.Local): Int =
+ if (ssh.is_local) Runtime.getRuntime.availableProcessors
+ else {
+ val command =
+ if (ssh.isabelle_platform.is_macos) "sysctl -n hw.ncpu" else "nproc"
+ val result = ssh.execute(command).check
+ Library.trim_line(result.out) match {
+ case Value.Int(n) => n
+ case _ => 1
+ }
+ }
+
+ object Info {
+ def gather(hostname: String, ssh: SSH.System = SSH.Local, score: Option[Double] = None): Info =
+ Info(hostname, numa_nodes(ssh = ssh), num_cpus(ssh = ssh), score)
+ }
+
+ sealed case class Info(
+ hostname: String,
+ numa_nodes: List[Int],
+ num_cpus: Int,
+ benchmark_score: Option[Double])
/* shuffling of NUMA nodes */
@@ -95,7 +171,7 @@
object private_data extends SQL.Data("isabelle_host") {
val database: Path = Path.explode("$ISABELLE_HOME_USER/host.db")
- override lazy val tables = SQL.Tables(Node_Info.table)
+ override lazy val tables = SQL.Tables(Node_Info.table, Info.table)
object Node_Info {
val hostname = SQL.Column.string("hostname").make_primary_key
@@ -119,6 +195,38 @@
stmt.int(2) = numa_next
})
}
+
+ object Info {
+ val hostname = SQL.Column.string("hostname").make_primary_key
+ val numa_info = SQL.Column.string("numa_info")
+ val num_cpus = SQL.Column.int("num_cpus")
+ val benchmark_score = SQL.Column.double("benchmark_score")
+
+ val table =
+ make_table(List(hostname, numa_info, num_cpus, benchmark_score), name = "info")
+ }
+
+ def write_info(db: SQL.Database, info: Info): Unit = {
+ db.execute_statement(Info.table.delete(sql = Info.hostname.where_equal(info.hostname)))
+ db.execute_statement(Info.table.insert(), body =
+ { stmt =>
+ stmt.string(1) = info.hostname
+ stmt.string(2) = info.numa_nodes.mkString(",")
+ stmt.int(3) = info.num_cpus
+ stmt.double(4) = info.benchmark_score
+ })
+ }
+
+ def read_info(db: SQL.Database, hostname: String): Option[Host.Info] =
+ db.execute_query_statementO[Host.Info](
+ Info.table.select(Info.table.columns.tail, sql = Info.hostname.where_equal(hostname)),
+ { res =>
+ val numa_info = res.string(Info.numa_info)
+ val num_cpus = res.int(Info.num_cpus)
+ val benchmark_score = res.get_double(Info.benchmark_score)
+
+ Host.Info(hostname, parse_numa_info(numa_info), num_cpus, benchmark_score)
+ })
}
def next_numa_node(
@@ -145,4 +253,13 @@
Some(n)
}
}
+
+ def write_info(db: SQL.Database, info: Info): Unit =
+ private_data.transaction_lock(db, create = true, label = "Host.write_info") {
+ private_data.write_info(db, info)
+ }
+ def read_info(db: SQL.Database, hostname: String): Option[Host.Info] =
+ private_data.transaction_lock(db, create = true, label = "Host.read_info") {
+ private_data.read_info(db, hostname)
+ }
}
--- a/src/Pure/System/isabelle_tool.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/System/isabelle_tool.scala Sat Oct 28 17:35:26 2023 +0200
@@ -120,6 +120,7 @@
class Isabelle_Scala_Tools(val tools: Isabelle_Tool*) extends Isabelle_System.Service
class Tools extends Isabelle_Scala_Tools(
+ Benchmark.isabelle_tool,
Build.isabelle_tool1,
Build.isabelle_tool2,
Build.isabelle_tool3,
--- a/src/Pure/Tools/build.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/Tools/build.scala Sat Oct 28 17:35:26 2023 +0200
@@ -114,9 +114,6 @@
else options1 + "build_database_server" + "build_database"
}
- def process_options(options: Options, node_info: Host.Node_Info): Options =
- Host.process_policy_options(options, node_info.numa_node)
-
final def build_store(options: Options,
build_hosts: List[Build_Cluster.Host] = Nil,
cache: Term.Cache = Term.Cache.make()
--- a/src/Pure/Tools/build_cluster.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala Sat Oct 28 17:35:26 2023 +0200
@@ -158,6 +158,12 @@
def init(): Unit = remote_isabelle.init()
+ def benchmark(): Unit = {
+ val script =
+ Benchmark.benchmark_command(host, ssh = ssh, isabelle_home = remote_isabelle_home)
+ remote_isabelle.bash(script).check
+ }
+
def start(): Process_Result = {
val remote_ml_platform = remote_isabelle.getenv("ML_PLATFORM")
if (remote_ml_platform != build_context.ml_platform) {
@@ -207,6 +213,7 @@
def return_code(exn: Throwable): Unit = return_code(Process_Result.RC(exn))
def open(): Unit = ()
def init(): Unit = ()
+ def benchmark(): Unit = ()
def start(): Unit = ()
def active(): Boolean = false
def join: List[Build_Cluster.Result] = Nil
@@ -269,7 +276,7 @@
}
- /* init remote Isabelle distributions */
+ /* init and benchmark remote Isabelle distributions */
private var _init = List.empty[Future[Unit]]
@@ -286,6 +293,17 @@
}
}
}
+
+ override def benchmark(): Unit = synchronized {
+ _init.foreach(_.join)
+ _init =
+ for (session <- _sessions if !session.host.shared) yield {
+ Future.thread(session.host.message("session")) {
+ capture(session.host, "benchmark") { session.benchmark() }
+ }
+ }
+ _init.foreach(_.join)
+ }
/* workers */
--- a/src/Pure/Tools/build_job.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/Tools/build_job.scala Sat Oct 28 17:35:26 2023 +0200
@@ -117,7 +117,7 @@
private val future_result: Future[Option[Result]] =
Future.thread("build", uninterruptible = true) {
val info = session_background.sessions_structure(session_name)
- val options = build_context.engine.process_options(info.options, node_info)
+ val options = Host.node_options(info.options, node_info)
val store = build_context.store
--- a/src/Pure/Tools/build_process.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/Tools/build_process.scala Sat Oct 28 17:35:26 2023 +0200
@@ -56,6 +56,7 @@
worker_uuid: String,
build_uuid: String,
node_info: Host.Node_Info,
+ start_date: Date,
build: Option[Build_Job]
) extends Library.Named {
def join_build: Option[Build_Job.Result] = build.flatMap(_.join)
@@ -614,9 +615,13 @@
val build_uuid = Generic.build_uuid
val hostname = SQL.Column.string("hostname")
val numa_node = SQL.Column.int("numa_node")
+ val rel_cpus = SQL.Column.string("rel_cpus")
+ val start_date = SQL.Column.date("start_date")
val table =
- make_table(List(name, worker_uuid, build_uuid, hostname, numa_node), name = "running")
+ make_table(
+ List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus, start_date),
+ name = "running")
}
def read_running(db: SQL.Database): State.Running =
@@ -629,7 +634,11 @@
val build_uuid = res.string(Running.build_uuid)
val hostname = res.string(Running.hostname)
val numa_node = res.get_int(Running.numa_node)
- name -> Job(name, worker_uuid, build_uuid, Host.Node_Info(hostname, numa_node), None)
+ val rel_cpus = res.string(Running.rel_cpus)
+ val start_date = res.date(Running.start_date)
+
+ val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus))
+ name -> Job(name, worker_uuid, build_uuid, node_info, start_date, None)
}
)
@@ -655,6 +664,8 @@
stmt.string(3) = job.build_uuid
stmt.string(4) = job.node_info.hostname
stmt.int(5) = job.node_info.numa_node
+ stmt.string(6) = Host.Range(job.node_info.rel_cpus)
+ stmt.date(7) = job.start_date
})
}
@@ -669,7 +680,8 @@
val worker_uuid = Generic.worker_uuid
val build_uuid = Generic.build_uuid
val hostname = SQL.Column.string("hostname")
- val numa_node = SQL.Column.string("numa_node")
+ val numa_node = SQL.Column.int("numa_node")
+ val rel_cpus = SQL.Column.string("rel_cpus")
val rc = SQL.Column.int("rc")
val out = SQL.Column.string("out")
val err = SQL.Column.string("err")
@@ -681,7 +693,7 @@
val table =
make_table(
- List(name, worker_uuid, build_uuid, hostname, numa_node,
+ List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus,
rc, out, err, timing_elapsed, timing_cpu, timing_gc, output_shasum, current),
name = "results")
}
@@ -701,7 +713,8 @@
val build_uuid = res.string(Results.build_uuid)
val hostname = res.string(Results.hostname)
val numa_node = res.get_int(Results.numa_node)
- val node_info = Host.Node_Info(hostname, numa_node)
+ val rel_cpus = res.string(Results.rel_cpus)
+ val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus))
val rc = res.int(Results.rc)
val out = res.string(Results.out)
@@ -742,14 +755,15 @@
stmt.string(3) = result.build_uuid
stmt.string(4) = result.node_info.hostname
stmt.int(5) = result.node_info.numa_node
- stmt.int(6) = process_result.rc
- stmt.string(7) = cat_lines(process_result.out_lines)
- stmt.string(8) = cat_lines(process_result.err_lines)
- stmt.long(9) = process_result.timing.elapsed.ms
- stmt.long(10) = process_result.timing.cpu.ms
- stmt.long(11) = process_result.timing.gc.ms
- stmt.string(12) = result.output_shasum.toString
- stmt.bool(13) = result.current
+ stmt.string(6) = Host.Range(result.node_info.rel_cpus)
+ stmt.int(7) = process_result.rc
+ stmt.string(8) = cat_lines(process_result.out_lines)
+ stmt.string(9) = cat_lines(process_result.err_lines)
+ stmt.long(10) = process_result.timing.elapsed.ms
+ stmt.long(11) = process_result.timing.cpu.ms
+ stmt.long(12) = process_result.timing.gc.ms
+ stmt.string(13) = result.output_shasum.toString
+ stmt.bool(14) = result.current
})
}
@@ -872,8 +886,8 @@
build_options.seconds(option)
}
- private val _host_database: Option[SQL.Database] =
- try { store.maybe_open_build_database(path = Host.private_data.database, server = server) }
+ protected val _host_database: SQL.Database =
+ try { store.open_build_database(path = Host.private_data.database, server = server) }
catch { case exn: Throwable => close(); throw exn }
protected val (progress, worker_uuid) = synchronized {
@@ -913,7 +927,7 @@
def close(): Unit = synchronized {
Option(_database_server).flatten.foreach(_.close())
Option(_build_database).flatten.foreach(_.close())
- Option(_host_database).flatten.foreach(_.close())
+ Option(_host_database).foreach(_.close())
Option(_build_cluster).foreach(_.close())
progress match {
case db_progress: Database_Progress => db_progress.exit(close = true)
@@ -970,6 +984,13 @@
else Nil
}
+ protected def next_node_info(state: Build_Process.State, session_name: String): Host.Node_Info = {
+ def used_nodes: Set[Int] =
+ Set.from(for (job <- state.running.valuesIterator; i <- job.node_info.numa_node) yield i)
+ val numa_node = Host.next_numa_node(_host_database, hostname, state.numa_nodes, used_nodes)
+ Host.Node_Info(hostname, numa_node, Nil)
+ }
+
protected def start_session(
state: Build_Process.State,
session_name: String,
@@ -1025,17 +1046,10 @@
else state
}
else {
- def used_nodes: Set[Int] =
- Set.from(for (job <- state.running.valuesIterator; i <- job.node_info.numa_node) yield i)
- val numa_node =
- for {
- db <- _host_database
- n <- Host.next_numa_node(db, hostname, state.numa_nodes, used_nodes)
- } yield n
- val node_info = Host.Node_Info(hostname, numa_node)
+ val node_info = next_node_info(state, session_name)
val print_node_info =
- node_info.numa_node.isDefined ||
+ node_info.numa_node.isDefined || node_info.rel_cpus.nonEmpty ||
_build_database.isDefined && _build_database.get.is_postgresql
progress.echo(
@@ -1048,7 +1062,8 @@
Build_Job.start_session(build_context, session, progress, log, server,
build_deps.background(session_name), sources_shasum, input_shasum, node_info, store_heap)
- val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))
+ val job =
+ Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Date.now(), Some(build))
state.add_running(job)
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Tools/build_schedule.scala Sat Oct 28 17:35:26 2023 +0200
@@ -0,0 +1,556 @@
+/* Title: Pure/Tools/build_schedule.scala
+ Author: Fabian Huch, TU Muenchen
+
+Build schedule generated by heuristic methods, allowing for more efficient builds.
+*/
+
+package isabelle
+
+
+import Host.Node_Info
+import scala.annotation.tailrec
+
+
+object Build_Schedule {
+ val engine_name = "build_schedule"
+
+ object Config {
+ def from_job(job: Build_Process.Job): Config = Config(job.name, job.node_info)
+ }
+
+ case class Config(job_name: String, node_info: Node_Info) {
+ def job_of(start_time: Time): Build_Process.Job =
+ Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
+ }
+
+ /* organized historic timing information (extracted from build logs) */
+
+ case class Timing_Entry(job_name: String, hostname: String, threads: Int, elapsed: Time)
+
+ class Timing_Data private(data: List[Timing_Entry], val host_infos: Host_Infos) {
+ require(data.nonEmpty)
+
+ def speedup(time: Time, factor: Double): Time =
+ Time.ms((time.ms * factor).toLong)
+
+ def is_empty = data.isEmpty
+ def size = data.length
+
+ private lazy val by_job =
+ data.groupBy(_.job_name).view.mapValues(new Timing_Data(_, host_infos)).toMap
+ private lazy val by_threads =
+ data.groupBy(_.threads).view.mapValues(new Timing_Data(_, host_infos)).toMap
+ private lazy val by_hostname =
+ data.groupBy(_.hostname).view.mapValues(new Timing_Data(_, host_infos)).toMap
+
+ def mean_time: Time = Timing_Data.mean_time(data.map(_.elapsed))
+
+ private def best_entry: Timing_Entry = data.minBy(_.elapsed.ms)
+
+ def best_threads(job_name: String): Option[Int] = by_job.get(job_name).map(_.best_entry.threads)
+
+ def best_time(job_name: String): Time =
+ by_job.get(job_name).map(_.best_entry.elapsed).getOrElse(
+ estimate_config(Config(job_name, Node_Info(best_entry.hostname, None, Nil))))
+
+ private def hostname_factor(from: String, to: String): Double =
+ host_infos.host_factor(host_infos.the_host(from), host_infos.the_host(to))
+
+ def approximate_threads(threads: Int): Option[Time] = {
+ val approximations =
+ by_job.values.filter(_.size > 1).map { data =>
+ // reference host: the one with a measured thread count closest to the target
+ val (ref_hostname, _) =
+ data.by_hostname.toList.flatMap((hostname, data) =>
+ data.by_threads.keys.map(hostname -> _)).minBy((_, n) => Math.abs(n - threads))
+
+ def unify_hosts(data: Timing_Data): List[Time] =
+ data.by_hostname.toList.map((hostname, data) =>
+ speedup(data.mean_time, hostname_factor(hostname, ref_hostname)))
+
+ // unified curve: thread count -> median time, as if measured on the reference host
+ val entries =
+ data.by_threads.toList.map((threads, data) =>
+ threads -> Timing_Data.median_time(unify_hosts(data)))
+
+ val (x0, y0) = entries.minBy((n, _) => Math.abs(n - threads))
+ entries.filter((n, _) => n != x0) match {
+ case Nil => y0 // only one distinct thread count: no slope to interpolate with
+ case entries1 =>
+ // linear inter-/extrapolation through the two data points nearest to the target
+ val (x1, y1) = entries1.minBy((n, _) => Math.abs(n - threads))
+ val a = (y1.ms - y0.ms).toDouble / (x1 - x0)
+ val b = y0.ms - a * x0
+ Time.ms((a * threads + b).toLong)
+ }
+ }
+ if (approximations.isEmpty) None else Some(Timing_Data.mean_time(approximations))
+ }
+
+ // estimated runtime ratio time(divided threads) / time(divisor threads); without
+ // global data, assume runtime to be inversely proportional to the thread count
+ def threads_factor(divided: Int, divisor: Int): Double =
+ (approximate_threads(divided), approximate_threads(divisor)) match {
+ case (Some(dividend), Some(divisor)) => dividend.ms.toDouble / divisor.ms
+ case _ => divisor.toDouble / divided
+ }
+
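+ // estimate from the most specific data available: exact job measurements first,
+ // then interpolation over threads and normalization across hosts, finally global means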
+ def estimate_config(config: Config): Time =
+ by_job.get(config.job_name) match {
+ case None => mean_time
+ case Some(data) =>
+ val hostname = config.node_info.hostname
+ val threads = host_infos.num_threads(config.node_info)
+ data.by_threads.get(threads) match {
+ case None => // interpolate threads
+ data.by_hostname.get(hostname).flatMap(
+ _.approximate_threads(threads)).getOrElse {
+ // per machine, try to approximate config for threads
+ val approximated =
+ data.by_hostname.toList.flatMap((hostname1, data) =>
+ data.approximate_threads(threads).map(time =>
+ speedup(time, hostname_factor(hostname1, hostname))))
+
+ if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
+ else {
+ // no machine where config can be approximated
+ data.approximate_threads(threads).getOrElse {
+ // only single data point, use global curve to approximate
+ val global_factor = threads_factor(threads, data.by_threads.keys.head)
+ speedup(data.by_threads.values.head.mean_time, global_factor)
+ }
+ }
+ }
+
+ case Some(data) => // time for job/thread exists, interpolate machine
+ data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
+ Timing_Data.median_time(
+ data.by_hostname.toList.map((hostname1, data) =>
+ speedup(data.mean_time, hostname_factor(hostname1, hostname))))
+ }
+ }
+ }
+ }
+
+ object Timing_Data {
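+ // robust aggregates over small samples; median_time picks the upper median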
+ def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).drop(obs.length / 2).head
+ def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size)
+
+ private val dummy_entries =
+ List(
+ Timing_Entry("dummy", "dummy", 1, Time.minutes(5)),
+ Timing_Entry("dummy", "dummy", 8, Time.minutes(1)))
+
+ def dummy: Timing_Data = new Timing_Data(dummy_entries, Host_Infos.dummy)
+
+ def make(
+ host_infos: Host_Infos,
+ build_history: List[(Build_Log.Meta_Info, Build_Log.Build_Info)],
+ ): Timing_Data = {
+ val hosts = host_infos.hosts
+ val measurements =
+ for {
+ (meta_info, build_info) <- build_history
+ build_host <- meta_info.get(Build_Log.Prop.build_host).toList
+ (job_name, session_info) <- build_info.sessions.toList
+ hostname = session_info.hostname.getOrElse(build_host)
+ host <- hosts.find(_.info.hostname == hostname).toList
+ threads = session_info.threads.getOrElse(host.info.num_cpus)
+ } yield (job_name, hostname, threads) -> session_info.timing.elapsed
+
+ val entries =
+ if (measurements.isEmpty) dummy_entries
+ else
+ measurements.groupMap(_._1)(_._2).toList.map {
+ case ((job_name, hostname, threads), timings) =>
+ Timing_Entry(job_name, hostname, threads, median_time(timings))
+ }
+
+ new Timing_Data(entries, host_infos)
+ }
+ }
+
+
+ /* host information */
+
+ case class Host(info: isabelle.Host.Info, build: Build_Cluster.Host)
+
+ object Host_Infos {
+ def dummy: Host_Infos =
+ new Host_Infos(
+ List(Host(isabelle.Host.Info("dummy", Nil, 8, Some(1.0)), Build_Cluster.Host("dummy"))))
+
+ def apply(build_hosts: List[Build_Cluster.Host], db: SQL.Database): Host_Infos = {
+ def get_host(build_host: Build_Cluster.Host): Host = {
+ val info =
+ isabelle.Host.read_info(db, build_host.name).getOrElse(
+ error("No benchmark for " + quote(build_host.name)))
+ Host(info, build_host)
+ }
+
+ new Host_Infos(build_hosts.map(get_host))
+ }
+ }
+
+ class Host_Infos private(val hosts: List[Host]) {
+ private val by_hostname = hosts.map(host => host.info.hostname -> host).toMap
+
+ def host_factor(from: Host, to: Host): Double =
+ from.info.benchmark_score.get / to.info.benchmark_score.get
+
+ // ascending by benchmark speed: host1 < host2 iff host1 is slower
+ val host_speeds: Ordering[Host] =
+ Ordering.fromLessThan((host1, host2) => host_factor(host1, host2) < 1)
+
+ def the_host(hostname: String): Host =
+ by_hostname.getOrElse(hostname, error("Unknown host " + quote(hostname)))
+ def the_host(node_info: Node_Info): Host = the_host(node_info.hostname)
+
+ def num_threads(node_info: Node_Info): Int =
+ if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length
+ else the_host(node_info).info.num_cpus
+
+ def available(state: Build_Process.State): Resources = {
+ val allocated =
+ state.running.values.map(_.node_info).groupMapReduce(the_host)(List(_))(_ ::: _)
+ Resources(this, allocated)
+ }
+ }
+
+
+ /* offline tracking of resource allocations */
+
+ case class Resources(
+ host_infos: Host_Infos,
+ allocated_nodes: Map[Host, List[Node_Info]]
+ ) {
+ val unused_hosts: List[Host] = host_infos.hosts.filter(allocated(_).isEmpty)
+
+ def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil)
+
+ def allocate(node: Node_Info): Resources = {
+ val host = host_infos.the_host(node)
+ copy(allocated_nodes = allocated_nodes + (host -> (node :: allocated(host))))
+ }
+
+ def try_allocate_tasks(
+ hosts: List[Host],
+ tasks: List[(Build_Process.Task, Int)]
+ ): (List[Config], Resources) =
+ tasks match {
+ case Nil => (Nil, this)
+ case (task, threads) :: tasks =>
+ val (config, resources) =
+ hosts.find(available(_, threads)) match {
+ case Some(host) =>
+ val node_info = next_node(host, threads)
+ (Some(Config(task.name, node_info)), allocate(node_info))
+ case None => (None, this)
+ }
+ val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks)
+ (config.toList ::: configs, resources1)
+ }
+
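+ // allocate on the least-used NUMA node; pin to dedicated relative CPUs
+ // only if enough of them are still free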
+ def next_node(host: Host, threads: Int): Node_Info = {
+ val numa_node_num_cpus = host.info.num_cpus / (host.info.numa_nodes.length max 1)
+ def explicit_cpus(node_info: Node_Info): List[Int] =
+ if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList
+
+ val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _)
+
+ val available_nodes = host.info.numa_nodes
+ val numa_node =
+ if (!host.build.numa) None
+ else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption
+
+ val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet
+ val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList
+
+ val rel_cpus = if (available_cpus.length >= threads) available_cpus.take(threads) else Nil
+
+ Node_Info(host.info.hostname, numa_node, rel_cpus)
+ }
+
+ def available(host: Host, threads: Int): Boolean = {
+ val used = allocated(host)
+
+ if (used.length >= host.build.jobs) false
+ else {
+ if (host.info.numa_nodes.length <= 1)
+ used.map(host_infos.num_threads).sum + threads <= host.info.num_cpus
+ else {
+ def node_threads(n: Int): Int =
+ used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum
+
+ host.info.numa_nodes.exists(
+ node_threads(_) + threads <= host.info.num_cpus / host.info.numa_nodes.length)
+ }
+ }
+ }
+ }
+
+
+ /* schedule generation */
+
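+ // simulation state: the remaining build state plus a virtual clock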
+ case class State(build_state: Build_Process.State, current_time: Time) {
+ def start(config: Config): State =
+ copy(build_state =
+ build_state.copy(running = build_state.running +
+ (config.job_name -> config.job_of(current_time))))
+
+ def step(timing_data: Timing_Data): State = {
+ val remaining =
+ build_state.running.values.toList.map { job =>
+ val elapsed = current_time - job.start_date.time
+ val predicted = timing_data.estimate_config(Config.from_job(job))
+ val remaining = if (elapsed > predicted) Time.zero else predicted - elapsed
+ job -> remaining
+ }
+
+ if (remaining.isEmpty) error("Schedule step without running sessions")
+ else {
+ val (job, time) = remaining.minBy(_._2.ms)
+ State(build_state.remove_running(job.name).remove_pending(job.name), current_time + time)
+ }
+ }
+
+ def finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty
+ }
+
+ abstract class Scheduler {
+ def ready_jobs(state: Build_Process.State): Build_Process.State.Pending =
+ state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name))
+
+ def next(timing: Timing_Data, state: Build_Process.State): List[Config]
+
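+ // simulated wall-clock duration of the whole build: repeatedly start the scheduler's
+ // next configs and advance the virtual clock to the earliest predicted job completion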
+ def build_duration(timing_data: Timing_Data, build_state: Build_Process.State): Time = {
+ @tailrec
+ def simulate(state: State): State =
+ if (state.finished) state
+ else {
+ val state1 =
+ next(timing_data, state.build_state).foldLeft(state)(_.start(_)).step(timing_data)
+ simulate(state1)
+ }
+
+ val start = Time.now()
+ simulate(State(build_state, start)).current_time - start
+ }
+ }
+
+
+ /* heuristics */
+
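+ // Sessions on long dependency chains (accumulated path time above the threshold) are
+ // critical: they get the fastest hosts with their best-known thread count, while all
+ // other ready sessions run single-threaded.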
+ class Timing_Heuristic(threshold: Time) extends Scheduler {
+ def next(timing_data: Timing_Data, state: Build_Process.State): List[Config] = {
+ val host_infos = timing_data.host_infos
+ val resources = host_infos.available(state)
+
+ def best_threads(task: Build_Process.Task): Int =
+ timing_data.best_threads(task.name).getOrElse(
+ host_infos.hosts.map(_.info.num_cpus).max min 8)
+
+ val ready = ready_jobs(state)
+ val free = resources.unused_hosts
+
+ if (ready.length <= free.length)
+ resources.try_allocate_tasks(free, ready.map(task => task -> best_threads(task)))._1
+ else {
+ val pending_tasks = state.pending.map(_.name).toSet
+ val graph = state.sessions.graph.restrict(pending_tasks)
+
+ val accumulated_time =
+ graph.node_depth(timing_data.best_time(_).ms).filter((name, _) => graph.is_maximal(name))
+
+ val path_time =
+ accumulated_time.flatMap((name, ms) => graph.all_preds(List(name)).map(_ -> ms)).toMap
+
+ def is_critical(task: String): Boolean = path_time(task) > threshold.ms
+
+ val (critical, other) =
+ ready.sortBy(task => path_time(task.name)).partition(task => is_critical(task.name))
+
+ val critical_graph = graph.restrict(is_critical)
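+ // approximate number of independent critical chains below a node
+ // (may overcount when branches reconverge)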
+ def parallel_paths(node: String): Int =
+ critical_graph.imm_succs(node).map(suc => parallel_paths(suc) max 1).sum max 1
+
+ val (critical_hosts, other_hosts) =
+ host_infos.hosts.sorted(host_infos.host_speeds).reverse.splitAt(
+ critical.map(_.name).map(parallel_paths).sum)
+
+ val (configs1, resources1) =
+ resources.try_allocate_tasks(critical_hosts,
+ critical.map(task => task -> best_threads(task)))
+
+ val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other.map(_ -> 1))
+
+ configs1 ::: configs2
+ }
+ }
+ }
+
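+ // run all schedulers on a simulated build and use the one with the shortest duration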
+ class Meta_Heuristic(schedulers: List[Scheduler]) extends Scheduler {
+ require(schedulers.nonEmpty)
+
+ def next(timing_data: Timing_Data, state: Build_Process.State): List[Config] = {
+ val (best, _) = schedulers.map(h => h -> h.build_duration(timing_data, state)).minBy(_._2.ms)
+ best.next(timing_data, state)
+ }
+ }
+
+
+ /* process for scheduled build */
+
+ class Scheduled_Build_Process(
+ scheduler: Scheduler,
+ build_context: Build.Context,
+ build_progress: Progress,
+ server: SSH.Server,
+ ) extends Build_Process(build_context, build_progress, server) {
+ protected val start_date = Date.now()
+
+
+ /* global resources with common close() operation */
+
+ private final lazy val _log_store: Build_Log.Store = Build_Log.store(build_options)
+ private final lazy val _log_database: SQL.Database =
+ try {
+ val db = _log_store.open_database(server = this.server)
+ _log_store.init_database(db)
+ db
+ }
+ catch { case exn: Throwable => close(); throw exn }
+
+ override def close(): Unit = {
+ super.close()
+ Option(_log_database).foreach(_.close())
+ }
+
+
+ /* previous results via build log */
+
+ override def open_build_cluster(): Build_Cluster = {
+ val build_cluster = super.open_build_cluster()
+ build_cluster.init()
+ if (build_context.master && build_context.max_jobs > 0) {
+ val benchmark_options = build_options.string("build_hostname") = hostname
+ Benchmark.benchmark(benchmark_options, progress)
+ }
+ build_cluster.benchmark()
+ build_cluster
+ }
+
+ private val timing_data: Timing_Data = {
+ val cluster_hosts: List[Build_Cluster.Host] =
+ if (build_context.max_jobs == 0) build_context.build_hosts
+ else {
+ val local_build_host =
+ Build_Cluster.Host(
+ hostname, jobs = build_context.max_jobs, numa = build_context.numa_shuffling)
+ local_build_host :: build_context.build_hosts
+ }
+
+ val host_infos = Host_Infos(cluster_hosts, _host_database)
+
+ val build_history =
+ for {
+ log_name <- _log_database.execute_query_statement(
+ Build_Log.private_data.meta_info_table.select(List(Build_Log.private_data.log_name)),
+ List.from[String], res => res.string(Build_Log.private_data.log_name))
+ meta_info <- _log_store.read_meta_info(_log_database, log_name)
+ build_info = _log_store.read_build_info(_log_database, log_name)
+ } yield (meta_info, build_info)
+
+ Timing_Data.make(host_infos, build_history)
+ }
+
+ def write_build_log(results: Build.Results, state: Build_Process.State.Results): Unit = {
+ val sessions =
+ for {
+ (session_name, result) <- state.toList
+ if !result.current
+ } yield {
+ val info = build_context.sessions_structure(session_name)
+ val entry =
+ if (!results.cancelled(session_name)) {
+ val status =
+ if (result.ok) Build_Log.Session_Status.finished
+ else Build_Log.Session_Status.failed
+
+ Build_Log.Session_Entry(
+ chapter = info.chapter,
+ groups = info.groups,
+ hostname = Some(result.node_info.hostname),
+ threads = Some(timing_data.host_infos.num_threads(result.node_info)),
+ timing = result.process_result.timing,
+ sources = Some(result.output_shasum.digest.toString),
+ status = Some(status))
+ }
+ else
+ Build_Log.Session_Entry(
+ chapter = info.chapter,
+ groups = info.groups,
+ status = Some(Build_Log.Session_Status.cancelled))
+ session_name -> entry
+ }
+
+ val settings =
+ Build_Log.Settings.all_settings.map(_.name).map(name =>
+ name -> Isabelle_System.getenv(name))
+ val props =
+ List(
+ Build_Log.Prop.build_id.name -> build_context.build_uuid,
+ Build_Log.Prop.build_engine.name -> build_context.engine.name,
+ Build_Log.Prop.build_host.name -> hostname,
+ Build_Log.Prop.build_start.name -> Build_Log.print_date(start_date))
+
+ val meta_info = Build_Log.Meta_Info(props, settings)
+ val build_info = Build_Log.Build_Info(sessions.toMap)
+ val log_name = Build_Log.log_filename(engine = engine_name, date = start_date)
+
+ _log_store.update_sessions(_log_database, log_name.file_name, build_info)
+ _log_store.update_meta_info(_log_database, log_name.file_name, meta_info)
+ }
+
+
+ /* build process */
+
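+ // cache scheduling decisions, since recomputing them for an unchanged state is expensive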
+ case class Cache(state: Build_Process.State, configs: List[Config], estimate: Date) {
+ def is_current(state: Build_Process.State): Boolean = this.state == state
+ def is_current_estimate(estimate: Date): Boolean =
+ (this.estimate.time - estimate.time).ms.abs < Time.seconds(1).ms
+ }
+
+ private var cache = Cache(Build_Process.State(), Nil, Date.now())
+
+ override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = {
+ val configs =
+ if (cache.is_current(state)) cache.configs
+ else scheduler.next(timing_data, state)
+ configs.find(_.job_name == session_name).get.node_info
+ }
+
+ override def next_jobs(state: Build_Process.State): List[String] =
+ if (cache.is_current(state)) cache.configs.map(_.job_name)
+ else {
+ val next = scheduler.next(timing_data, state)
+ val estimate = Date(Time.now() + scheduler.build_duration(timing_data, state))
+ progress.echo_if(build_context.master && !cache.is_current_estimate(estimate),
+ "Estimated completion: " + estimate)
+
+ val configs = next.filter(_.node_info.hostname == hostname)
+ cache = Cache(state, configs, estimate)
+ configs.map(_.job_name)
+ }
+
+ override def run(): Build.Results = {
+ val results = super.run()
+ if (build_context.master) write_build_log(results, snapshot().results)
+ results
+ }
+ }
+
+ class Engine extends Build.Engine(engine_name) {
+ override def open_build_process(
+ context: Build.Context,
+ progress: Progress,
+ server: SSH.Server
+ ): Build_Process = {
+ val heuristics = List(5, 10, 20).map(minutes => Timing_Heuristic(Time.minutes(minutes)))
+ val scheduler = new Meta_Heuristic(heuristics)
+ new Scheduled_Build_Process(scheduler, context, progress, server)
+ }
+ }
+}
--- a/src/Pure/Tools/dump.scala Fri Oct 27 18:27:06 2023 +0200
+++ b/src/Pure/Tools/dump.scala Sat Oct 28 17:35:26 2023 +0200
@@ -98,7 +98,7 @@
): Context = {
val session_options: Options = {
val options1 =
- Host.process_policy_options(options, Host.numa_node0()) +
+ Host.numa_options(options, Host.numa_node0()) +
"parallel_proofs=0" +
"completion_limit=0" +
"editor_tracing_messages=0"