src/Pure/Admin/isabelle_cronjob.scala
author wenzelm
Sat Oct 15 21:02:39 2016 +0200 (2016-10-15)
changeset 64231 dbc8294c75d3
parent 64220 e7cbf81ec4b7
child 64232 367d83d6030e
permissions -rw-r--r--
added remote_build_history tasks: parallel on several remote hosts;
isabelle_identify: use self repos for robustness;
more logger context: options and ssh;
setup repository clones on demand;
clarified target repositories;
wenzelm@64148
     1
/*  Title:      Pure/Admin/isabelle_cronjob.scala
wenzelm@64148
     2
    Author:     Makarius
wenzelm@64148
     3
wenzelm@64148
     4
Main entry point for administrative cronjob at TUM.
wenzelm@64148
     5
*/
wenzelm@64148
     6
wenzelm@64148
     7
package isabelle
wenzelm@64148
     8
wenzelm@64148
     9
wenzelm@64170
    10
import scala.annotation.tailrec
wenzelm@64170
    11
import scala.collection.mutable
wenzelm@64170
    12
wenzelm@64170
    13
wenzelm@64148
    14
object Isabelle_Cronjob
wenzelm@64148
    15
{
wenzelm@64194
    16
  /* file-system state: owned by main cronjob */
wenzelm@64153
    17
wenzelm@64153
    18
  /** Root directory below which all cronjob state in this file is kept. */
  val main_dir: Path = Path.explode("~/cronjob")

  /** Soft-lock file: written when the cronjob starts, deleted when it ends. */
  val main_state_file: Path = main_dir + Path.explode("run/main.state")

  /** Log of the current run; owned by log service (deleted when it is created). */
  val current_log: Path = main_dir + Path.explode("run/main.log")

  /** Log that is only ever appended to; owned by log service. */
  val cumulative_log: Path = main_dir + Path.explode("log/main.log")

  /** Local Isabelle repository clone used for identification and remote builds. */
  val isabelle_repos: Path = main_dir + Path.explode("isabelle")

  /** Local Isabelle repository clone used by the build_history_base test. */
  val isabelle_repos_test: Path = main_dir + Path.explode("isabelle-test")

  /** Local AFP repository clone. */
  val afp_repos: Path = main_dir + Path.explode("AFP")

  /** Target directory of the published release snapshot. */
  val release_snapshot: Path = Path.explode("~/html-data/release_snapshot")
wenzelm@64215
    28
wenzelm@64153
    29
wenzelm@64192
    30
wenzelm@64192
    31
  /** particular tasks **/
wenzelm@64192
    32
wenzelm@64194
    33
  /* identify Isabelle + AFP repository snapshots */
wenzelm@64192
    34
wenzelm@64194
    35
  /**
   * Task "isabelle_identify": determine the current Isabelle and AFP
   * repository versions and write them to a small report in the log
   * directory of the task.
   */
  private val isabelle_identify: Logger_Task =
    Logger_Task("isabelle_identify", logger =>
      {
        // Isabelle: identify the local clone as it is
        val isabelle_version = Mercurial.repository(isabelle_repos).identify(options = "-i")

        // AFP: set up the clone from the configured source, then pull and identify
        val afp_source = logger.cronjob_options.string("afp_repos")
        val afp_version = Mercurial.setup_repository(afp_source, afp_repos).pull_id()

        val report_file =
          logger.log_dir + Build_Log.log_filename("isabelle_identify", logger.start_date)
        val report_lines =
          List(
            "isabelle_identify: " + Build_Log.print_date(logger.start_date),
            "",
            "Isabelle version: " + isabelle_version,
            "AFP version: " + afp_version)

        File.write(report_file, terminate_lines(report_lines))
      })
wenzelm@64192
    50
wenzelm@64192
    51
wenzelm@64194
    52
  /* integrity test of build_history vs. build_history_base */
wenzelm@64193
    53
wenzelm@64194
    54
  /**
   * Task "build_history_base": run a fresh build_history of session HOL at
   * revision "build_history_base" in the test repository, check every
   * result, and copy each produced log file into the task log directory.
   */
  private val build_history_base: Logger_Task =
    Logger_Task("build_history_base", logger =>
      {
        val test_repos = Mercurial.repository(isabelle_repos_test)
        val outcomes =
          Build_History.build_history(test_repos,
            rev = "build_history_base", fresh = true, build_args = List("HOL"))

        outcomes.foreach({ case (result, log_path) =>
          // a failed result raises here, before its log is copied
          result.check
          File.copy(log_path, logger.log_dir + log_path.base)
        })
      })
wenzelm@64193
    66
wenzelm@64193
    67
wenzelm@64215
    68
  /* build release from repository snapshot */
wenzelm@64215
    69
wenzelm@64215
    70
  /**
   * Task "build_release": build a release in a temporary directory, with
   * its website output going to "release_snapshot.new", then swap that
   * directory into place as the published release_snapshot.
   */
  private val build_release: Logger_Task =
    Logger_Task("build_release", _ =>
      Isabelle_System.with_tmp_dir("isadist")(tmp_dir =>
        {
          val staging = release_snapshot.ext("new")
          val backup = release_snapshot.ext("old")

          // remove leftovers of earlier runs
          for (stale <- List(staging, backup)) Isabelle_System.rm_tree(stale)

          Build_Release.build_release(tmp_dir, parallel_jobs = 4,
            remote_mac = "macbroy30", website = Some(staging))

          // swap: current -> backup, staging -> current, then drop the backup
          val has_current = release_snapshot.is_dir
          if (has_current) File.mv(release_snapshot, backup)
          File.mv(staging, release_snapshot)
          Isabelle_System.rm_tree(backup)
        }))
wenzelm@64215
    87
wenzelm@64215
    88
wenzelm@64231
    89
  /* remote build_history */
wenzelm@64231
    90
wenzelm@64231
    91
  /**
   * Description of one remote build_history job.
   *
   * @param host         remote host name: used for the SSH session, the task
   *                     name, and the extension of the remote repository path
   * @param user         SSH user name
   * @param port         SSH port
   * @param shared_home  if true, the remote build is run without self_update
   * @param options      options passed to the remote build_history, placed
   *                     before the generated " -f -r REV" part
   * @param args         arguments passed to the remote build_history
   */
  private sealed case class Remote_Build(
    host: String,
    user: String = "",
    port: Int = SSH.default_port,
    shared_home: Boolean = false,
    options: String = "",
    args: String = "-a")

  /** The remote hosts that run build_history in parallel. */
  private val remote_builds: List[Remote_Build] =
  {
    val build_options = "-m32 -M4"
    List(
      Remote_Build(host = "lxbroy10", shared_home = true, options = build_options),
      Remote_Build(host = "macbroy2", options = build_options))
  }
wenzelm@64231
   103
wenzelm@64231
   104
  /**
   * Task "build_history-HOST": open an SSH session to the host described by
   * `r`, run a remote build_history for revision `rev` there, and store
   * every returned log in the task log directory.
   *
   * @param rev  revision to build; passed via "-r" after shell quoting
   * @param r    remote host description
   */
  private def remote_build_history(rev: String, r: Remote_Build): Logger_Task =
  {
    def build(logger: Logger): Unit =
      using(logger.ssh_context.open_session(host = r.host, user = r.user, port = r.port))(
        session =>
          {
            val remote_repos = isabelle_repos.ext(r.host)
            val repos_source = logger.cronjob_options.string("isabelle_repos")
            val build_options = r.options + " -f -r " + File.bash_string(rev)

            val logs =
              Build_History.remote_build_history(session,
                isabelle_repos,
                remote_repos,
                isabelle_repos_source = repos_source,
                self_update = !r.shared_home,
                options = build_options,
                args = r.args)

            logs.foreach({ case (log, bytes) =>
              Bytes.write(logger.log_dir + Path.explode(log), bytes)
            })
          })

    Logger_Task("build_history-" + r.host, build)
  }
wenzelm@64231
   122
wenzelm@64231
   123
wenzelm@64192
   124
wenzelm@64192
   125
  /** task logging **/
wenzelm@64171
   126
wenzelm@64193
   127
  /**
   * A named unit of work that is run with its own Logger.
   *
   * @param name  task name as shown in log messages; the log service emits
   *              no messages for tasks with the default empty name
   * @param body  the work itself, applied to the Logger of this task
   */
  sealed case class Logger_Task(name: String = "", body: Logger => Unit)
wenzelm@64154
   128
wenzelm@64231
   129
  /**
   * Central log service: all messages go through one consumer thread, which
   * appends them to current_log and cumulative_log and echoes them via the
   * given progress. It also carries the shared context of tasks (cronjob
   * options and SSH context) and runs or forks tasks with logging of their
   * start and end.
   */
  class Log_Service private[Isabelle_Cronjob](
    progress: Progress, val cronjob_options: Options, val ssh_context: SSH)
  {
    // every service instance starts with a fresh log of the current run
    current_log.file.delete

    private val log_thread: Consumer_Thread[String] =
      Consumer_Thread.fork("cronjob: logger", daemon = true)(
        consume = (text: String) =>
          {
            // critical: both log files are written from this thread only
            val line = text + "\n"
            File.append(current_log, line)
            File.append(cumulative_log, line)
            progress.echo(text)
            true
          })

    /** Shut down the consumer thread. */
    def shutdown(): Unit = log_thread.shutdown()

    val hostname = Isabelle_System.hostname()

    /** Send one message to the log; tasks with empty name are silent. */
    def log(date: Date, task_name: String, msg: String): Unit =
      task_name match {
        case "" =>
        case _ =>
          val prefix = "[" + Build_Log.print_date(date) + ", " + hostname + ", " + task_name + "]: "
          log_thread.send(prefix + msg)
      }

    /** Create the Logger for a task that starts at the given date. */
    def start_logger(start_date: Date, task_name: String): Logger =
      new Logger(this, start_date, task_name)

    /**
     * Run a task in the current thread. An exception raised by the task body
     * is captured and reported as error message at the end of the task.
     */
    def run_task(start_date: Date, task: Logger_Task): Unit =
    {
      val logger = start_logger(start_date, task.name)
      val outcome = Exn.capture { task.body(logger) }
      val end_date = Date.now()
      val failure =
        outcome match {
          case Exn.Exn(exn) => Some(Exn.message(exn))
          case Exn.Res(_) => None
        }
      logger.log_end(end_date, failure)
    }

    /** Run a task via run_task within a separate Task. */
    def fork_task(start_date: Date, task: Logger_Task): Task =
      new Task(task.name, run_task(start_date, task))
  }
wenzelm@64171
   172
wenzelm@64171
   173
  /**
   * Logger of a single task. On creation it makes the log directory for the
   * start date and logs "started"; log_end reports the final status.
   */
  class Logger private[Isabelle_Cronjob](
    val log_service: Log_Service, val start_date: Date, val task_name: String)
  {
    /* initialization: log directory first, then the start message */

    val log_dir: Path = main_dir + Build_Log.log_subdir(start_date)

    Isabelle_System.mkdirs(log_dir)
    log(start_date, "started")


    /* context shared via the log service */

    def cronjob_options: Options = log_service.cronjob_options
    def ssh_context: SSH = log_service.ssh_context


    /* messages */

    def log(date: Date, msg: String): Unit = log_service.log(date, task_name, msg)

    /**
     * Log the end of the task: "finished" or "ERROR" with the given message,
     * followed by the elapsed time unless that is below 3 seconds.
     */
    def log_end(end_date: Date, err: Option[String]): Unit =
    {
      val elapsed = end_date.time - start_date.time
      val status =
        err match {
          case None => "finished"
          case Some(text) => "ERROR " + text
        }
      val timing =
        if (!(elapsed.seconds < 3.0)) " (" + elapsed.message_hms + " elapsed time)" else ""
      log(end_date, status + timing)
    }
  }
wenzelm@64171
   195
wenzelm@64171
   196
  /**
   * A task body that is started via Future.thread, under the name
   * "cronjob: NAME", as soon as the Task is created.
   */
  class Task private[Isabelle_Cronjob](name: String, body: => Unit)
  {
    private val worker: Future[Unit] = Future.thread("cronjob: " + name)(body)

    /** Whether the underlying future has finished. */
    def is_finished: Boolean = worker.is_finished
  }
wenzelm@64153
   201
wenzelm@64170
   202
wenzelm@64170
   203
wenzelm@64153
   204
  /** cronjob **/
wenzelm@64153
   205
wenzelm@64187
   206
  /**
   * Run the complete cronjob once.
   *
   * The job is guarded by a soft lock: if main_state_file exists and is
   * non-empty, another cronjob is assumed to be running and an error is
   * raised without touching that file. Otherwise the state file is written,
   * the tasks are run, and finally the log service is shut down and the
   * state file is deleted. This cleanup also happens when the main section
   * fails, so that a failed run does not leave a stale lock behind that
   * would block all later runs.
   *
   * @param progress      receives every log message, via the log service
   * @param exclude_task  names of tasks that are skipped
   */
  def cronjob(progress: Progress, exclude_task: Set[String])
  {
    /* soft lock */

    val still_running =
      try { Some(File.read(main_state_file)) }
      catch { case ERROR(_) => None }

    still_running match {
      case None | Some("") =>
      case Some(running) =>
        error("Isabelle cronjob appears to be still running: " + running)
    }


    /* log service */

    val cronjob_options = Options.load(Path.explode("~~/Admin/cronjob/cronjob.options"))
    val ssh_context = SSH.init(Options.init())
    val log_service = new Log_Service(progress, cronjob_options, ssh_context)

    def run(start_date: Date, task: Logger_Task) { log_service.run_task(start_date, task) }

    def run_now(task: Logger_Task) { run(Date.now(), task) }


    /* structured tasks */

    // sequential composition: unnamed tasks are always run
    def SEQ(tasks: Logger_Task*): Logger_Task = Logger_Task(body = _ =>
      for (task <- tasks.iterator if !exclude_task(task.name) || task.name == "")
        run_now(task))

    // parallel composition: fork all tasks with the same start date, then
    // poll every 500ms until all of them are finished
    def PAR(tasks: Logger_Task*): Logger_Task = Logger_Task(body = _ =>
      {
        @tailrec def join(running: List[Task])
        {
          running.partition(_.is_finished) match {
            case (Nil, Nil) =>
            case (Nil, _ :: _) => Thread.sleep(500); join(running)
            case (_ :: _, remaining) => join(remaining)
          }
        }
        val start_date = Date.now()
        val running =
          for (task <- tasks.toList if !exclude_task(task.name))
            yield log_service.fork_task(start_date, task)
        join(running)
      })


    /* main */

    val main_start_date = Date.now()
    File.write(main_state_file, main_start_date + " " + log_service.hostname)

    // From here on the soft lock belongs to this run: release it on every
    // exit path. The repository identification below is not wrapped in a
    // task, so its failure is not captured by the log service.
    try {
      val rev = Mercurial.repository(isabelle_repos).identify(options = "-i")

      run(main_start_date,
        Logger_Task("isabelle_cronjob", _ =>
          run_now(
            SEQ(isabelle_identify, build_history_base, build_release,
              PAR(remote_builds.map(remote_build_history(rev, _)):_*)))))
    }
    finally {
      // same order as for regular termination: logger first, then the lock
      try { log_service.shutdown() }
      finally { main_state_file.file.delete }
    }
  }
wenzelm@64153
   273
wenzelm@64153
   274
wenzelm@64153
   275
wenzelm@64153
   276
  /** command line entry point **/
wenzelm@64153
   277
wenzelm@64148
   278
  /**
   * Command line entry point: parse the options -f, -v and -x NAME, reject
   * any further arguments with the usage message, and run the cronjob. The
   * cronjob is only run when -f is given.
   */
  def main(args: Array[String]): Unit =
  {
    Command_Line.tool0 {
      // option state, updated by the option handlers below
      var force = false
      var verbose = false
      var excluded = Set.empty[String]

      val getopts = Getopts("""
Usage: Admin/cronjob/main [OPTIONS]

  Options are:
    -f           apply force to do anything
    -v           verbose
    -x NAME      exclude tasks with this name
""",
        "f" -> (_ => force = true),
        "v" -> (_ => verbose = true),
        "x:" -> (name => excluded = excluded + name))

      val rest = getopts(args)
      if (!rest.isEmpty) getopts.usage()

      val progress: Progress = if (verbose) new Console_Progress() else Ignore_Progress

      if (!force) error("Need to apply force to do anything")
      else cronjob(progress, excluded)
    }
  }
wenzelm@64148
   306
}