--- a/src/Pure/Admin/ci_build.scala Fri Jun 07 15:04:07 2024 +0200
+++ b/src/Pure/Admin/ci_build.scala Fri Jun 07 15:47:19 2024 +0200
@@ -7,6 +7,8 @@
package isabelle
+import scala.collection.mutable
+
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.{Properties => JProperties, Map => JMap}
@@ -23,21 +25,6 @@
}
- /* executor profile */
-
- case class Profile(threads: Int, jobs: Int, numa: Boolean)
-
- object Profile {
- def from_host: Profile = {
- Isabelle_System.hostname() match {
- case "hpcisabelle" => Profile(8, 8, numa = true)
- case "lxcisa1" => Profile(4, 10, numa = false)
- case _ => Profile(2, 2, numa = false)
- }
- }
- }
-
-
/* build config */
case class Build_Config(
@@ -62,8 +49,31 @@
password = options.string("ci_mail_password"))
}
+
/* ci build jobs */
+ sealed trait Hosts {
+ def hosts_spec: String
+ def max_jobs: Option[Int]
+ def prefs: List[Options.Spec]
+ def numa_shuffling: Boolean
+ def build_cluster: Boolean
+ }
+
+ case class Local(host_spec: String, jobs: Int, threads: Int, numa_shuffling: Boolean = true)
+ extends Hosts {
+ def hosts_spec: String = host_spec
+ def max_jobs: Option[Int] = Some(jobs)
+ def prefs: List[Options.Spec] = List(Options.Spec.eq("threads", threads.toString))
+ def build_cluster: Boolean = false
+ }
+
+ case class Cluster(hosts_spec: String, numa_shuffling: Boolean = true) extends Hosts {
+ def max_jobs: Option[Int] = None
+ def prefs: List[Options.Spec] = Nil
+ def build_cluster: Boolean = true
+ }
+
sealed trait Trigger
case object On_Commit extends Trigger
@@ -76,13 +86,13 @@
(before.time < start1.time && start1.time <= now.time)
}
}
-
+
case class Timed(in_interval: (Date, Date) => Boolean) extends Trigger
sealed case class Job(
name: String,
description: String,
- profile: Profile,
+ hosts: Hosts,
config: Build_Config,
components: List[String] = Nil,
trigger: Trigger = On_Commit
@@ -102,7 +112,7 @@
val timing =
Job(
"benchmark", "runs benchmark and timing sessions",
- Profile(threads = 6, jobs = 1, numa = false),
+ Local("benchmark", jobs = 1, threads = 6, numa_shuffling = false),
Build_Config(
documents = false,
select = List(
@@ -161,8 +171,8 @@
def print_section(title: String): Unit =
println("\n=== " + title + " ===\n")
- def ci_build(options: Options, job: Job): Unit = {
- val profile = job.profile
+ def ci_build(options: Options, build_hosts: List[Build_Cluster.Host], job: Job): Unit = {
+ val hosts = job.hosts
val config = job.config
val isabelle_home = Path.explode(Isabelle_System.getenv_strict("ISABELLE_HOME"))
@@ -175,12 +185,9 @@
print_section("CONFIGURATION")
println(Build_Log.Settings.show())
- val build_options =
- with_documents(options, config).int.update("threads", profile.threads) +
- "parallel_proofs=1" + "system_heaps"
+ val build_options = with_documents(options, config) + "parallel_proofs=1" + "system_heaps"
- println(
- "jobs = " + profile.jobs + ", threads = " + profile.threads + ", numa = " + profile.numa)
+ println(hosts)
print_section("BUILD")
println("Build started at " + formatted_time)
@@ -193,12 +200,13 @@
val start_time = Time.now()
val results = progress.interrupt_handler {
Build.build(
- build_options,
+ build_options ++ hosts.prefs,
+ build_hosts = build_hosts,
selection = config.selection,
progress = progress,
clean_build = config.clean,
- numa_shuffling = profile.numa,
- max_jobs = Some(profile.jobs),
+ numa_shuffling = hosts.numa_shuffling,
+ max_jobs = hosts.max_jobs,
dirs = config.include,
select_dirs = config.select)
}
@@ -241,17 +249,23 @@
/* arguments */
var options = Options.init()
+ val build_hosts = new mutable.ListBuffer[Build_Cluster.Host]
val getopts = Getopts("""
Usage: isabelle ci_build [OPTIONS] JOB
Options are:
+ -H HOSTS host specifications of the form NAMES:PARAMETERS (separated by commas)
-o OPTION override Isabelle system OPTION (via NAME=VAL or NAME)
- Runs Isabelle builds in ci environment, with the following build jobs:
+ Runs Isabelle builds in ci environment. For cluster builds, build hosts must
+ be passed explicitly (no registry).
+
+ The following build jobs are available:
""" + Library.indent_lines(4, show_jobs) + "\n",
- "o:" -> (arg => options = options + arg))
+ "o:" -> (arg => options = options + arg),
+ "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.load(Nil), arg)))
val more_args = getopts(args)
@@ -260,7 +274,7 @@
case _ => getopts.usage()
}
- ci_build(options, job)
+ ci_build(options, build_hosts.toList, job)
})
}
--- a/src/Pure/Build/build_manager.scala Fri Jun 07 15:04:07 2024 +0200
+++ b/src/Pure/Build/build_manager.scala Fri Jun 07 15:47:19 2024 +0200
@@ -42,7 +42,10 @@
case class CI_Build(name: String, components: List[Component]) extends Build_Config {
def fresh_build: Boolean = true
- def command(build_hosts: List[Build_Cluster.Host]): String = " ci_build " + name
+ def command(build_hosts: List[Build_Cluster.Host]): String =
+ " ci_build" +
+ build_hosts.map(host => " -H " + Bash.string(host.print)).mkString +
+ " " + name
}
object User_Build {
@@ -72,7 +75,7 @@
" build" +
if_proper(afp_rev, " -A:") +
base_sessions.map(session => " -B " + Bash.string(session)).mkString +
- if_proper(build_hosts, build_hosts.map(host => " -H " + Bash.string(host.print)).mkString) +
+ build_hosts.map(host => " -H " + Bash.string(host.print)).mkString +
if_proper(presentation, " -P:") +
if_proper(requirements, " -R") +
if_proper(all_sessions, " -a") +
@@ -92,6 +95,8 @@
sealed case class Task(
build_config: Build_Config,
+ build_cluster: Boolean,
+ hosts_spec: String,
id: UUID.T = UUID.random(),
submit_date: Date = Date.now(),
priority: Priority = Priority.normal,
@@ -100,6 +105,9 @@
def name: String = id.toString
def kind: String = build_config.name
def components: List[Component] = build_config.components
+
+ def build_hosts: List[Build_Cluster.Host] =
+ Build_Cluster.Host.parse(Registry.global, hosts_spec)
}
sealed case class Job(
@@ -107,6 +115,8 @@
kind: String,
number: Long,
isabelle_rev: String,
+ build_cluster: Boolean,
+ hostnames: List[String],
components: List[Component],
start_date: Date = Date.now(),
cancelled: Boolean = false
@@ -156,12 +166,22 @@
def num_builds = running.size + finished.size
- def next: List[Task] =
- if (pending.isEmpty) Nil
+ def next(build_hosts: List[Build_Cluster.Host]): Option[Task] = {
+ val cluster_running = running.values.exists(_.build_cluster)
+ val available = build_hosts.map(_.hostname).toSet - running.values.flatMap(_.hostnames).toSet
+ val ready =
+ for {
+ (_, task) <- pending
+ if !task.build_cluster || !cluster_running
+ if task.build_hosts.map(_.hostname).forall(available.contains)
+ } yield task
+
+ if (ready.isEmpty) None
else {
- val priority = pending.values.map(_.priority).maxBy(_.ordinal)
- pending.values.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering)
+ val priority = ready.map(_.priority).maxBy(_.ordinal)
+ ready.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering).headOption
}
+ }
def add_running(job: Job): State = copy(running = running + (job.name -> job))
def remove_running(name: String): State = copy(running = running - name)
@@ -255,6 +275,8 @@
object Pending {
val kind = SQL.Column.string("kind")
+ val build_cluster = SQL.Column.bool("build_cluster")
+ val hosts_spec = SQL.Column.string("hosts_spec")
val id = SQL.Column.string("id").make_primary_key
val submit_date = SQL.Column.date("submit_date")
val priority = SQL.Column.string("priority")
@@ -277,10 +299,10 @@
val verbose = SQL.Column.bool("verbose")
val table =
- make_table(List(kind, id, submit_date, priority, isabelle_rev, components, prefs,
- requirements, all_sessions, base_sessions, exclude_session_groups, exclude_sessions,
- session_groups, sessions, build_heap, clean_build, export_files, fresh_build,
- presentation, verbose),
+ make_table(List(kind, build_cluster, hosts_spec, id, submit_date, priority, isabelle_rev,
+ components, prefs, requirements, all_sessions, base_sessions, exclude_session_groups,
+ exclude_sessions, session_groups, sessions, build_heap, clean_build, export_files,
+ fresh_build, presentation, verbose),
name = "pending")
}
@@ -288,6 +310,8 @@
db.execute_query_statement(Pending.table.select(), Map.from[String, Task], get =
{ res =>
val kind = res.string(Pending.kind)
+ val build_cluster = res.bool(Pending.build_cluster)
+ val hosts_spec = res.string(Pending.hosts_spec)
val id = res.string(Pending.id)
val submit_date = res.date(Pending.submit_date)
val priority = Priority.valueOf(res.string(Pending.priority))
@@ -319,7 +343,8 @@
clean_build, export_files, fresh_build, presentation, verbose)
}
- val task = Task(build_config, UUID.make(id), submit_date, priority, isabelle_rev)
+ val task = Task(build_config, build_cluster, hosts_spec, UUID.make(id), submit_date,
+ priority, isabelle_rev)
task.name -> task
})
@@ -340,11 +365,13 @@
for (name <- update.insert) yield { (stmt: SQL.Statement) =>
val task = pending(name)
stmt.string(1) = task.kind
- stmt.string(2) = task.id.toString
- stmt.date(3) = task.submit_date
- stmt.string(4) = task.priority.toString
- stmt.string(5) = task.isabelle_rev
- stmt.string(6) = task.components.mkString(",")
+ stmt.bool(2) = task.build_cluster
+ stmt.string(3) = task.hosts_spec
+ stmt.string(4) = task.id.toString
+ stmt.date(5) = task.submit_date
+ stmt.string(6) = task.priority.toString
+ stmt.string(7) = task.isabelle_rev
+ stmt.string(8) = task.components.mkString(",")
def get[A](f: User_Build => A): Option[A] =
task.build_config match {
@@ -352,20 +379,20 @@
case _ => None
}
- stmt.string(7) = get(user_build => user_build.prefs.map(_.print).mkString(","))
- stmt.bool(8) = get(_.requirements)
- stmt.bool(9) = get(_.all_sessions)
- stmt.string(10) = get(_.base_sessions.mkString(","))
- stmt.string(11) = get(_.exclude_session_groups.mkString(","))
- stmt.string(12) = get(_.exclude_sessions.mkString(","))
- stmt.string(13) = get(_.session_groups.mkString(","))
- stmt.string(14) = get(_.sessions.mkString(","))
- stmt.bool(15) = get(_.build_heap)
- stmt.bool(16) = get(_.clean_build)
- stmt.bool(17) = get(_.export_files)
- stmt.bool(18) = get(_.fresh_build)
- stmt.bool(19) = get(_.presentation)
- stmt.bool(20) = get(_.verbose)
+ stmt.string(9) = get(user_build => user_build.prefs.map(_.print).mkString(","))
+ stmt.bool(10) = get(_.requirements)
+ stmt.bool(11) = get(_.all_sessions)
+ stmt.string(12) = get(_.base_sessions.mkString(","))
+ stmt.string(13) = get(_.exclude_session_groups.mkString(","))
+ stmt.string(14) = get(_.exclude_sessions.mkString(","))
+ stmt.string(15) = get(_.session_groups.mkString(","))
+ stmt.string(16) = get(_.sessions.mkString(","))
+ stmt.bool(17) = get(_.build_heap)
+ stmt.bool(18) = get(_.clean_build)
+ stmt.bool(19) = get(_.export_files)
+ stmt.bool(20) = get(_.fresh_build)
+ stmt.bool(21) = get(_.presentation)
+ stmt.bool(22) = get(_.verbose)
})
}
@@ -380,12 +407,15 @@
val kind = SQL.Column.string("kind")
val number = SQL.Column.long("number")
val isabelle_rev = SQL.Column.string("isabelle_rev")
+ val build_cluster = SQL.Column.bool("build_cluster")
+ val hostnames = SQL.Column.string("hostnames")
val components = SQL.Column.string("components")
val start_date = SQL.Column.date("start_date")
val cancelled = SQL.Column.bool("cancelled")
val table =
- make_table(List(id, kind, number, isabelle_rev, components, start_date, cancelled),
+ make_table(List(id, kind, number, isabelle_rev, build_cluster, hostnames, components,
+ start_date, cancelled),
name = "running")
}
@@ -396,12 +426,14 @@
val kind = res.string(Running.kind)
val number = res.long(Running.number)
val isabelle_rev = res.string(Running.isabelle_rev)
+ val build_cluster = res.bool(Running.build_cluster)
+ val hostnames = space_explode(',', res.string(Running.hostnames))
val components = space_explode(',', res.string(Running.components)).map(Component.parse)
val start_date = res.date(Running.start_date)
val cancelled = res.bool(Running.cancelled)
- val job =
- Job(UUID.make(id), kind, number, isabelle_rev, components, start_date, cancelled)
+ val job = Job(UUID.make(id), kind, number, isabelle_rev, build_cluster, hostnames,
+ components, start_date, cancelled)
job.name -> job
})
@@ -425,9 +457,11 @@
stmt.string(2) = job.kind
stmt.long(3) = job.number
stmt.string(4) = job.isabelle_rev
- stmt.string(5) = job.components.mkString(",")
- stmt.date(6) = job.start_date
- stmt.bool(7) = job.cancelled
+ stmt.bool(5) = job.build_cluster
+ stmt.string(6) = job.hostnames.mkString(",")
+ stmt.string(7) = job.components.mkString(",")
+ stmt.date(8) = job.start_date
+ stmt.bool(9) = job.cancelled
})
}
update
@@ -631,13 +665,13 @@
private def start_next(): Option[Context] =
synchronized_database("start_next") {
- _state.next.headOption.flatMap { task =>
+ _state.next(build_hosts).flatMap { task =>
progress.echo("Initializing " + task.name)
_state = _state.remove_pending(task.name)
val number = _state.next_number(task.kind)
- val context = Context(store, task, number, build_hosts)
+ val context = Context(store, task, number)
Exn.capture {
context.init()
@@ -646,6 +680,8 @@
val isabelle_rev =
sync(isabelle_repository, task.isabelle_rev, context.task_dir)
+ val hostnames = task.build_hosts.map(_.hostname).distinct
+
val components =
for (component <- task.components)
yield sync_dirs.find(_.name == component.name) match {
@@ -657,7 +693,7 @@
else error("Unknown component " + component)
}
- Job(task.id, task.kind, number, isabelle_rev, components)
+ Job(task.id, task.kind, number, isabelle_rev, task.build_cluster, hostnames, components)
} match {
case Exn.Res(job) =>
_state = _state.add_running(job)
@@ -704,17 +740,18 @@
def init: Runner.State = Runner.State.empty
def loop_body(state: Runner.State): Runner.State = {
- if (state.is_empty && !progress.stopped) {
- start_next() match {
- case None => state
- case Some(context) => state.init(context)
+ val state1 =
+ if (progress.stopped) state
+ else {
+ start_next() match {
+ case None => state
+ case Some(context) => state.init(context)
+ }
}
- }
- else {
- val (state1, results) = stop_cancelled(state).update
- results.foreach(finish_job)
- state1
- }
+ val state2 = stop_cancelled(state1)
+ val (state3, results) = state2.update
+ results.foreach(finish_job)
+ state3
}
}
@@ -761,7 +798,8 @@
if isabelle_updated || ci_job.components.exists(updated_components.contains)
if !_state.pending.values.exists(_.kind == ci_job.name)
} {
- val task = Task(CI_Build(ci_job), priority = Priority.low, isabelle_rev = "default")
+ val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec,
+ priority = Priority.low, isabelle_rev = "default")
_state = _state.add_pending(task)
}
}
@@ -796,7 +834,8 @@
for (ci_job <-ci_jobs)
ci_job.trigger match {
case isabelle.CI_Build.Timed(in_interval) if in_interval(previous, next) =>
- val task = Task(CI_Build(ci_job), isabelle_rev = "default")
+ val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec,
+ isabelle_rev = "default")
_state = _state.add_pending(task)
case _ =>
}
@@ -1063,21 +1102,21 @@
/* context */
- case class Context(
- store: Store,
- task: Task,
- number: Long,
- build_hosts: List[Build_Cluster.Host]
- ) {
- def name: String = task.kind + "/" + number
+ case class Context(store: Store, task: Task, number: Long) {
+ def name = task.kind + "/" + number
+ def progress: Progress = new File_Progress(store.log_file(name))
+
+ def task_dir: Path = store.task_dir(task)
+
def init(): Unit = Isabelle_System.make_directory(store.log_file(name).dir)
- def task_dir: Path = store.task_dir(task)
- def components: List[Component] = task.build_config.components
- def isabelle_identifier: String = store.identifier
- def fresh_build: Boolean = task.build_config.fresh_build
- def command: String = task.build_config.command(build_hosts)
- def progress: Progress = new File_Progress(store.log_file(name))
- def open_ssh(): SSH.Session = store.open_ssh()
+
+ def isabelle_identifier: String =
+ if (task.build_cluster) store.options.string("build_cluster_identifier") else store.identifier
+
+ def open_ssh(): SSH.System = {
+ if (task.build_cluster) store.open_ssh()
+ else Library.the_single(task.build_hosts).open_ssh(store.options)
+ }
}
@@ -1087,10 +1126,14 @@
def open(context: Context): Build_Process = new Build_Process(context.open_ssh(), context)
}
- class Build_Process(ssh: SSH.Session, context: Context) {
+ class Build_Process(ssh: SSH.System, context: Context) {
+ private val task = context.task
+ private val progress = context.progress
+
+
+ /* resources with cleanup operations */
+
private val _dir = ssh.tmp_dir()
- private val _progress = context.progress
-
private val _isabelle =
try {
val rsync_context = Rsync.Context(ssh = ssh)
@@ -1099,7 +1142,7 @@
rsync_context.target(_dir))).check
Isabelle_System.rm_tree(context.task_dir)
- Other_Isabelle(_dir, context.isabelle_identifier, ssh, _progress)
+ Other_Isabelle(_dir, context.isabelle_identifier, ssh, progress)
}
catch { case exn: Throwable => close(); throw exn }
@@ -1107,30 +1150,23 @@
try {
val init_components =
for {
- component <- context.components
+ component <- task.build_config.components
target = _dir + Sync.DIRS + Path.basic(component.name)
if Components.is_component_dir(target)
} yield "init_component " + quote(target.absolute.implode)
_isabelle.init(
other_settings = _isabelle.init_components() ::: init_components,
- fresh = context.fresh_build, echo = true)
+ fresh = task.build_config.fresh_build, echo = true)
- val cmd = context.command
- _progress.echo("isabelle" + cmd)
+ val cmd = task.build_config.command(task.build_hosts)
+ progress.echo("isabelle" + cmd)
val script = File.bash_path(Isabelle_Tool.exe(_isabelle.isabelle_home)) + cmd
ssh.bash_process(_isabelle.bash_context(script), settings = false)
}
catch { case exn: Throwable => close(); throw exn }
- def run(): Process_Result = {
- val process_result =
- _process.result(progress_stdout = _progress.echo(_), progress_stderr = _progress.echo(_))
- close()
- process_result
- }
-
def cancel(): Unit = Option(_process).foreach(_.interrupt())
def close(): Unit = {
@@ -1138,6 +1174,16 @@
Isabelle_System.rm_tree(context.task_dir)
ssh.close()
}
+
+
+ /* execution */
+
+ def run(): Process_Result = {
+ val process_result =
+ _process.result(progress_stdout = progress.echo(_), progress_stderr = progress.echo(_))
+ close()
+ process_result
+ }
}
@@ -1250,10 +1296,11 @@
val id = UUID.random()
val afp_rev = if (afp_root.nonEmpty) Some("") else None
- val build_config = User_Build(afp_rev, prefs, requirements, all_sessions, base_sessions,
- exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap, clean_build,
- export_files, fresh_build, presentation, verbose)
- val task = Task(build_config, id, Date.now(), Priority.high)
+ val hosts_spec = options.string("build_manager_cluster")
+ val build_config = User_Build(afp_rev, prefs, requirements, all_sessions,
+ base_sessions, exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap,
+ clean_build, export_files, fresh_build, presentation, verbose)
+ val task = Task(build_config, true, hosts_spec, id, Date.now(), Priority.high)
val dir = store.task_dir(task)
@@ -1315,7 +1362,7 @@
Options are:
-A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """)
-D DIR include extra component in given directory
- -H HOSTS additional cluster host specifications of the form
+ -H HOSTS host specifications for all available hosts of the form
NAMES:PARAMETERS (separated by commas)
-o OPTION override Isabelle system OPTION (via NAME=VAL or NAME)
-p PORT explicit web server port