src/Pure/Admin/isabelle_cronjob.scala
changeset 64171 568cd5123952
parent 64170 a0c2cbe2fc8e
child 64172 e7863057df41
equal deleted inserted replaced
64170:a0c2cbe2fc8e 64171:568cd5123952
    18   val main_dir = Path.explode("~/cronjob")
    18   val main_dir = Path.explode("~/cronjob")
    19   val run_dir = main_dir + Path.explode("run")
    19   val run_dir = main_dir + Path.explode("run")
    20   val log_dir = main_dir + Path.explode("log")
    20   val log_dir = main_dir + Path.explode("log")
    21 
    21 
    22   val main_state_file = run_dir + Path.explode("main.state")
    22   val main_state_file = run_dir + Path.explode("main.state")
    23   val main_log = log_dir + Path.explode("main.log")  // owned by logger
    23   val main_log = log_dir + Path.explode("main.log")  // owned by log service
    24 
    24 
    25 
    25 
    26   /* managed repository clones */
    26   /* task logging */
       
    27 
       
    28   sealed case class Logger_Task(name: String, body: Logger => Unit)
       
    29 
       
    30   class Log_Service private[Isabelle_Cronjob](progress: Progress)
       
    31   {
       
    32     private val thread: Consumer_Thread[String] =
       
    33       Consumer_Thread.fork("cronjob: logger", daemon = true)(
       
    34         consume = (text: String) =>
       
    35           {
       
    36             File.append(main_log, text + "\n")   // critical
       
    37             progress.echo(text)
       
    38             true
       
    39           })
       
    40 
       
    41     def shutdown() { thread.shutdown() }
       
    42 
       
    43     val hostname = Isabelle_System.hostname()
       
    44 
       
    45     def log(date: Date, task_name: String, msg: String): Unit =
       
    46       thread.send(
       
    47         "[" + Build_Log.print_date(date) + ", " + hostname + ", " + task_name + "]: " + msg)
       
    48 
       
    49     def start_logger(start_date: Date, task_name: String): Logger =
       
    50       new Logger(this, start_date, task_name)
       
    51 
       
    52     def run_task(start_date: Date, task: Logger_Task)
       
    53     {
       
    54       val logger = start_logger(start_date, task.name)
       
    55       val res = Exn.capture { task.body(logger) }
       
    56       val end_date = Date.now()
       
    57       val err =
       
    58         res match {
       
    59           case Exn.Res(_) => None
       
    60           case Exn.Exn(exn) => Some(Exn.message(exn))
       
    61         }
       
    62       logger.log_end(end_date, err)
       
    63     }
       
    64 
       
    65     def fork_task(start_date: Date, task: Logger_Task): Task =
       
    66       new Task(task.name, run_task(start_date, task))
       
    67   }
       
    68 
       
    69   class Logger private[Isabelle_Cronjob](
       
    70     val log_service: Log_Service, val start_date: Date, val task_name: String)
       
    71   {
       
    72     def log(date: Date, msg: String): Unit = log_service.log(date, task_name, msg)
       
    73 
       
    74     def log_end(end_date: Date, err: Option[String])
       
    75     {
       
    76       val elapsed_time = end_date.time - start_date.time
       
    77       val msg =
       
    78         (if (err.isEmpty) "finished" else "ERROR " + err.get) +
       
    79         (if (elapsed_time.seconds < 3.0) "" else ", elapsed time " + elapsed_time.message_hms)
       
    80       log(end_date, msg)
       
    81     }
       
    82 
       
    83     log(start_date, "started")
       
    84   }
       
    85 
       
    86   class Task private[Isabelle_Cronjob](name: String, body: => Unit)
       
    87   {
       
    88     private val future: Future[Unit] = Future.thread("cronjob: " + name) { body }
       
    89     def is_finished: Boolean = future.is_finished
       
    90   }
       
    91 
       
    92 
       
    93 
       
    94   /** particular tasks **/
       
    95 
       
    96   /* identify repository snapshots */
    27 
    97 
    28   val isabelle_repos = main_dir + Path.explode("isabelle-build_history")
    98   val isabelle_repos = main_dir + Path.explode("isabelle-build_history")
    29   val afp_repos = main_dir + Path.explode("AFP-build_history")
    99   val afp_repos = main_dir + Path.explode("AFP-build_history")
    30 
   100 
    31   def pull_repos(root: Path): String =
   101   val isabelle_identify =
    32   {
   102     Logger_Task("isabelle_identify", logger =>
    33     val hg = Mercurial.repository(root)
   103       {
    34     hg.pull(options = "-q")
   104         def pull_repos(root: Path): String =
    35     hg.identify("tip", options = "-i")
   105         {
    36   }
   106           val hg = Mercurial.repository(root)
    37 
   107           hg.pull(options = "-q")
    38 
   108           hg.identify("tip", options = "-i")
    39 
   109         }
    40   /** particular tasks **/
   110 
    41 
   111         val isabelle_id = pull_repos(isabelle_repos)
    42   /* identify repository snapshots */
   112         val afp_id = pull_repos(afp_repos)
    43 
   113 
    44   def isabelle_identify(start_date: Date)
   114         val log_path = log_dir + Build_Log.log_path("isabelle_identify", logger.start_date)
    45   {
   115         Isabelle_System.mkdirs(log_path.dir)
    46     val isabelle_id = pull_repos(isabelle_repos)
   116         File.write(log_path,
    47     val afp_id = pull_repos(afp_repos)
   117           Library.terminate_lines(
    48 
   118             List("isabelle_identify: " + Build_Log.print_date(logger.start_date),
    49     val log_path = log_dir + Build_Log.log_path("isabelle_identify", start_date)
   119               "",
    50     Isabelle_System.mkdirs(log_path.dir)
   120               "Isabelle version: " + isabelle_id,
    51     File.write(log_path,
   121               "AFP version: " + afp_id)))
    52       Library.terminate_lines(
   122       })
    53         List("isabelle_identify: " + Build_Log.print_date(start_date),
       
    54           "",
       
    55           "Isabelle version: " + isabelle_id,
       
    56           "AFP version: " + afp_id)))
       
    57   }
       
    58 
   123 
    59 
   124 
    60 
   125 
    61   /** cronjob **/
   126   /** cronjob **/
    62 
   127 
    63   private class Task(val name: String, body: Date => Unit)
   128   private val all_tasks = List(isabelle_identify)
    64   {
       
    65     override def toString: String = "cronjob: " + name
       
    66 
       
    67     val start_date: Date = Date.now()
       
    68 
       
    69     private val future: Future[Unit] = Future.thread(toString) { body(start_date) }
       
    70     def is_finished: Boolean = future.is_finished
       
    71 
       
    72     def success: Boolean =
       
    73       future.join_result match {
       
    74         case Exn.Res(_) => true
       
    75         case Exn.Exn(_) => false
       
    76       }
       
    77   }
       
    78 
   129 
    79   def cronjob(progress: Progress)
   130   def cronjob(progress: Progress)
    80   {
   131   {
    81     /* check */
   132     /* soft lock */
    82 
   133 
    83     val still_running =
   134     val still_running =
    84       try { Some(File.read(main_state_file)) }
   135       try { Some(File.read(main_state_file)) }
    85       catch { case ERROR(_) => None }
   136       catch { case ERROR(_) => None }
    86 
   137 
    88       case None | Some("") =>
   139       case None | Some("") =>
    89       case Some(running) =>
   140       case Some(running) =>
    90         error("Isabelle cronjob appears to be still running: " + running)
   141         error("Isabelle cronjob appears to be still running: " + running)
    91     }
   142     }
    92 
   143 
    93 
   144     val main_start_date = Date.now()
    94     /* logger */
   145     val log_service = new Log_Service(progress)
    95 
   146 
    96     val hostname = Isabelle_System.hostname()
   147     File.write(main_state_file, main_start_date + " " + log_service.hostname)
    97 
   148 
    98     val logger: Consumer_Thread[String] =
   149 
    99       Consumer_Thread.fork("cronjob: logger", daemon = true)(
   150     /* parallel tasks */
   100         consume = (text: String) =>
   151 
   101           {
   152     def parallel_tasks(tasks: List[Logger_Task])
   102             File.append(main_log, text + "\n")
       
   103             progress.echo(text)
       
   104             true
       
   105           })
       
   106 
       
   107     def log(date: Date, task_name: String, msg: String)
       
   108     {
       
   109       logger.send(
       
   110         "[" + Build_Log.print_date(date) + ", " + hostname + ", " + task_name + "]: " + msg)
       
   111     }
       
   112 
       
   113     def log_start(task: Task) { log(task.start_date, task.name, "started") }
       
   114 
       
   115     def log_end(end_date: Date, task: Task)
       
   116     {
       
   117       val elapsed_time = end_date.time - task.start_date.time
       
   118       val msg =
       
   119         (if (task.success) "finished" else "FAILED") +
       
   120         (if (elapsed_time.seconds < 3.0) "" else ", elapsed time " + elapsed_time.message_hms)
       
   121       log(end_date, task.name, msg)
       
   122     }
       
   123 
       
   124 
       
   125     /* manage tasks */
       
   126 
       
   127     def manage_tasks(task_specs: List[(String, Date => Unit)])
       
   128     {
   153     {
   129       @tailrec def await(running: List[Task])
   154       @tailrec def await(running: List[Task])
   130       {
   155       {
   131         running.partition(_.is_finished) match {
   156         running.partition(_.is_finished) match {
   132           case (Nil, Nil) =>
   157           case (Nil, Nil) =>
   133           case (Nil, _ :: _) => Thread.sleep(500); await(running)
   158           case (Nil, _ :: _) => Thread.sleep(500); await(running)
   134           case (finished, remaining) =>
   159           case (_ :: _, remaining) => await(remaining)
   135             val end_date = Date.now()
       
   136             finished.foreach(log_end(end_date, _))
       
   137             await(remaining)
       
   138         }
   160         }
   139       }
   161       }
   140       await(task_specs.map({ case (name, body) => new Task(name, body) }))
   162       val start_date = Date.now()
       
   163       await(tasks.map(task => log_service.fork_task(start_date, task)))
   141     }
   164     }
   142 
   165 
   143 
   166 
   144     /* main */
   167     /* main */
   145 
   168 
   146     val main_task = new Task("main", _ => ())
   169     log_service.run_task(main_start_date,
   147     File.write(main_state_file, main_task.start_date + " " + hostname)
   170       Logger_Task("isabelle_cronjob", _ => parallel_tasks(all_tasks)))
   148     log_start(main_task)
   171 
   149 
   172     log_service.shutdown()
   150     manage_tasks(List("isabelle_identify" -> isabelle_identify _))
   173 
   151 
       
   152     log_end(Date.now(), main_task)
       
   153     main_state_file.file.delete
   174     main_state_file.file.delete
   154 
       
   155     logger.shutdown()
       
   156   }
   175   }
   157 
   176 
   158 
   177 
   159 
   178 
   160   /** command line entry point **/
   179   /** command line entry point **/