--- a/src/Pure/Build/build_manager.scala Fri Jun 07 14:00:59 2024 +0200
+++ b/src/Pure/Build/build_manager.scala Fri Jun 07 15:04:07 2024 +0200
@@ -572,8 +572,8 @@
) {
def is_empty = process_futures.isEmpty && result_futures.isEmpty
- def init(build_config: Build_Config, job: Job, context: Context): State = {
- val process_future = Future.fork(Build_Process.open(build_config, context))
+ def init(context: Context): State = {
+ val process_future = Future.fork(Build_Process.open(context))
val result_future =
Future.fork(
process_future.join_result match {
@@ -581,8 +581,8 @@
case Exn.Exn(_) => Process_Result(Process_Result.RC.interrupt)
})
new State(
- process_futures + (job.name -> process_future),
- result_futures + (job.name -> result_future))
+ process_futures + (context.name -> process_future),
+ result_futures + (context.name -> result_future))
}
def running: List[String] = process_futures.keys.toList
@@ -629,27 +629,28 @@
}
}
- private def start_next(): Option[(Build_Config, Job)] =
- synchronized_database("start_job") {
+ private def start_next(): Option[Context] =
+ synchronized_database("start_next") {
_state.next.headOption.flatMap { task =>
progress.echo("Initializing " + task.name)
_state = _state.remove_pending(task.name)
- val context = Context(store, task, build_hosts)
val number = _state.next_number(task.kind)
+ val context = Context(store, task, number, build_hosts)
Exn.capture {
- store.sync_permissions(context.dir)
+ context.init()
+ store.sync_permissions(context.task_dir)
val isabelle_rev =
- sync(isabelle_repository, task.isabelle_rev, context.dir)
+ sync(isabelle_repository, task.isabelle_rev, context.task_dir)
val components =
for (component <- task.components)
yield sync_dirs.find(_.name == component.name) match {
case Some(sync_dir) =>
- val target = context.dir + sync_dir.target
+ val target = context.task_dir + sync_dir.target
component.copy(rev = sync(sync_dir.hg, component.rev, target))
case None =>
if (component.rev.isEmpty) component
@@ -660,23 +661,21 @@
} match {
case Exn.Res(job) =>
_state = _state.add_running(job)
- val context1 = context.move(Context(store, job))
val msg = "Starting " + job.name
echo(msg + " (id " + job.id + ")")
- context1.progress.echo(msg)
+ context.progress.echo(msg)
- Some(task.build_config, job)
+ Some(context)
case Exn.Exn(exn) =>
val result = Result(task.kind, number, Status.aborted)
- val context1 = Context(store, result)
+ _state = _state.add_finished(result)
val msg = "Failed to start job: " + exn.getMessage
echo_error_message(msg)
- context1.progress.echo_error_message(msg)
+ context.progress.echo_error_message(msg)
- context.remove()
- _state = _state.add_finished(result)
+ Isabelle_System.rm_tree(context.task_dir)
None
}
@@ -692,11 +691,8 @@
private def finish_job(name: String, process_result: Process_Result): Unit =
synchronized_database("finish_job") {
val job = _state.running(name)
- val context = Context(store, job, build_hosts)
+ val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id))
- val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id))
- context.copy_results(Context(store, result))
- context.remove()
echo("Finished job " + job.id + " with status code " + process_result.rc)
_state = _state
@@ -711,8 +707,7 @@
if (state.is_empty && !progress.stopped) {
start_next() match {
case None => state
- case Some((build_config, job)) =>
- state.init(build_config, job, Context(store, job, build_hosts))
+ case Some(context) => state.init(context)
}
}
else {
@@ -840,17 +835,17 @@
for {
(name, (time, log)) <- logs
if time + keep > Time.now()
- } yield name -> (time, Context(store, state.get(name).get).log)
+ } yield name -> (time, File.read(store.log_file(name)))
}
- def lookup(store: Store, elem: T): String = synchronized {
- logs.get(elem.name) match {
+ def lookup(store: Store, name: String): String = synchronized {
+ logs.get(name) match {
case Some((_, log)) =>
- logs += elem.name -> (Time.now(), log)
+ logs += name -> (Time.now(), log)
case None =>
- logs += elem.name -> (Time.now(), Context(store, elem).log)
+ logs += name -> (Time.now(), File.read(store.log_file(name)))
}
- logs(elem.name)._2
+ logs(name)._2
}
}
}
@@ -976,11 +971,11 @@
if (job.cancelled) text("Cancelling...")
else text("Running...") ::: render_cancel(job.id)) ::
render_rev(job.isabelle_rev, job.components) :::
- source(cache.lookup(store, job)) :: Nil
+ source(cache.lookup(store, job.name)) :: Nil
case result: Result =>
par(text("Date: " + result.date)) ::
par(text("Status: " + result.status)) ::
- source(cache.lookup(store, result)) :: Nil
+ source(cache.lookup(store, result.name)) :: Nil
}
}
@@ -1068,58 +1063,42 @@
/* context */
- object Context {
- def apply(store: Store, elem: T, build_hosts: List[Build_Cluster.Host] = Nil): Context =
- new Context(store, store.dir(elem), build_hosts)
- }
-
- class Context private(store: Store, val dir: Path, val build_hosts: List[Build_Cluster.Host]) {
+ case class Context(
+ store: Store,
+ task: Task,
+ number: Long,
+ build_hosts: List[Build_Cluster.Host]
+ ) {
+ def name: String = task.kind + "/" + number
+ def init(): Unit = Isabelle_System.make_directory(store.log_file(name).dir)
+ def task_dir: Path = store.task_dir(task)
+ def components: List[Component] = task.build_config.components
def isabelle_identifier: String = store.identifier
-
- private val log_file = dir + Path.basic("log")
- val progress = new File_Progress(log_file, verbose = true)
- def log: String =
- Exn.capture(File.read(log_file)) match {
- case Exn.Exn(_) => ""
- case Exn.Res(res) => res
- }
-
- def move(other: Context): Context = {
- Isabelle_System.make_directory(other.dir.dir)
- Isabelle_System.move_file(dir, other.dir)
- other
- }
-
- def copy_results(other: Context): Context = {
- Isabelle_System.make_directory(other.dir)
- Isabelle_System.copy_file(log_file, other.log_file)
- other
- }
-
- def remove(): Unit = Isabelle_System.rm_tree(dir)
-
- def ssh = store.open_ssh()
+ def fresh_build: Boolean = task.build_config.fresh_build
+ def command: String = task.build_config.command(build_hosts)
+ def progress: Progress = new File_Progress(store.log_file(name))
+ def open_ssh(): SSH.Session = store.open_ssh()
}
/* build process */
object Build_Process {
- def open(build_config: Build_Config, context: Context): Build_Process =
- new Build_Process(context.ssh, build_config, context)
+ def open(context: Context): Build_Process = new Build_Process(context.open_ssh(), context)
}
- class Build_Process(ssh: SSH.Session, build_config: Build_Config, context: Context) {
+ class Build_Process(ssh: SSH.Session, context: Context) {
private val _dir = ssh.tmp_dir()
private val _progress = context.progress
private val _isabelle =
try {
val rsync_context = Rsync.Context(ssh = ssh)
- val source = File.standard_path(context.dir)
+ val source = File.standard_path(context.task_dir)
Rsync.exec(rsync_context, clean = true, args = List("--", Url.direct_path(source),
rsync_context.target(_dir))).check
+ Isabelle_System.rm_tree(context.task_dir)
Other_Isabelle(_dir, context.isabelle_identifier, ssh, _progress)
}
catch { case exn: Throwable => close(); throw exn }
@@ -1128,16 +1107,16 @@
try {
val init_components =
for {
- component <- build_config.components
+ component <- context.components
target = _dir + Sync.DIRS + Path.basic(component.name)
if Components.is_component_dir(target)
} yield "init_component " + quote(target.absolute.implode)
_isabelle.init(
other_settings = _isabelle.init_components() ::: init_components,
- fresh = build_config.fresh_build, echo = true)
+ fresh = context.fresh_build, echo = true)
- val cmd = build_config.command(context.build_hosts)
+ val cmd = context.command
_progress.echo("isabelle" + cmd)
val script = File.bash_path(Isabelle_Tool.exe(_isabelle.isabelle_home)) + cmd
@@ -1156,6 +1135,7 @@
def close(): Unit = {
Option(_dir).foreach(ssh.rm_tree)
+ Isabelle_System.rm_tree(context.task_dir)
ssh.close()
}
}
@@ -1168,15 +1148,10 @@
val identifier = options.string("build_manager_identifier")
private val pending = base_dir + Path.basic("pending")
- private val running = base_dir + Path.basic("running")
private val finished = base_dir + Path.basic("finished")
- def dir(elem: T): Path =
- elem match {
- case task: Task => pending + Path.basic(task.id.toString)
- case job: Job => running + Path.make(List(job.kind, job.number.toString))
- case result: Result => finished + Path.make(List(result.kind, result.number.toString))
- }
+ def task_dir(task: Task) = pending + Path.basic(task.id.toString)
+ def log_file(name: String): Path = finished + Path.explode(name)
def sync_permissions(dir: Path, ssh: SSH.System = SSH.Local): Unit = {
ssh.execute("chmod -R g+rwx " + File.bash_path(dir))
@@ -1184,8 +1159,7 @@
}
def init_dirs(): Unit =
- List(pending, running, finished).foreach(dir =>
- sync_permissions(Isabelle_System.make_directory(dir)))
+ List(pending, finished).foreach(dir => sync_permissions(Isabelle_System.make_directory(dir)))
val ssh_group: String = options.string("build_manager_ssh_group")
@@ -1281,19 +1255,19 @@
export_files, fresh_build, presentation, verbose)
val task = Task(build_config, id, Date.now(), Priority.high)
- val context = Context(store, task)
+ val dir = store.task_dir(task)
progress.interrupt_handler {
using(store.open_ssh()) { ssh =>
val rsync_context = Rsync.Context(ssh = ssh)
progress.echo("Transferring repositories...")
- Sync.sync(store.options, rsync_context, context.dir, preserve_jars = true,
+ Sync.sync(store.options, rsync_context, dir, preserve_jars = true,
dirs = Sync.afp_dirs(afp_root), rev = rev)
- store.sync_permissions(context.dir, ssh)
+ store.sync_permissions(dir, ssh)
if (progress.stopped) {
progress.echo("Cancelling submission...")
- ssh.rm_tree(context.dir)
+ ssh.rm_tree(dir)
} else {
using(store.open_postgresql_server()) { server =>
using(store.open_database(server = server)) { db =>