--- a/src/Pure/Admin/ci_build.scala Sun Jun 09 21:16:38 2024 +0200
+++ b/src/Pure/Admin/ci_build.scala Sun Jun 09 22:40:13 2024 +0200
@@ -7,6 +7,8 @@
package isabelle
+import scala.collection.mutable
+
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.{Properties => JProperties, Map => JMap}
@@ -23,21 +25,6 @@
}
- /* executor profile */
-
- case class Profile(threads: Int, jobs: Int, numa: Boolean)
-
- object Profile {
- def from_host: Profile = {
- Isabelle_System.hostname() match {
- case "hpcisabelle" => Profile(8, 8, numa = true)
- case "lxcisa1" => Profile(4, 10, numa = false)
- case _ => Profile(2, 2, numa = false)
- }
- }
- }
-
-
/* build config */
case class Build_Config(
@@ -62,8 +49,31 @@
password = options.string("ci_mail_password"))
}
+
/* ci build jobs */
+ sealed trait Hosts {
+ def hosts_spec: String
+ def max_jobs: Option[Int]
+ def prefs: List[Options.Spec]
+ def numa_shuffling: Boolean
+ def build_cluster: Boolean
+ }
+
+ case class Local(host_spec: String, jobs: Int, threads: Int, numa_shuffling: Boolean = true)
+ extends Hosts {
+ def hosts_spec: String = host_spec
+ def max_jobs: Option[Int] = Some(jobs)
+ def prefs: List[Options.Spec] = List(Options.Spec.eq("threads", threads.toString))
+ def build_cluster: Boolean = false
+ }
+
+ case class Cluster(hosts_spec: String, numa_shuffling: Boolean = true) extends Hosts {
+ def max_jobs: Option[Int] = None
+ def prefs: List[Options.Spec] = Nil
+ def build_cluster: Boolean = true
+ }
+
sealed trait Trigger
case object On_Commit extends Trigger
@@ -76,13 +86,13 @@
(before.time < start1.time && start1.time <= now.time)
}
}
-
+
case class Timed(in_interval: (Date, Date) => Boolean) extends Trigger
sealed case class Job(
name: String,
description: String,
- profile: Profile,
+ hosts: Hosts,
config: Build_Config,
components: List[String] = Nil,
trigger: Trigger = On_Commit
@@ -102,7 +112,7 @@
val timing =
Job(
"benchmark", "runs benchmark and timing sessions",
- Profile(threads = 6, jobs = 1, numa = false),
+ Local("benchmark", jobs = 1, threads = 6, numa_shuffling = false),
Build_Config(
documents = false,
select = List(
@@ -161,8 +171,8 @@
def print_section(title: String): Unit =
println("\n=== " + title + " ===\n")
- def ci_build(options: Options, job: Job): Unit = {
- val profile = job.profile
+ def ci_build(options: Options, build_hosts: List[Build_Cluster.Host], job: Job): Unit = {
+ val hosts = job.hosts
val config = job.config
val isabelle_home = Path.explode(Isabelle_System.getenv_strict("ISABELLE_HOME"))
@@ -175,12 +185,9 @@
print_section("CONFIGURATION")
println(Build_Log.Settings.show())
- val build_options =
- with_documents(options, config).int.update("threads", profile.threads) +
- "parallel_proofs=1" + "system_heaps"
+ val build_options = with_documents(options, config) + "parallel_proofs=1" + "system_heaps"
- println(
- "jobs = " + profile.jobs + ", threads = " + profile.threads + ", numa = " + profile.numa)
+ println(hosts)
print_section("BUILD")
println("Build started at " + formatted_time)
@@ -193,12 +200,13 @@
val start_time = Time.now()
val results = progress.interrupt_handler {
Build.build(
- build_options,
+ build_options ++ hosts.prefs,
+ build_hosts = build_hosts,
selection = config.selection,
progress = progress,
clean_build = config.clean,
- numa_shuffling = profile.numa,
- max_jobs = Some(profile.jobs),
+ numa_shuffling = hosts.numa_shuffling,
+ max_jobs = hosts.max_jobs,
dirs = config.include,
select_dirs = config.select)
}
@@ -241,17 +249,23 @@
/* arguments */
var options = Options.init()
+ val build_hosts = new mutable.ListBuffer[Build_Cluster.Host]
val getopts = Getopts("""
Usage: isabelle ci_build [OPTIONS] JOB
Options are:
+ -H HOSTS host specifications of the form NAMES:PARAMETERS (separated by commas)
-o OPTION override Isabelle system OPTION (via NAME=VAL or NAME)
- Runs Isabelle builds in ci environment, with the following build jobs:
+ Runs Isabelle builds in ci environment. For cluster builds, build hosts must
+ be passed explicitly (no registry).
+
+ The following build jobs are available:
""" + Library.indent_lines(4, show_jobs) + "\n",
- "o:" -> (arg => options = options + arg))
+ "o:" -> (arg => options = options + arg),
+ "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.load(Nil), arg)))
val more_args = getopts(args)
@@ -260,7 +274,7 @@
case _ => getopts.usage()
}
- ci_build(options, job)
+ ci_build(options, build_hosts.toList, job)
})
}
--- a/src/Pure/Build/build_manager.scala Sun Jun 09 21:16:38 2024 +0200
+++ b/src/Pure/Build/build_manager.scala Sun Jun 09 22:40:13 2024 +0200
@@ -42,7 +42,10 @@
case class CI_Build(name: String, components: List[Component]) extends Build_Config {
def fresh_build: Boolean = true
- def command(build_hosts: List[Build_Cluster.Host]): String = " ci_build " + name
+ def command(build_hosts: List[Build_Cluster.Host]): String =
+ " ci_build" +
+ build_hosts.map(host => " -H " + Bash.string(host.print)).mkString +
+ " " + name
}
object User_Build {
@@ -72,7 +75,7 @@
" build" +
if_proper(afp_rev, " -A:") +
base_sessions.map(session => " -B " + Bash.string(session)).mkString +
- if_proper(build_hosts, build_hosts.map(host => " -H " + Bash.string(host.print)).mkString) +
+ build_hosts.map(host => " -H " + Bash.string(host.print)).mkString +
if_proper(presentation, " -P:") +
if_proper(requirements, " -R") +
if_proper(all_sessions, " -a") +
@@ -92,6 +95,8 @@
sealed case class Task(
build_config: Build_Config,
+ build_cluster: Boolean,
+ hosts_spec: String,
id: UUID.T = UUID.random(),
submit_date: Date = Date.now(),
priority: Priority = Priority.normal,
@@ -100,6 +105,9 @@
def name: String = id.toString
def kind: String = build_config.name
def components: List[Component] = build_config.components
+
+ def build_hosts: List[Build_Cluster.Host] =
+ Build_Cluster.Host.parse(Registry.global, hosts_spec)
}
sealed case class Job(
@@ -107,6 +115,8 @@
kind: String,
number: Long,
isabelle_rev: String,
+ build_cluster: Boolean,
+ hostnames: List[String],
components: List[Component],
start_date: Date = Date.now(),
cancelled: Boolean = false
@@ -156,12 +166,22 @@
def num_builds = running.size + finished.size
- def next: List[Task] =
- if (pending.isEmpty) Nil
+ def next(build_hosts: List[Build_Cluster.Host]): Option[Task] = {
+ val cluster_running = running.values.exists(_.build_cluster)
+ val available = build_hosts.map(_.hostname).toSet - running.values.flatMap(_.hostnames).toSet
+ val ready =
+ for {
+ (_, task) <- pending
+ if !task.build_cluster || !cluster_running
+ if task.build_hosts.map(_.hostname).forall(available.contains)
+ } yield task
+
+ if (ready.isEmpty) None
else {
- val priority = pending.values.map(_.priority).maxBy(_.ordinal)
- pending.values.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering)
+ val priority = ready.map(_.priority).maxBy(_.ordinal)
+ ready.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering).headOption
}
+ }
def add_running(job: Job): State = copy(running = running + (job.name -> job))
def remove_running(name: String): State = copy(running = running - name)
@@ -255,6 +275,8 @@
object Pending {
val kind = SQL.Column.string("kind")
+ val build_cluster = SQL.Column.bool("build_cluster")
+ val hosts_spec = SQL.Column.string("hosts_spec")
val id = SQL.Column.string("id").make_primary_key
val submit_date = SQL.Column.date("submit_date")
val priority = SQL.Column.string("priority")
@@ -277,10 +299,10 @@
val verbose = SQL.Column.bool("verbose")
val table =
- make_table(List(kind, id, submit_date, priority, isabelle_rev, components, prefs,
- requirements, all_sessions, base_sessions, exclude_session_groups, exclude_sessions,
- session_groups, sessions, build_heap, clean_build, export_files, fresh_build,
- presentation, verbose),
+ make_table(List(kind, build_cluster, hosts_spec, id, submit_date, priority, isabelle_rev,
+ components, prefs, requirements, all_sessions, base_sessions, exclude_session_groups,
+ exclude_sessions, session_groups, sessions, build_heap, clean_build, export_files,
+ fresh_build, presentation, verbose),
name = "pending")
}
@@ -288,6 +310,8 @@
db.execute_query_statement(Pending.table.select(), Map.from[String, Task], get =
{ res =>
val kind = res.string(Pending.kind)
+ val build_cluster = res.bool(Pending.build_cluster)
+ val hosts_spec = res.string(Pending.hosts_spec)
val id = res.string(Pending.id)
val submit_date = res.date(Pending.submit_date)
val priority = Priority.valueOf(res.string(Pending.priority))
@@ -319,7 +343,8 @@
clean_build, export_files, fresh_build, presentation, verbose)
}
- val task = Task(build_config, UUID.make(id), submit_date, priority, isabelle_rev)
+ val task = Task(build_config, build_cluster, hosts_spec, UUID.make(id), submit_date,
+ priority, isabelle_rev)
task.name -> task
})
@@ -340,11 +365,13 @@
for (name <- update.insert) yield { (stmt: SQL.Statement) =>
val task = pending(name)
stmt.string(1) = task.kind
- stmt.string(2) = task.id.toString
- stmt.date(3) = task.submit_date
- stmt.string(4) = task.priority.toString
- stmt.string(5) = task.isabelle_rev
- stmt.string(6) = task.components.mkString(",")
+ stmt.bool(2) = task.build_cluster
+ stmt.string(3) = task.hosts_spec
+ stmt.string(4) = task.id.toString
+ stmt.date(5) = task.submit_date
+ stmt.string(6) = task.priority.toString
+ stmt.string(7) = task.isabelle_rev
+ stmt.string(8) = task.components.mkString(",")
def get[A](f: User_Build => A): Option[A] =
task.build_config match {
@@ -352,20 +379,20 @@
case _ => None
}
- stmt.string(7) = get(user_build => user_build.prefs.map(_.print).mkString(","))
- stmt.bool(8) = get(_.requirements)
- stmt.bool(9) = get(_.all_sessions)
- stmt.string(10) = get(_.base_sessions.mkString(","))
- stmt.string(11) = get(_.exclude_session_groups.mkString(","))
- stmt.string(12) = get(_.exclude_sessions.mkString(","))
- stmt.string(13) = get(_.session_groups.mkString(","))
- stmt.string(14) = get(_.sessions.mkString(","))
- stmt.bool(15) = get(_.build_heap)
- stmt.bool(16) = get(_.clean_build)
- stmt.bool(17) = get(_.export_files)
- stmt.bool(18) = get(_.fresh_build)
- stmt.bool(19) = get(_.presentation)
- stmt.bool(20) = get(_.verbose)
+ stmt.string(9) = get(user_build => user_build.prefs.map(_.print).mkString(","))
+ stmt.bool(10) = get(_.requirements)
+ stmt.bool(11) = get(_.all_sessions)
+ stmt.string(12) = get(_.base_sessions.mkString(","))
+ stmt.string(13) = get(_.exclude_session_groups.mkString(","))
+ stmt.string(14) = get(_.exclude_sessions.mkString(","))
+ stmt.string(15) = get(_.session_groups.mkString(","))
+ stmt.string(16) = get(_.sessions.mkString(","))
+ stmt.bool(17) = get(_.build_heap)
+ stmt.bool(18) = get(_.clean_build)
+ stmt.bool(19) = get(_.export_files)
+ stmt.bool(20) = get(_.fresh_build)
+ stmt.bool(21) = get(_.presentation)
+ stmt.bool(22) = get(_.verbose)
})
}
@@ -380,12 +407,15 @@
val kind = SQL.Column.string("kind")
val number = SQL.Column.long("number")
val isabelle_rev = SQL.Column.string("isabelle_rev")
+ val build_cluster = SQL.Column.bool("build_cluster")
+ val hostnames = SQL.Column.string("hostnames")
val components = SQL.Column.string("components")
val start_date = SQL.Column.date("start_date")
val cancelled = SQL.Column.bool("cancelled")
val table =
- make_table(List(id, kind, number, isabelle_rev, components, start_date, cancelled),
+ make_table(List(id, kind, number, isabelle_rev, build_cluster, hostnames, components,
+ start_date, cancelled),
name = "running")
}
@@ -396,12 +426,14 @@
val kind = res.string(Running.kind)
val number = res.long(Running.number)
val isabelle_rev = res.string(Running.isabelle_rev)
+ val build_cluster = res.bool(Running.build_cluster)
+ val hostnames = space_explode(',', res.string(Running.hostnames))
val components = space_explode(',', res.string(Running.components)).map(Component.parse)
val start_date = res.date(Running.start_date)
val cancelled = res.bool(Running.cancelled)
- val job =
- Job(UUID.make(id), kind, number, isabelle_rev, components, start_date, cancelled)
+ val job = Job(UUID.make(id), kind, number, isabelle_rev, build_cluster, hostnames,
+ components, start_date, cancelled)
job.name -> job
})
@@ -425,9 +457,11 @@
stmt.string(2) = job.kind
stmt.long(3) = job.number
stmt.string(4) = job.isabelle_rev
- stmt.string(5) = job.components.mkString(",")
- stmt.date(6) = job.start_date
- stmt.bool(7) = job.cancelled
+ stmt.bool(5) = job.build_cluster
+ stmt.string(6) = job.hostnames.mkString(",")
+ stmt.string(7) = job.components.mkString(",")
+ stmt.date(8) = job.start_date
+ stmt.bool(9) = job.cancelled
})
}
update
@@ -567,42 +601,49 @@
}
class State private(
- processes: Map[String, Future[Bash.Process]],
- results: Map[String, Future[Process_Result]]
+ process_futures: Map[String, Future[Build_Process]],
+ result_futures: Map[String, Future[Process_Result]]
) {
- def is_empty = processes.isEmpty && results.isEmpty
+ def is_empty = process_futures.isEmpty && result_futures.isEmpty
- def init(build_config: Build_Config, job: Job, context: Context): State = {
- val process = Future.fork(context.process(build_config))
- val result =
+ def init(context: Context): State = {
+ val process_future = Future.fork(Build_Process.open(context))
+ val result_future =
Future.fork(
- process.join_result match {
- case Exn.Res(res) => context.run(res)
- case Exn.Exn(_) => Process_Result(Process_Result.RC.interrupt)
+ process_future.join_result match {
+ case Exn.Res(process) => process.run()
+ case Exn.Exn(exn) => Process_Result(Process_Result.RC.interrupt).error(exn.getMessage)
})
- new State(processes + (job.name -> process), results + (job.name -> result))
+ new State(
+ process_futures + (context.name -> process_future),
+ result_futures + (context.name -> result_future))
}
- def running: List[String] = processes.keys.toList
+ def running: List[String] = process_futures.keys.toList
def update: (State, Map[String, Process_Result]) = {
val finished =
- for ((name, future) <- results if future.is_finished) yield name -> future.join
+ for ((name, future) <- result_futures if future.is_finished)
+ yield name ->
+ (future.join_result match {
+ case Exn.Res(result) => result
+ case Exn.Exn(exn) => Process_Result(Process_Result.RC.interrupt).error(exn.getMessage)
+ })
- val processes1 = processes.filterNot((name, _) => finished.contains(name))
- val results1 = results.filterNot((name, _) => finished.contains(name))
+ val process_futures1 = process_futures.filterNot((name, _) => finished.contains(name))
+ val result_futures1 = result_futures.filterNot((name, _) => finished.contains(name))
- (new State(processes1, results1), finished)
+ (new State(process_futures1, result_futures1), finished)
}
def cancel(cancelled: List[String]): State = {
for (name <- cancelled) {
- val process = processes(name)
- if (process.is_finished) process.join.interrupt()
- else process.cancel()
+ val process_future = process_futures(name)
+ if (process_future.is_finished) process_future.join.cancel()
+ else process_future.cancel()
}
- new State(processes.filterNot((name, _) => cancelled.contains(name)), results)
+ new State(process_futures.filterNot((name, _) => cancelled.contains(name)), result_futures)
}
}
}
@@ -627,54 +668,55 @@
}
}
- private def start_next(): Option[(Build_Config, Job)] =
- synchronized_database("start_job") {
- _state.next.headOption.flatMap { task =>
+ private def start_next(): Option[Context] =
+ synchronized_database("start_next") {
+ _state.next(build_hosts).flatMap { task =>
progress.echo("Initializing " + task.name)
_state = _state.remove_pending(task.name)
- val context = Context(store, task, build_hosts)
val number = _state.next_number(task.kind)
+ val context = Context(store, task, number)
Exn.capture {
- store.sync_permissions(context.dir)
+ context.init()
+ store.sync_permissions(context.task_dir)
val isabelle_rev =
- sync(isabelle_repository, task.isabelle_rev, context.isabelle_dir)
+ sync(isabelle_repository, task.isabelle_rev, context.task_dir)
+
+ val hostnames = task.build_hosts.map(_.hostname).distinct
val components =
for (component <- task.components)
yield sync_dirs.find(_.name == component.name) match {
case Some(sync_dir) =>
- val target = context.isabelle_dir + sync_dir.target
+ val target = context.task_dir + sync_dir.target
component.copy(rev = sync(sync_dir.hg, component.rev, target))
case None =>
if (component.rev.isEmpty) component
else error("Unknown component " + component)
}
- Job(task.id, task.kind, number, isabelle_rev, components)
+ Job(task.id, task.kind, number, isabelle_rev, task.build_cluster, hostnames, components)
} match {
case Exn.Res(job) =>
_state = _state.add_running(job)
- val context1 = context.move(Context(store, job))
val msg = "Starting " + job.name
echo(msg + " (id " + job.id + ")")
- context1.progress.echo(msg)
+ context.progress.echo(msg)
- Some(task.build_config, job)
+ Some(context)
case Exn.Exn(exn) =>
val result = Result(task.kind, number, Status.aborted)
- val context1 = Context(store, result)
+ _state = _state.add_finished(result)
val msg = "Failed to start job: " + exn.getMessage
echo_error_message(msg)
- context1.progress.echo_error_message(msg)
+ context.progress.echo_error_message(msg)
- context.remove()
- _state = _state.add_finished(result)
+ Isabelle_System.rm_tree(context.task_dir)
None
}
@@ -690,12 +732,11 @@
private def finish_job(name: String, process_result: Process_Result): Unit =
synchronized_database("finish_job") {
val job = _state.running(name)
- val context = Context(store, job, build_hosts)
+ val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id))
- val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id))
- context.copy_results(Context(store, result))
- context.remove()
- echo("Finished job " + job.id + " with status code " + process_result.rc)
+ val interrupted_error = process_result.interrupted && process_result.err.nonEmpty
+ val err_msg = if_proper(interrupted_error, ": " + process_result.err)
+ echo("Finished job " + job.id + " with status code " + process_result.rc + err_msg)
_state = _state
.remove_running(job.name)
@@ -706,18 +747,18 @@
def init: Runner.State = Runner.State.empty
def loop_body(state: Runner.State): Runner.State = {
- if (state.is_empty && !progress.stopped) {
- start_next() match {
- case None => state
- case Some((build_config, job)) =>
- state.init(build_config, job, Context(store, job, build_hosts))
+ val state1 =
+ if (progress.stopped) state
+ else {
+ start_next() match {
+ case None => state
+ case Some(context) => state.init(context)
+ }
}
- }
- else {
- val (state1, results) = stop_cancelled(state).update
- results.foreach(finish_job)
- state1
- }
+ val state2 = stop_cancelled(state1)
+ val (state3, results) = state2.update
+ results.foreach(finish_job)
+ state3
}
}
@@ -764,7 +805,8 @@
if isabelle_updated || ci_job.components.exists(updated_components.contains)
if !_state.pending.values.exists(_.kind == ci_job.name)
} {
- val task = Task(CI_Build(ci_job), priority = Priority.low, isabelle_rev = "default")
+ val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec,
+ priority = Priority.low, isabelle_rev = "default")
_state = _state.add_pending(task)
}
}
@@ -799,7 +841,8 @@
for (ci_job <-ci_jobs)
ci_job.trigger match {
case isabelle.CI_Build.Timed(in_interval) if in_interval(previous, next) =>
- val task = Task(CI_Build(ci_job), isabelle_rev = "default")
+ val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec,
+ isabelle_rev = "default")
_state = _state.add_pending(task)
case _ =>
}
@@ -838,17 +881,17 @@
for {
(name, (time, log)) <- logs
if time + keep > Time.now()
- } yield name -> (time, Context(store, state.get(name).get).log)
+ } yield name -> (time, File.read(store.log_file(name)))
}
- def lookup(store: Store, elem: T): String = synchronized {
- logs.get(elem.name) match {
+ def lookup(store: Store, name: String): String = synchronized {
+ logs.get(name) match {
case Some((_, log)) =>
- logs += elem.name -> (Time.now(), log)
+ logs += name -> (Time.now(), log)
case None =>
- logs += elem.name -> (Time.now(), Context(store, elem).log)
+ logs += name -> (Time.now(), File.read(store.log_file(name)))
}
- logs(elem.name)._2
+ logs(name)._2
}
}
}
@@ -912,7 +955,9 @@
def render_job(job: Job): XML.Body =
par(link_build(job.name, job.number) :: text(": running since " + job.start_date)) ::
- render_if(finished.headOption.exists(_.status != Status.ok), render_previous(finished))
+ render_if(
+ finished.headOption.exists(_.status != Status.ok) && job.kind != User_Build.name,
+ render_previous(finished))
def render_result(result: Result): XML.Body =
par(
@@ -974,11 +1019,11 @@
if (job.cancelled) text("Cancelling...")
else text("Running...") ::: render_cancel(job.id)) ::
render_rev(job.isabelle_rev, job.components) :::
- source(cache.lookup(store, job)) :: Nil
+ source(cache.lookup(store, job.name)) :: Nil
case result: Result =>
par(text("Date: " + result.date)) ::
par(text("Status: " + result.status)) ::
- source(cache.lookup(store, result)) :: Nil
+ source(cache.lookup(store, result.name)) :: Nil
}
}
@@ -1066,62 +1111,86 @@
/* context */
- object Context {
- def apply(store: Store, elem: T, build_hosts: List[Build_Cluster.Host] = Nil): Context =
- new Context(store, store.dir(elem), build_hosts)
+ case class Context(store: Store, task: Task, number: Long) {
+ def name = task.kind + "/" + number
+ def progress: Progress = new File_Progress(store.log_file(name))
+
+ def task_dir: Path = store.task_dir(task)
+
+ def init(): Unit = Isabelle_System.make_directory(store.log_file(name).dir)
+
+ def isabelle_identifier: String =
+ if (task.build_cluster) store.options.string("build_cluster_identifier") else store.identifier
+
+ def open_ssh(): SSH.System = {
+ if (task.build_cluster) store.open_ssh()
+ else Library.the_single(task.build_hosts).open_ssh(store.options)
+ }
+ }
+
+
+ /* build process */
+
+ object Build_Process {
+ def open(context: Context): Build_Process = new Build_Process(context.open_ssh(), context)
}
- class Context private(store: Store, val dir: Path, val build_hosts: List[Build_Cluster.Host]) {
- def isabelle_dir: Path = dir + Path.basic("isabelle")
+ class Build_Process(ssh: SSH.System, context: Context) {
+ private val task = context.task
+ private val progress = context.progress
+
+
+ /* resources with cleanup operations */
- private val log_file = dir + Path.basic("log")
- val progress = new File_Progress(log_file, verbose = true)
- def log: String =
- Exn.capture(File.read(log_file)) match {
- case Exn.Exn(_) => ""
- case Exn.Res(res) => res
+ private val _dir = ssh.tmp_dir()
+ private val _isabelle =
+ try {
+ val rsync_context = Rsync.Context(ssh = ssh)
+ val source = File.standard_path(context.task_dir)
+ Rsync.exec(rsync_context, clean = true, args = List("--", Url.direct_path(source),
+ rsync_context.target(_dir))).check
+
+ Isabelle_System.rm_tree(context.task_dir)
+ Other_Isabelle(_dir, context.isabelle_identifier, ssh, progress)
}
+ catch { case exn: Throwable => close(); throw exn }
- def move(other: Context): Context = {
- Isabelle_System.make_directory(other.dir.dir)
- Isabelle_System.move_file(dir, other.dir)
- other
- }
+ private val _process =
+ try {
+ val init_components =
+ for {
+ component <- task.build_config.components
+ target = _dir + Sync.DIRS + Path.basic(component.name)
+ if Components.is_component_dir(target)
+ } yield "init_component " + quote(target.absolute.implode)
+
+ _isabelle.init(
+ other_settings = _isabelle.init_components() ::: init_components,
+ fresh = task.build_config.fresh_build, echo = true)
- def copy_results(other: Context): Context = {
- Isabelle_System.make_directory(other.dir)
- Isabelle_System.copy_file(log_file, other.log_file)
- other
+ val cmd = task.build_config.command(task.build_hosts)
+ progress.echo("isabelle" + cmd)
+
+ val script = File.bash_path(Isabelle_Tool.exe(_isabelle.isabelle_home)) + cmd
+ ssh.bash_process(_isabelle.bash_context(script), settings = false)
+ }
+ catch { case exn: Throwable => close(); throw exn }
+
+ def cancel(): Unit = Option(_process).foreach(_.interrupt())
+
+ def close(): Unit = {
+ Option(_dir).foreach(ssh.rm_tree)
+ Isabelle_System.rm_tree(context.task_dir)
+ ssh.close()
}
- def remove(): Unit = Isabelle_System.rm_tree(dir)
- lazy val ssh = store.open_ssh()
-
- def process(build_config: Build_Config): Bash.Process = {
- val isabelle = Other_Isabelle(isabelle_dir, store.identifier, ssh, progress)
-
- val init_components =
- for {
- dir <- build_config.components
- target = isabelle_dir + Sync.DIRS + Path.basic(dir.name)
- if Components.is_component_dir(target)
- } yield "init_component " + quote(target.absolute.implode)
+ /* execution */
- isabelle.init(other_settings = isabelle.init_components() ::: init_components,
- fresh = build_config.fresh_build, echo = true)
-
- val cmd = build_config.command(build_hosts)
- progress.echo("isabelle" + cmd)
-
- val script = File.bash_path(Isabelle_Tool.exe(isabelle.isabelle_home)) + cmd
- ssh.bash_process(isabelle.bash_context(script), settings = false)
- }
-
- def run(process: Bash.Process): Process_Result = {
+ def run(): Process_Result = {
val process_result =
- process.result(progress_stdout = progress.echo(_), progress_stderr = progress.echo(_))
- ssh.close()
+ _process.result(progress_stdout = progress.echo(_), progress_stderr = progress.echo(_))
+ close()
process_result
}
}
@@ -1134,15 +1203,10 @@
val identifier = options.string("build_manager_identifier")
private val pending = base_dir + Path.basic("pending")
- private val running = base_dir + Path.basic("running")
private val finished = base_dir + Path.basic("finished")
- def dir(elem: T): Path =
- elem match {
- case task: Task => pending + Path.basic(task.id.toString)
- case job: Job => running + Path.make(List(job.kind, job.number.toString))
- case result: Result => finished + Path.make(List(result.kind, result.number.toString))
- }
+ def task_dir(task: Task) = pending + Path.basic(task.id.toString)
+ def log_file(name: String): Path = finished + Path.explode(name)
def sync_permissions(dir: Path, ssh: SSH.System = SSH.Local): Unit = {
ssh.execute("chmod -R g+rwx " + File.bash_path(dir))
@@ -1150,8 +1214,7 @@
}
def init_dirs(): Unit =
- List(pending, running, finished).foreach(dir =>
- sync_permissions(Isabelle_System.make_directory(dir)))
+ List(pending, finished).foreach(dir => sync_permissions(Isabelle_System.make_directory(dir)))
val ssh_group: String = options.string("build_manager_ssh_group")
@@ -1242,24 +1305,25 @@
val id = UUID.random()
val afp_rev = if (afp_root.nonEmpty) Some("") else None
- val build_config = User_Build(afp_rev, prefs, requirements, all_sessions, base_sessions,
- exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap, clean_build,
- export_files, fresh_build, presentation, verbose)
- val task = Task(build_config, id, Date.now(), Priority.high)
+ val hosts_spec = options.string("build_manager_cluster")
+ val build_config = User_Build(afp_rev, prefs, requirements, all_sessions,
+ base_sessions, exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap,
+ clean_build, export_files, fresh_build, presentation, verbose)
+ val task = Task(build_config, true, hosts_spec, id, Date.now(), Priority.high)
- val context = Context(store, task)
+ val dir = store.task_dir(task)
progress.interrupt_handler {
using(store.open_ssh()) { ssh =>
val rsync_context = Rsync.Context(ssh = ssh)
progress.echo("Transferring repositories...")
- Sync.sync(store.options, rsync_context, context.isabelle_dir, preserve_jars = true,
+ Sync.sync(store.options, rsync_context, dir, preserve_jars = true,
dirs = Sync.afp_dirs(afp_root), rev = rev)
- store.sync_permissions(context.dir, ssh)
+ store.sync_permissions(dir, ssh)
if (progress.stopped) {
progress.echo("Cancelling submission...")
- ssh.rm_tree(context.dir)
+ ssh.rm_tree(dir)
} else {
using(store.open_postgresql_server()) { server =>
using(store.open_database(server = server)) { db =>
@@ -1307,7 +1371,7 @@
Options are:
-A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """)
-D DIR include extra component in given directory
- -H HOSTS additional cluster host specifications of the form
+ -H HOSTS host specifications for all available hosts of the form
NAMES:PARAMETERS (separated by commas)
-o OPTION override Isabelle system OPTION (via NAME=VAL or NAME)
-p PORT explicit web server port