# HG changeset patch # User Fabian Huch # Date 1717765447 -7200 # Node ID 7987b33fb6c5b9500ebab478581e61890741c25d # Parent 02424b81472a08923d1243d560753ce745873102 clarified context: operations now in build process; diff -r 02424b81472a -r 7987b33fb6c5 src/Pure/Build/build_manager.scala --- a/src/Pure/Build/build_manager.scala Fri Jun 07 14:00:59 2024 +0200 +++ b/src/Pure/Build/build_manager.scala Fri Jun 07 15:04:07 2024 +0200 @@ -572,8 +572,8 @@ ) { def is_empty = process_futures.isEmpty && result_futures.isEmpty - def init(build_config: Build_Config, job: Job, context: Context): State = { - val process_future = Future.fork(Build_Process.open(build_config, context)) + def init(context: Context): State = { + val process_future = Future.fork(Build_Process.open(context)) val result_future = Future.fork( process_future.join_result match { @@ -581,8 +581,8 @@ case Exn.Exn(_) => Process_Result(Process_Result.RC.interrupt) }) new State( - process_futures + (job.name -> process_future), - result_futures + (job.name -> result_future)) + process_futures + (context.name -> process_future), + result_futures + (context.name -> result_future)) } def running: List[String] = process_futures.keys.toList @@ -629,27 +629,28 @@ } } - private def start_next(): Option[(Build_Config, Job)] = - synchronized_database("start_job") { + private def start_next(): Option[Context] = + synchronized_database("start_next") { _state.next.headOption.flatMap { task => progress.echo("Initializing " + task.name) _state = _state.remove_pending(task.name) - val context = Context(store, task, build_hosts) val number = _state.next_number(task.kind) + val context = Context(store, task, number, build_hosts) Exn.capture { - store.sync_permissions(context.dir) + context.init() + store.sync_permissions(context.task_dir) val isabelle_rev = - sync(isabelle_repository, task.isabelle_rev, context.dir) + sync(isabelle_repository, task.isabelle_rev, context.task_dir) val components = for (component <- task.components) yield sync_dirs.find(_.name == component.name) match { case Some(sync_dir) => - val target = context.dir + sync_dir.target + val target = context.task_dir + sync_dir.target component.copy(rev = sync(sync_dir.hg, component.rev, target)) case None => if (component.rev.isEmpty) component @@ -660,23 +661,21 @@ } match { case Exn.Res(job) => _state = _state.add_running(job) - val context1 = context.move(Context(store, job)) val msg = "Starting " + job.name echo(msg + " (id " + job.id + ")") - context1.progress.echo(msg) + context.progress.echo(msg) - Some(task.build_config, job) + Some(context) case Exn.Exn(exn) => val result = Result(task.kind, number, Status.aborted) - val context1 = Context(store, result) + _state = _state.add_finished(result) val msg = "Failed to start job: " + exn.getMessage echo_error_message(msg) - context1.progress.echo_error_message(msg) + context.progress.echo_error_message(msg) - context.remove() - _state = _state.add_finished(result) + Isabelle_System.rm_tree(context.task_dir) None } @@ -692,11 +691,8 @@ private def finish_job(name: String, process_result: Process_Result): Unit = synchronized_database("finish_job") { val job = _state.running(name) - val context = Context(store, job, build_hosts) + val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id)) - val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id)) - context.copy_results(Context(store, result)) - context.remove() echo("Finished job " + job.id + " with status code " + process_result.rc) _state = _state @@ -711,8 +707,7 @@ if (state.is_empty && !progress.stopped) { start_next() match { case None => state - case Some((build_config, job)) => - state.init(build_config, job, Context(store, job, build_hosts)) + case Some(context) => state.init(context) } } else { @@ -840,17 +835,17 @@ for { (name, (time, log)) <- logs if time + keep > Time.now() - } yield name -> (time, Context(store, state.get(name).get).log) + } yield name -> (time, File.read(store.log_file(name))) } - def lookup(store: Store, elem: T): String = synchronized { - logs.get(elem.name) match { + def lookup(store: Store, name: String): String = synchronized { + logs.get(name) match { case Some((_, log)) => - logs += elem.name -> (Time.now(), log) + logs += name -> (Time.now(), log) case None => - logs += elem.name -> (Time.now(), Context(store, elem).log) + logs += name -> (Time.now(), File.read(store.log_file(name))) } - logs(elem.name)._2 + logs(name)._2 } } } @@ -976,11 +971,11 @@ if (job.cancelled) text("Cancelling...") else text("Running...") ::: render_cancel(job.id)) :: render_rev(job.isabelle_rev, job.components) ::: - source(cache.lookup(store, job)) :: Nil + source(cache.lookup(store, job.name)) :: Nil case result: Result => par(text("Date: " + result.date)) :: par(text("Status: " + result.status)) :: - source(cache.lookup(store, result)) :: Nil + source(cache.lookup(store, result.name)) :: Nil } } @@ -1068,58 +1063,42 @@ /* context */ - object Context { - def apply(store: Store, elem: T, build_hosts: List[Build_Cluster.Host] = Nil): Context = - new Context(store, store.dir(elem), build_hosts) - } - - class Context private(store: Store, val dir: Path, val build_hosts: List[Build_Cluster.Host]) { + case class Context( + store: Store, + task: Task, + number: Long, + build_hosts: List[Build_Cluster.Host] + ) { + def name: String = task.kind + "/" + number + def init(): Unit = Isabelle_System.make_directory(store.log_file(name).dir) + def task_dir: Path = store.task_dir(task) + def components: List[Component] = task.build_config.components def isabelle_identifier: String = store.identifier - - private val log_file = dir + Path.basic("log") - val progress = new File_Progress(log_file, verbose = true) - def log: String = - Exn.capture(File.read(log_file)) match { - case Exn.Exn(_) => "" - case Exn.Res(res) => res - } - - def move(other: Context): Context = { - Isabelle_System.make_directory(other.dir.dir) - Isabelle_System.move_file(dir, other.dir) - other - } - - def copy_results(other: Context): Context = { - Isabelle_System.make_directory(other.dir) - Isabelle_System.copy_file(log_file, other.log_file) - other - } - - def remove(): Unit = Isabelle_System.rm_tree(dir) - - def ssh = store.open_ssh() + def fresh_build: Boolean = task.build_config.fresh_build + def command: String = task.build_config.command(build_hosts) + def progress: Progress = new File_Progress(store.log_file(name)) + def open_ssh(): SSH.Session = store.open_ssh() } /* build process */ object Build_Process { - def open(build_config: Build_Config, context: Context): Build_Process = - new Build_Process(context.ssh, build_config, context) + def open(context: Context): Build_Process = new Build_Process(context.open_ssh(), context) } - class Build_Process(ssh: SSH.Session, build_config: Build_Config, context: Context) { + class Build_Process(ssh: SSH.Session, context: Context) { private val _dir = ssh.tmp_dir() private val _progress = context.progress private val _isabelle = try { val rsync_context = Rsync.Context(ssh = ssh) - val source = File.standard_path(context.dir) + val source = File.standard_path(context.task_dir) Rsync.exec(rsync_context, clean = true, args = List("--", Url.direct_path(source), rsync_context.target(_dir))).check + Isabelle_System.rm_tree(context.task_dir) Other_Isabelle(_dir, context.isabelle_identifier, ssh, _progress) } catch { case exn: Throwable => close(); throw exn } @@ -1128,16 +1107,16 @@ try { val init_components = for { - component <- build_config.components + component <- context.components target = _dir + Sync.DIRS + Path.basic(component.name) if Components.is_component_dir(target) } yield "init_component " + quote(target.absolute.implode) _isabelle.init( other_settings = _isabelle.init_components() ::: init_components, - fresh = build_config.fresh_build, echo = true) + fresh = context.fresh_build, echo = true) - val cmd = build_config.command(context.build_hosts) + val cmd = context.command _progress.echo("isabelle" + cmd) val script = File.bash_path(Isabelle_Tool.exe(_isabelle.isabelle_home)) + cmd @@ -1156,6 +1135,7 @@ def close(): Unit = { Option(_dir).foreach(ssh.rm_tree) + Isabelle_System.rm_tree(context.task_dir) ssh.close() } } @@ -1168,15 +1148,10 @@ val identifier = options.string("build_manager_identifier") private val pending = base_dir + Path.basic("pending") - private val running = base_dir + Path.basic("running") private val finished = base_dir + Path.basic("finished") - def dir(elem: T): Path = - elem match { - case task: Task => pending + Path.basic(task.id.toString) - case job: Job => running + Path.make(List(job.kind, job.number.toString)) - case result: Result => finished + Path.make(List(result.kind, result.number.toString)) - } + def task_dir(task: Task) = pending + Path.basic(task.id.toString) + def log_file(name: String): Path = finished + Path.explode(name) def sync_permissions(dir: Path, ssh: SSH.System = SSH.Local): Unit = { ssh.execute("chmod -R g+rwx " + File.bash_path(dir)) @@ -1184,8 +1159,7 @@ } def init_dirs(): Unit = - List(pending, running, finished).foreach(dir => - sync_permissions(Isabelle_System.make_directory(dir))) + List(pending, finished).foreach(dir => sync_permissions(Isabelle_System.make_directory(dir))) val ssh_group: String = options.string("build_manager_ssh_group") @@ -1281,19 +1255,19 @@ export_files, fresh_build, presentation, verbose) val task = Task(build_config, id, Date.now(), Priority.high) - val context = Context(store, task) + val dir = store.task_dir(task) progress.interrupt_handler { using(store.open_ssh()) { ssh => val rsync_context = Rsync.Context(ssh = ssh) progress.echo("Transferring repositories...") - Sync.sync(store.options, rsync_context, context.dir, preserve_jars = true, + Sync.sync(store.options, rsync_context, dir, preserve_jars = true, dirs = Sync.afp_dirs(afp_root), rev = rev) - store.sync_permissions(context.dir, ssh) + store.sync_permissions(dir, ssh) if (progress.stopped) { progress.echo("Cancelling submission...") - ssh.rm_tree(context.dir) + ssh.rm_tree(dir) } else { using(store.open_postgresql_server()) { server => using(store.open_database(server = server)) { db =>