src/Pure/Admin/isabelle_cronjob.scala
author wenzelm
Sat Oct 15 21:02:39 2016 +0200 (2016-10-15)
changeset 64231 dbc8294c75d3
parent 64220 e7cbf81ec4b7
child 64232 367d83d6030e
permissions -rw-r--r--
added remote_build_history tasks: parallel on several remote hosts;
isabelle_identify: use self repos for robustness;
more logger context: options and ssh;
setup repository clones on demand;
clarified target repositories;
     1 /*  Title:      Pure/Admin/isabelle_cronjob.scala
     2     Author:     Makarius
     3 
     4 Main entry point for administrative cronjob at TUM.
     5 */
     6 
     7 package isabelle
     8 
     9 
    10 import scala.annotation.tailrec
    11 import scala.collection.mutable
    12 
    13 
    14 object Isabelle_Cronjob
    15 {
    16   /* file-system state: owned by main cronjob */
    17 
    18   val main_dir = Path.explode("~/cronjob")
    19   val main_state_file = main_dir + Path.explode("run/main.state")
    20   val current_log = main_dir + Path.explode("run/main.log")  // owned by log service
    21   val cumulative_log = main_dir + Path.explode("log/main.log")  // owned by log service
    22 
    23   val isabelle_repos = main_dir + Path.explode("isabelle")
    24   val isabelle_repos_test = main_dir + Path.explode("isabelle-test")
    25   val afp_repos = main_dir + Path.explode("AFP")
    26 
    27   val release_snapshot = Path.explode("~/html-data/release_snapshot")
    28 
    29 
    30 
    31   /** particular tasks **/
    32 
    33   /* identify Isabelle + AFP repository snapshots */
    34 
    35   private val isabelle_identify =
    36     Logger_Task("isabelle_identify", logger =>
    37       {
    38         val isabelle_id = Mercurial.repository(isabelle_repos).identify(options = "-i")
    39         val afp_id =
    40           Mercurial.setup_repository(
    41             logger.cronjob_options.string("afp_repos"), afp_repos).pull_id()
    42 
    43         File.write(logger.log_dir + Build_Log.log_filename("isabelle_identify", logger.start_date),
    44           terminate_lines(
    45             List("isabelle_identify: " + Build_Log.print_date(logger.start_date),
    46               "",
    47               "Isabelle version: " + isabelle_id,
    48               "AFP version: " + afp_id)))
    49       })
    50 
    51 
    52   /* integrity test of build_history vs. build_history_base */
    53 
    54   private val build_history_base =
    55     Logger_Task("build_history_base", logger =>
    56       {
    57         for {
    58           (result, log_path) <-
    59             Build_History.build_history(Mercurial.repository(isabelle_repos_test),
    60               rev = "build_history_base", fresh = true, build_args = List("HOL"))
    61         } {
    62           result.check
    63           File.copy(log_path, logger.log_dir + log_path.base)
    64         }
    65       })
    66 
    67 
    68   /* build release from repository snapshot */
    69 
    70   private val build_release =
    71     Logger_Task("build_release", logger =>
    72       Isabelle_System.with_tmp_dir("isadist")(base_dir =>
    73         {
    74           val new_snapshot = release_snapshot.ext("new")
    75           val old_snapshot = release_snapshot.ext("old")
    76 
    77           Isabelle_System.rm_tree(new_snapshot)
    78           Isabelle_System.rm_tree(old_snapshot)
    79 
    80           Build_Release.build_release(base_dir, parallel_jobs = 4,
    81             remote_mac = "macbroy30", website = Some(new_snapshot))
    82 
    83           if (release_snapshot.is_dir) File.mv(release_snapshot, old_snapshot)
    84           File.mv(new_snapshot, release_snapshot)
    85           Isabelle_System.rm_tree(old_snapshot)
    86         }))
    87 
    88 
    89   /* remote build_history */
    90 
    91   private sealed case class Remote_Build(
    92     host: String,
    93     user: String = "",
    94     port: Int = SSH.default_port,
    95     shared_home: Boolean = false,
    96     options: String = "",
    97     args: String = "-a")
    98 
    99   private val remote_builds =
   100     List(
   101       Remote_Build("lxbroy10", options = "-m32 -M4", shared_home = true),
   102       Remote_Build("macbroy2", options = "-m32 -M4"))
   103 
   104   private def remote_build_history(rev: String, r: Remote_Build): Logger_Task =
   105     Logger_Task("build_history-" + r.host, logger =>
   106       {
   107         using(logger.ssh_context.open_session(host = r.host, user = r.user, port = r.port))(
   108           session =>
   109             {
   110               val results =
   111                 Build_History.remote_build_history(session,
   112                   isabelle_repos,
   113                   isabelle_repos.ext(r.host),
   114                   isabelle_repos_source = logger.cronjob_options.string("isabelle_repos"),
   115                   self_update = !r.shared_home,
   116                   options = r.options + " -f -r " + File.bash_string(rev),
   117                   args = r.args)
   118               for ((log, bytes) <- results)
   119                 Bytes.write(logger.log_dir + Path.explode(log), bytes)
   120             })
   121       })
   122 
   123 
   124 
   125   /** task logging **/
   126 
   127   sealed case class Logger_Task(name: String = "", body: Logger => Unit)
   128 
   129   class Log_Service private[Isabelle_Cronjob](
   130     progress: Progress, val cronjob_options: Options, val ssh_context: SSH)
   131   {
   132     current_log.file.delete
   133 
   134     private val thread: Consumer_Thread[String] =
   135       Consumer_Thread.fork("cronjob: logger", daemon = true)(
   136         consume = (text: String) =>
   137           { // critical
   138             File.append(current_log, text + "\n")
   139             File.append(cumulative_log, text + "\n")
   140             progress.echo(text)
   141             true
   142           })
   143 
   144     def shutdown() { thread.shutdown() }
   145 
   146     val hostname = Isabelle_System.hostname()
   147 
   148     def log(date: Date, task_name: String, msg: String): Unit =
   149       if (task_name != "")
   150         thread.send(
   151           "[" + Build_Log.print_date(date) + ", " + hostname + ", " + task_name + "]: " + msg)
   152 
   153     def start_logger(start_date: Date, task_name: String): Logger =
   154       new Logger(this, start_date, task_name)
   155 
   156     def run_task(start_date: Date, task: Logger_Task)
   157     {
   158       val logger = start_logger(start_date, task.name)
   159       val res = Exn.capture { task.body(logger) }
   160       val end_date = Date.now()
   161       val err =
   162         res match {
   163           case Exn.Res(_) => None
   164           case Exn.Exn(exn) => Some(Exn.message(exn))
   165         }
   166       logger.log_end(end_date, err)
   167     }
   168 
   169     def fork_task(start_date: Date, task: Logger_Task): Task =
   170       new Task(task.name, run_task(start_date, task))
   171   }
   172 
   173   class Logger private[Isabelle_Cronjob](
   174     val log_service: Log_Service, val start_date: Date, val task_name: String)
   175   {
   176     def cronjob_options: Options = log_service.cronjob_options
   177     def ssh_context: SSH = log_service.ssh_context
   178 
   179     def log(date: Date, msg: String): Unit = log_service.log(date, task_name, msg)
   180 
   181     def log_end(end_date: Date, err: Option[String])
   182     {
   183       val elapsed_time = end_date.time - start_date.time
   184       val msg =
   185         (if (err.isEmpty) "finished" else "ERROR " + err.get) +
   186         (if (elapsed_time.seconds < 3.0) "" else " (" + elapsed_time.message_hms + " elapsed time)")
   187       log(end_date, msg)
   188     }
   189 
   190     val log_dir: Path = main_dir + Build_Log.log_subdir(start_date)
   191 
   192     Isabelle_System.mkdirs(log_dir)
   193     log(start_date, "started")
   194   }
   195 
   196   class Task private[Isabelle_Cronjob](name: String, body: => Unit)
   197   {
   198     private val future: Future[Unit] = Future.thread("cronjob: " + name) { body }
   199     def is_finished: Boolean = future.is_finished
   200   }
   201 
   202 
   203 
   204   /** cronjob **/
   205 
   206   def cronjob(progress: Progress, exclude_task: Set[String])
   207   {
   208     /* soft lock */
   209 
   210     val still_running =
   211       try { Some(File.read(main_state_file)) }
   212       catch { case ERROR(_) => None }
   213 
   214     still_running match {
   215       case None | Some("") =>
   216       case Some(running) =>
   217         error("Isabelle cronjob appears to be still running: " + running)
   218     }
   219 
   220 
   221     /* log service */
   222 
   223     val cronjob_options = Options.load(Path.explode("~~/Admin/cronjob/cronjob.options"))
   224     val ssh_context = SSH.init(Options.init())
   225     val log_service = new Log_Service(progress, cronjob_options, ssh_context)
   226 
   227     def run(start_date: Date, task: Logger_Task) { log_service.run_task(start_date, task) }
   228 
   229     def run_now(task: Logger_Task) { run(Date.now(), task) }
   230 
   231 
   232     /* structured tasks */
   233 
   234     def SEQ(tasks: Logger_Task*): Logger_Task = Logger_Task(body = _ =>
   235       for (task <- tasks.iterator if !exclude_task(task.name) || task.name == "")
   236         run_now(task))
   237 
   238     def PAR(tasks: Logger_Task*): Logger_Task = Logger_Task(body = _ =>
   239       {
   240         @tailrec def join(running: List[Task])
   241         {
   242           running.partition(_.is_finished) match {
   243             case (Nil, Nil) =>
   244             case (Nil, _ :: _) => Thread.sleep(500); join(running)
   245             case (_ :: _, remaining) => join(remaining)
   246           }
   247         }
   248         val start_date = Date.now()
   249         val running =
   250           for (task <- tasks.toList if !exclude_task(task.name))
   251             yield log_service.fork_task(start_date, task)
   252         join(running)
   253       })
   254 
   255 
   256     /* main */
   257 
   258     val main_start_date = Date.now()
   259     File.write(main_state_file, main_start_date + " " + log_service.hostname)
   260 
   261     val rev = Mercurial.repository(isabelle_repos).identify(options = "-i")
   262 
   263     run(main_start_date,
   264       Logger_Task("isabelle_cronjob", _ =>
   265         run_now(
   266           SEQ(isabelle_identify, build_history_base, build_release,
   267             PAR(remote_builds.map(remote_build_history(rev, _)):_*)))))
   268 
   269     log_service.shutdown()
   270 
   271     main_state_file.file.delete
   272   }
   273 
   274 
   275 
   276   /** command line entry point **/
   277 
   278   def main(args: Array[String])
   279   {
   280     Command_Line.tool0 {
   281       var force = false
   282       var verbose = false
   283       var exclude_task = Set.empty[String]
   284 
   285       val getopts = Getopts("""
   286 Usage: Admin/cronjob/main [OPTIONS]
   287 
   288   Options are:
   289     -f           apply force to do anything
   290     -v           verbose
   291     -x NAME      exclude tasks with this name
   292 """,
   293         "f" -> (_ => force = true),
   294         "v" -> (_ => verbose = true),
   295         "x:" -> (arg => exclude_task += arg))
   296 
   297       val more_args = getopts(args)
   298       if (more_args.nonEmpty) getopts.usage()
   299 
   300       val progress = if (verbose) new Console_Progress() else Ignore_Progress
   301 
   302       if (force) cronjob(progress, exclude_task)
   303       else error("Need to apply force to do anything")
   304     }
   305   }
   306 }