src/Pure/Build/build_cluster.scala
author wenzelm
Thu, 22 Feb 2024 17:24:43 +0100
changeset 79704 512d701d0df9
parent 79681 df1059ea8846
child 79948 8fe1ed4b5705
permissions -rw-r--r--
tuned signature;

/*  Title:      Pure/Build/build_cluster.scala
    Author:     Makarius

Management of build cluster: independent ssh hosts with access to shared
PostgreSQL server.

NB: extensible classes allow quite different implementations in user-space,
via the service class Build.Engine and overridable methods
Build.Engine.open_build_process(), Build_Process.open_build_cluster().
*/

package isabelle


object Build_Cluster {
  /* host specifications */

  object Host {
    private val rfc822_specials = """()<>@,;:\"[]"""

    private val HOSTNAME = "hostname"
    private val USER = "user"
    private val PORT = "port"
    private val JOBS = "jobs"
    private val NUMA = "numa"
    private val DIRS = "dirs"
    private val HOME = "home"
    private val SHARED = "shared"

    val parameters: Options =
      Options.inline("""
        option hostname : string = "" -- "explicit SSH hostname"
        option user : string = ""     -- "explicit SSH user"
        option port : int = 0         -- "explicit SSH port"
        option jobs : int = 1         -- "maximum number of parallel jobs"
        option numa : bool = false    -- "cyclic shuffling of NUMA CPU nodes"
        option dirs : string = ""     -- "additional session directories (separated by colon)"
        option home : string = ""     -- "alternative user home (via $USER_HOME)"
        option shared : bool = false  -- "shared home directory: omit sync + init"
      """)

    def is_parameter(spec: Options.Spec): Boolean = parameters.defined(spec.name)

    lazy val test_options: Options = Options.init0()

    def apply(
      name: String = "",
      hostname: String = parameters.string(HOSTNAME),
      user: String = parameters.string(USER),
      port: Int = parameters.int(PORT),
      jobs: Int = parameters.int(JOBS),
      numa: Boolean = parameters.bool(NUMA),
      dirs: String = parameters.string(DIRS),
      home: String = parameters.string(HOME),
      shared: Boolean = parameters.bool(SHARED),
      options: List[Options.Spec] = Nil
    ): Host = {
      new Host(name, hostname, user, port, jobs, numa, dirs, home, shared, options)
    }

    def parse(registry: Registry, str: String): List[Host] = {
      def err(msg: String): Nothing =
        cat_error(msg, "The error(s) above occurred in host specification " + quote(str))

      val names = str.takeWhile(c => !rfc822_specials.contains(c) || c == ',')
      val more_specs =
        try {
          val n = str.length
          val m = names.length
          val l =
            if (m == n) n
            else if (str(m) == ':') m + 1
            else error("Missing \":\" after host name")
          Options.Spec.parse(str.substring(l))
        }
        catch { case ERROR(msg) => err(msg) }

      def get_registry(a: String): Registry.Cluster.Value =
        Registry.Cluster.try_unqualify(a) match {
          case Some(b) => Registry.Cluster.get(registry, b)
          case None => List(a -> Registry.Host.get(registry, a))
        }

      for (name <- space_explode(',', names); (host, host_specs) <- get_registry(name))
      yield {
        val (params, options) =
          try {
            val (specs1, specs2) = (host_specs ::: more_specs).partition(is_parameter)
            (parameters ++ specs1, { test_options ++ specs2; specs2 })
          }
          catch { case ERROR(msg) => err(msg) }

        apply(name = host,
          hostname = params.string(HOSTNAME),
          user = params.string(USER),
          port = params.int(PORT),
          jobs = params.int(JOBS),
          numa = params.bool(NUMA),
          dirs = params.string(DIRS),
          home = params.string(HOME),
          shared = params.bool(SHARED),
          options = options)
      }
    }

    def parse_single(registry: Registry, str: String): Host =
      Library.the_single(parse(registry, str), message = "Single host expected: " + quote(str))
  }

