more explicit management of tasks;
authorwenzelm
Wed, 12 Oct 2016 19:03:35 +0200
changeset 64170 a0c2cbe2fc8e
parent 64169 3b618d52119e
child 64171 568cd5123952
more explicit management of tasks; separate logger thread with exclusive access to main.log;
src/Pure/Admin/isabelle_cronjob.scala
--- a/src/Pure/Admin/isabelle_cronjob.scala	Wed Oct 12 15:51:20 2016 +0200
+++ b/src/Pure/Admin/isabelle_cronjob.scala	Wed Oct 12 19:03:35 2016 +0200
@@ -7,6 +7,10 @@
 package isabelle
 
 
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+
 object Isabelle_Cronjob
 {
   /** file-system state: owned by main cronjob **/
@@ -16,7 +20,7 @@
   val log_dir = main_dir + Path.explode("log")
 
   val main_state_file = run_dir + Path.explode("main.state")
-  val main_log = log_dir + Path.explode("main.log")
+  val main_log = log_dir + Path.explode("main.log")  // owned by logger
 
 
   /* managed repository clones */
@@ -31,66 +35,124 @@
     hg.identify("tip", options = "-i")
   }
 
+
+
+  /** particular tasks **/
+
+  /* identify repository snapshots */
+
+  def isabelle_identify(start_date: Date)
+  {
+    val isabelle_id = pull_repos(isabelle_repos)
+    val afp_id = pull_repos(afp_repos)
+
+    val log_path = log_dir + Build_Log.log_path("isabelle_identify", start_date)
+    Isabelle_System.mkdirs(log_path.dir)
+    File.write(log_path,
+      Library.terminate_lines(
+        List("isabelle_identify: " + Build_Log.print_date(start_date),
+          "",
+          "Isabelle version: " + isabelle_id,
+          "AFP version: " + afp_id)))
+  }
+
+
+
   /** cronjob **/
 
+  private class Task(val name: String, body: Date => Unit)
+  {
+    override def toString: String = "cronjob: " + name
+
+    val start_date: Date = Date.now()
+
+    private val future: Future[Unit] = Future.thread(toString) { body(start_date) }
+    def is_finished: Boolean = future.is_finished
+
+    def success: Boolean =
+      future.join_result match {
+        case Exn.Res(_) => true
+        case Exn.Exn(_) => false
+      }
+  }
+
   def cronjob(progress: Progress)
   {
-    /* log */
-
-    val hostname = Isabelle_System.hostname()
-
-    def log(date: Date, msg: String)
-    {
-      val text = "[" + Build_Log.print_date(date) + ", " + hostname + "]: " + msg
-      File.append(main_log, text + "\n")
-      progress.echo(text)
-    }
-
-
-    /* start */
-
-    val start_date = Date.now()
+    /* check */
 
     val still_running =
       try { Some(File.read(main_state_file)) }
       catch { case ERROR(_) => None }
 
     still_running match {
+      case None | Some("") =>
       case Some(running) =>
         error("Isabelle cronjob appears to be still running: " + running)
-      case None =>
-        File.write(main_state_file, start_date + " " + hostname)
-        log(start_date, "start cronjob")
     }
 
 
-    /* identify repository snapshots */
+    /* logger */
+
+    val hostname = Isabelle_System.hostname()
 
-    {
-      val pull_date = Date.now()
-
-      val isabelle_id = pull_repos(isabelle_repos)
-      val afp_id = pull_repos(afp_repos)
+    val logger: Consumer_Thread[String] =
+      Consumer_Thread.fork("cronjob: logger", daemon = true)(
+        consume = (text: String) =>
+          {
+            File.append(main_log, text + "\n")
+            progress.echo(text)
+            true
+          })
 
-      val log_path = log_dir + Build_Log.log_path("isabelle_identify", pull_date)
-      Isabelle_System.mkdirs(log_path.dir)
-      File.write(log_path,
-        Library.terminate_lines(
-          List("isabelle_identify: " + Build_Log.print_date(pull_date),
-            "",
-            "Isabelle version: " + isabelle_id,
-            "AFP version: " + afp_id)))
+    def log(date: Date, task_name: String, msg: String)
+    {
+      logger.send(
+        "[" + Build_Log.print_date(date) + ", " + hostname + ", " + task_name + "]: " + msg)
+    }
+
+    def log_start(task: Task) { log(task.start_date, task.name, "started") }
+
+    def log_end(end_date: Date, task: Task)
+    {
+      val elapsed_time = end_date.time - task.start_date.time
+      val msg =
+        (if (task.success) "finished" else "FAILED") +
+        (if (elapsed_time.seconds < 3.0) "" else ", elapsed time " + elapsed_time.message_hms)
+      log(end_date, task.name, msg)
     }
 
 
-    /* end */
+    /* manage tasks */
 
-    val end_date = Date.now()
-    val elapsed_time = end_date.time - start_date.time
+    def manage_tasks(task_specs: List[(String, Date => Unit)])
+    {
+      @tailrec def await(running: List[Task])
+      {
+        running.partition(_.is_finished) match {
+          case (Nil, Nil) =>
+          case (Nil, _ :: _) => Thread.sleep(500); await(running)
+          case (finished, remaining) =>
+            val end_date = Date.now()
+            finished.foreach(log_end(end_date, _))
+            await(remaining)
+        }
+      }
+      await(task_specs.map({ case (name, body) => new Task(name, body) }))
+    }
 
-    log(end_date, "end cronjob, elapsed time " + elapsed_time.message_hms)
+
+    /* main */
+
+    val main_task = new Task("main", _ => ())
+    File.write(main_state_file, main_task.start_date + " " + hostname)
+    log_start(main_task)
 
+    manage_tasks(List("isabelle_identify" -> isabelle_identify _))
+
+    log_end(Date.now(), main_task)
     main_state_file.file.delete
+
+    logger.shutdown()
   }