  class Host(
    val name: String,
    val hostname: String,
    val user: String,
    val port: Int,
    val jobs: Int,
    val numa: Boolean,
    val dirs: String,
    val home: String,
    val shared: Boolean,
    val options: List[Options.Spec]
  ) {
    host =>

    Path.split(host.dirs)  // check

    def ssh_host: String = proper_string(host.hostname) getOrElse host.name
    def is_local: Boolean = SSH.is_local(ssh_host)
    def is_remote: Boolean = !is_local

    override def toString: String = print

    def print: String = {
      val params =
        List(
          if (host.hostname.isEmpty) "" else Options.Spec.print(Host.HOSTNAME, host.hostname),
          if (host.user.isEmpty) "" else Options.Spec.print(Host.USER, host.user),
          if (host.port == 0) "" else Options.Spec.print(Host.PORT, host.port.toString),
          if (host.jobs == 1) "" else Options.Spec.print(Host.JOBS, host.jobs.toString),
          if_proper(host.numa, Host.NUMA),
          if_proper(host.dirs, Options.Spec.print(Host.DIRS, host.dirs)),
          if_proper(host.home, Options.Spec.print(Host.HOME, host.home)),
          if_proper(host.shared, Host.SHARED)
        ).filter(_.nonEmpty)
      val rest = (params ::: host.options.map(_.print)).mkString(",")

      SSH.print_local(host.name) + if_proper(rest, ":" + rest)
    }

    def message(msg: String): String = "Host " + quote(host.name) + if_proper(msg, ": " + msg)

    def open_ssh(options: Options): SSH.System =
      SSH.open_system(options ++ host.options, ssh_host, port = host.port,
        user = host.user, user_home = host.home)

    def open_session(build_context: Build.Context, progress: Progress = new Progress): Session = {
      val ssh_options = build_context.build_options ++ host.options
      val ssh = open_ssh(build_context.build_options)
      new Session(build_context, host, ssh_options, ssh, progress)
    }
  }


  /* SSH sessions */

  class Session(
    val build_context: Build.Context,
    val host: Host,
    val options: Options,
    val ssh: SSH.System,
    val progress: Progress
  ) extends AutoCloseable {
    override def toString: String = ssh.toString

    val build_cluster_identifier: String = options.string("build_cluster_identifier")
    val build_cluster_root: Path = Path.explode(options.string("build_cluster_root"))
    val built_cluster_isabelle_home: Path = build_cluster_root + Path.explode("isabelle")
    val build_cluster_afp_root: Option[Path] =
      if (build_context.afp_root.isEmpty) None
      else Some(built_cluster_isabelle_home + Path.explode("AFP"))

    lazy val build_cluster_isabelle: Other_Isabelle =
      Other_Isabelle(built_cluster_isabelle_home,
        isabelle_identifier = build_cluster_identifier, ssh = ssh)

    def sync(): Other_Isabelle = {
      Sync.sync(options, Rsync.Context(ssh = ssh), built_cluster_isabelle_home,
        afp_root = build_context.afp_root,
        purge_heaps = true,
        preserve_jars = true)
      build_cluster_isabelle
    }

    def init(): Unit =
      build_cluster_isabelle.init(other_settings =
        build_cluster_isabelle.init_components() ::: build_cluster_isabelle.debug_settings())

    def benchmark(): Unit = {
      val script =
        Build_Benchmark.benchmark_command(host, ssh = ssh, isabelle_home = built_cluster_isabelle_home)
      build_cluster_isabelle.bash(script).check
    }

    def start(): Process_Result = {
      val build_cluster_ml_platform = build_cluster_isabelle.getenv("ML_PLATFORM")
      if (build_cluster_ml_platform != build_context.ml_platform) {
        error("Bad ML_PLATFORM: found " + build_cluster_ml_platform +
          ", but expected " + build_context.ml_platform)
      }
      val build_options =
        for { option <- options.iterator if option.for_build_sync } yield options.spec(option.name)
      val script =
        Build.build_worker_command(host,
          ssh = ssh,
          build_options = build_options.toList,
          build_id = build_context.build_uuid,
          isabelle_home = built_cluster_isabelle_home,
          afp_root = build_cluster_afp_root,
          dirs = Path.split(host.dirs).map(build_cluster_isabelle.expand_path),
          quiet = true)
      build_cluster_isabelle.bash(script).check
    }

    override def close(): Unit = ssh.close()
  }

  class Result(val host: Host, val process_result: Exn.Result[Process_Result]) {
    def rc: Int = Process_Result.RC(process_result)
    def ok: Boolean = rc == Process_Result.RC.ok
  }


  /* build clusters */

  object none extends Build_Cluster {
    override def toString: String = "Build_Cluster.none"
  }

  def make(build_context: Build.Context, progress: Progress = new Progress): Build_Cluster = {
    val remote_hosts = build_context.build_hosts.filter(_.is_remote)
    if (remote_hosts.isEmpty) none
    else new Remote_Build_Cluster(build_context, remote_hosts, progress = progress)
  }
}

// NB: extensible via Build.Engine.build_process() and Build_Process.init_cluster()
trait Build_Cluster extends AutoCloseable {
  def rc: Int = Process_Result.RC.ok
  def ok: Boolean = rc == Process_Result.RC.ok
  def return_code(rc: Int): Unit = ()
  def return_code(res: Process_Result): Unit = return_code(res.rc)
  def return_code(exn: Throwable): Unit = return_code(Process_Result.RC(exn))
  def open(): Build_Cluster = this
  def init(): Build_Cluster = this
  def benchmark(): Build_Cluster = this
  def start(): Build_Cluster = this
  def active(): Boolean = false
  def join: List[Build_Cluster.Result] = Nil
  def stop(): Unit = { join; close() }
  override def close(): Unit = ()
}

class Remote_Build_Cluster(
  val build_context: Build.Context,
  val remote_hosts: List[Build_Cluster.Host],
  val progress: Progress = new Progress
) extends Build_Cluster {
  require(remote_hosts.nonEmpty && remote_hosts.forall(_.is_remote), "remote hosts required")

  override def toString: String =
    remote_hosts.iterator.map(_.name).mkString("Build_Cluster(", ", ", ")")


  /* cumulative return code */

  private val _rc = Synchronized(Process_Result.RC.ok)

  override def rc: Int = _rc.value

  override def return_code(rc: Int): Unit =
    _rc.change(rc0 => Process_Result.RC.merge(rc0, rc))

  def capture[A](host: Build_Cluster.Host, op: String)(
    e: => A,
    msg: String = host.message(op + " ..."),
    err: Throwable => String = { exn =>
      return_code(exn)
      host.message("failed to " + op + "\n" + Exn.print(exn))
    }
  ): Exn.Result[A] = {
    progress.capture(e, msg = msg, err = err)
  }


  /* open remote sessions */

  private var _sessions = List.empty[Build_Cluster.Session]

  override def open(): Build_Cluster = synchronized {
    require(_sessions.isEmpty, "build cluster already open")

    val attempts =
      Par_List.map((host: Build_Cluster.Host) =>
        capture(host, "open") { host.open_session(build_context, progress = progress) },
        remote_hosts, thread = true)

    if (attempts.forall(Exn.is_res)) {
      _sessions = attempts.map(Exn.the_res)
      _sessions
    }
    else {
      for (case Exn.Res(session) <- attempts) session.close()
      error("Failed to connect build cluster")
    }

    this
  }


  /* init and benchmark remote Isabelle distributions */

  private var _init = List.empty[Future[Unit]]

  override def init(): Build_Cluster = synchronized {
    require(_sessions.nonEmpty, "build cluster not yet open")

    if (_init.isEmpty) {
      _init =
        for (session <- _sessions if !session.host.shared) yield {
          Future.thread(session.host.message("session")) {
            capture(session.host, "sync") { session.sync() }
            capture(session.host, "init") { session.init() }
          }
        }
    }

    this
  }

  override def benchmark(): Build_Cluster = synchronized {
    _init.foreach(_.join)
    _init =
      for (session <- _sessions if !session.host.shared) yield {
        Future.thread(session.host.message("session")) {
          capture(session.host, "benchmark") { session.benchmark() }
        }
      }
    _init.foreach(_.join)

    this
  }


  /* workers */

  private var _workers = List.empty[Future[Process_Result]]

  override def active(): Boolean = synchronized { _workers.nonEmpty }

  override def start(): Build_Cluster = synchronized {
    require(_sessions.nonEmpty, "build cluster not yet open")
    require(_workers.isEmpty, "build cluster already active")

    init()
    _init.foreach(_.join)

    _workers =
      for (session <- _sessions) yield {
        Future.thread(session.host.message("work")) {
          Exn.release(capture(session.host, "work") { session.start() })
        }
      }

    this
  }

  override def join: List[Build_Cluster.Result] = synchronized {
    _init.foreach(_.join)
    if (_workers.isEmpty) Nil
    else {
      assert(_sessions.length == _workers.length)
      for ((session, worker) <- _sessions zip _workers)
        yield Build_Cluster.Result(session.host, worker.join_result)
    }
  }


  /* close */

  override def close(): Unit = synchronized {
    _init.foreach(_.join)
    _workers.foreach(_.join_result)
    _sessions.foreach(_.close())

    _init = Nil
    _workers = Nil
    _sessions = Nil
  }
}