merged
authorwenzelm
Wed, 12 Oct 2016 22:06:06 +0200
changeset 64174 54479f7b6685
parent 64164 38c407446400 (current diff)
parent 64173 85ff21510ba9 (diff)
child 64175 8945293a9ed0
merged
--- a/src/Pure/Admin/build_history.scala	Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/Admin/build_history.scala	Wed Oct 12 22:06:06 2016 +0200
@@ -78,7 +78,7 @@
           "init_components " + File.bash_path(components_base_path) +
             " \"$ISABELLE_HOME/Admin/components/" + catalog + "\"")
       }
-      File.append(etc_settings, "\n" + Library.terminate_lines(component_settings))
+      File.append(etc_settings, "\n" + terminate_lines(component_settings))
     }
 
 
@@ -147,7 +147,7 @@
         List(ml_settings, thread_settings) :::
           (if (more_settings.isEmpty) Nil else List(more_settings))
 
-      File.append(etc_settings, "\n" + cat_lines(settings.map(Library.terminate_lines(_))))
+      File.append(etc_settings, "\n" + cat_lines(settings.map(terminate_lines(_))))
 
       ml_platform
     }
@@ -287,7 +287,7 @@
 
       Isabelle_System.mkdirs(log_path.dir)
       File.write_gzip(log_path,
-        Library.terminate_lines(
+        terminate_lines(
           Build_Log.Log_File.print_props(META_INFO_MARKER, meta_info) :: res.out_lines :::
           ml_statistics.map(Build_Log.Log_File.print_props(Build_Log.ML_STATISTICS_MARKER, _)) :::
           heap_sizes))
--- a/src/Pure/Admin/ci_profile.scala	Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/Admin/ci_profile.scala	Wed Oct 12 22:06:06 2016 +0200
@@ -72,7 +72,7 @@
 
 
   final def hg_id(path: Path): String =
-    Isabelle_System.hg("id -i", path.file).out
+    Mercurial.repository(path).identify(options = "-i")
 
   final def print_section(title: String): Unit =
     println(s"\n=== $title ===\n")
--- a/src/Pure/Admin/isabelle_cronjob.scala	Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/Admin/isabelle_cronjob.scala	Wed Oct 12 22:06:06 2016 +0200
@@ -7,6 +7,10 @@
 package isabelle
 
 
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+
 object Isabelle_Cronjob
 {
   /** file-system state: owned by main cronjob **/
@@ -16,79 +20,154 @@
   val log_dir = main_dir + Path.explode("log")
 
   val main_state_file = run_dir + Path.explode("main.state")
-  val main_log = log_dir + Path.explode("main.log")
+  val main_log = log_dir + Path.explode("main.log")  // owned by log service
 
 
-  /* managed repository clones */
+  /* task logging */
+
+  sealed case class Logger_Task(name: String, body: Logger => Unit)
+
+  class Log_Service private[Isabelle_Cronjob](progress: Progress)
+  {
+    private val thread: Consumer_Thread[String] =
+      Consumer_Thread.fork("cronjob: logger", daemon = true)(
+        consume = (text: String) =>
+          {
+            File.append(main_log, text + "\n")   // critical
+            progress.echo(text)
+            true
+          })
+
+    def shutdown() { thread.shutdown() }
+
+    val hostname = Isabelle_System.hostname()
+
+    def log(date: Date, task_name: String, msg: String): Unit =
+      thread.send(
+        "[" + Build_Log.print_date(date) + ", " + hostname + ", " + task_name + "]: " + msg)
+
+    def start_logger(start_date: Date, task_name: String): Logger =
+      new Logger(this, start_date, task_name)
+
+    def run_task(start_date: Date, task: Logger_Task)
+    {
+      val logger = start_logger(start_date, task.name)
+      val res = Exn.capture { task.body(logger) }
+      val end_date = Date.now()
+      val err =
+        res match {
+          case Exn.Res(_) => None
+          case Exn.Exn(exn) => Some(Exn.message(exn))
+        }
+      logger.log_end(end_date, err)
+    }
+
+    def fork_task(start_date: Date, task: Logger_Task): Task =
+      new Task(task.name, run_task(start_date, task))
+  }
+
+  class Logger private[Isabelle_Cronjob](
+    val log_service: Log_Service, val start_date: Date, val task_name: String)
+  {
+    def log(date: Date, msg: String): Unit = log_service.log(date, task_name, msg)
+
+    def log_end(end_date: Date, err: Option[String])
+    {
+      val elapsed_time = end_date.time - start_date.time
+      val msg =
+        (if (err.isEmpty) "finished" else "ERROR " + err.get) +
+        (if (elapsed_time.seconds < 3.0) "" else ", elapsed time " + elapsed_time.message_hms)
+      log(end_date, msg)
+    }
+
+    log(start_date, "started")
+  }
+
+  class Task private[Isabelle_Cronjob](name: String, body: => Unit)
+  {
+    private val future: Future[Unit] = Future.thread("cronjob: " + name) { body }
+    def is_finished: Boolean = future.is_finished
+  }
+
+
+
+  /** particular tasks **/
+
+  /* identify repository snapshots */
 
   val isabelle_repos = main_dir + Path.explode("isabelle-build_history")
   val afp_repos = main_dir + Path.explode("AFP-build_history")
 
-  def pull_repos(root: Path): String =
-  {
-    val hg = Mercurial.repository(root)
-    hg.pull(options = "-q")
-    hg.identify("tip", options = "-i")
-  }
+  val isabelle_identify =
+    Logger_Task("isabelle_identify", logger =>
+      {
+        def pull_repos(root: Path): String =
+        {
+          val hg = Mercurial.repository(root)
+          hg.pull(options = "-q")
+          hg.identify("tip", options = "-i")
+        }
+
+        val isabelle_id = pull_repos(isabelle_repos)
+        val afp_id = pull_repos(afp_repos)
+
+        val log_path = log_dir + Build_Log.log_path("isabelle_identify", logger.start_date)
+        Isabelle_System.mkdirs(log_path.dir)
+        File.write(log_path,
+          terminate_lines(
+            List("isabelle_identify: " + Build_Log.print_date(logger.start_date),
+              "",
+              "Isabelle version: " + isabelle_id,
+              "AFP version: " + afp_id)))
+      })
+
+
 
   /** cronjob **/
 
   def cronjob(progress: Progress)
   {
-    /* log */
-
-    val hostname = Isabelle_System.hostname()
-
-    def log(date: Date, msg: String)
-    {
-      val text = "[" + Build_Log.print_date(date) + ", " + hostname + "]: " + msg
-      File.append(main_log, text + "\n")
-      progress.echo(text)
-    }
-
-
-    /* start */
-
-    val start_date = Date.now()
+    /* soft lock */
 
     val still_running =
       try { Some(File.read(main_state_file)) }
       catch { case ERROR(_) => None }
 
     still_running match {
+      case None | Some("") =>
       case Some(running) =>
         error("Isabelle cronjob appears to be still running: " + running)
-      case None =>
-        File.write(main_state_file, start_date + " " + hostname)
-        log(start_date, "start cronjob")
+    }
+
+    val main_start_date = Date.now()
+    val log_service = new Log_Service(progress)
+
+    File.write(main_state_file, main_start_date + " " + log_service.hostname)
+
+
+    /* parallel tasks */
+
+    def parallel_tasks(tasks: List[Logger_Task])
+    {
+      @tailrec def await(running: List[Task])
+      {
+        running.partition(_.is_finished) match {
+          case (Nil, Nil) =>
+          case (Nil, _ :: _) => Thread.sleep(500); await(running)
+          case (_ :: _, remaining) => await(remaining)
+        }
+      }
+      val start_date = Date.now()
+      await(tasks.map(task => log_service.fork_task(start_date, task)))
     }
 
 
-    /* identify repository snapshots */
-
-    {
-      val pull_date = Date.now()
-
-      val isabelle_id = pull_repos(isabelle_repos)
-      val afp_id = pull_repos(afp_repos)
+    /* main */
 
-      val log_path = log_dir + Build_Log.log_path("isabelle_identify", pull_date)
-      Isabelle_System.mkdirs(log_path.dir)
-      File.write(log_path,
-        Library.terminate_lines(
-          List("isabelle_identify: " + Build_Log.print_date(pull_date),
-            "",
-            "Isabelle version: " + isabelle_id,
-            "AFP version: " + afp_id)))
-    }
+    log_service.run_task(main_start_date,
+      Logger_Task("isabelle_cronjob", _ => parallel_tasks(List(isabelle_identify))))
 
-
-    /* end */
-
-    val end_date = Date.now()
-    val elapsed_time = end_date.time - start_date.time
-
-    log(end_date, "end cronjob, elapsed time " + elapsed_time.message_hms)
+    log_service.shutdown()
 
     main_state_file.file.delete
   }
--- a/src/Pure/General/mercurial.scala	Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/General/mercurial.scala	Wed Oct 12 22:06:06 2016 +0200
@@ -24,38 +24,64 @@
 
   /* repository access */
 
-  def repository(root: Path): Repository = new Repository(root)
+  def repository(root: Path, ssh: Option[SSH.Session] = None): Repository =
+  {
+    val hg = new Repository(root, ssh)
+    hg.command("root").check
+    hg
+  }
 
-  class Repository private[Mercurial](val root: Path)
+  def clone_repository(
+    source: String, dest: Path, options: String = "", ssh: Option[SSH.Session] = None): Repository =
+  {
+    val hg = new Repository(dest, ssh)
+    hg.command("clone",
+      File.bash_string(source) + " " + File.bash_string(dest.implode), options).check
+    hg
+  }
+
+  class Repository private[Mercurial](val root: Path, ssh: Option[SSH.Session])
   {
-    override def toString: String = root.toString
+    hg =>
+
+    override def toString: String =
+      ssh match {
+        case None => root.implode
+        case Some(session) => session.toString + ":" + root.implode
+      }
 
-    def command(cmd: String, cwd: JFile = null): Process_Result =
-      Isabelle_System.hg("--repository " + File.bash_path(root) + " --noninteractive " + cmd,
-        cwd = cwd)
-
+    def command(name: String, args: String = "", options: String = ""): Process_Result =
+    {
+      val cmdline =
+        "\"${HG:-hg}\"" +
+          (if (name == "clone") "" else " --repository " + File.bash_string(root.implode)) +
+          " --noninteractive " + name + " " + options + " " + args
+      ssh match {
+        case None => Isabelle_System.bash(cmdline)
+        case Some(session) => session.execute(cmdline)
+      }
+    }
 
     def heads(template: String = "{node|short}\n", options: String = ""): List[String] =
-      command("heads " + options + opt_template(template)).check.out_lines
+      hg.command("heads", opt_template(template), options).check.out_lines
 
     def identify(rev: String = "", options: String = ""): String =
-      command("id " + options + opt_rev(rev)).check.out_lines.headOption getOrElse ""
+      hg.command("id", opt_rev(rev), options).check.out_lines.headOption getOrElse ""
 
     def manifest(rev: String = "", options: String = ""): List[String] =
-      command("manifest " + options + opt_rev(rev)).check.out_lines
+      hg.command("manifest", opt_rev(rev), options).check.out_lines
 
     def log(rev: String = "", template: String = "", options: String = ""): String =
-      command("log " + options + opt_rev(rev) + opt_template(template)).check.out
+      hg.command("log", opt_rev(rev) + opt_template(template), options).check.out
 
     def pull(remote: String = "", rev: String = "", options: String = ""): Unit =
-      command("pull " + options + opt_rev(rev) + optional(remote)).check
+      hg.command("pull", opt_rev(rev) + optional(remote), options).check
 
-    def update(rev: String = "", clean: Boolean = false, check: Boolean = false, options: String = "")
+    def update(
+      rev: String = "", clean: Boolean = false, check: Boolean = false, options: String = "")
     {
-      command("update " + options +
-        opt_rev(rev) + opt_flag("--clean", clean) + opt_flag("--check", check)).check
+      hg.command("update",
+        opt_rev(rev) + opt_flag("--clean", clean) + opt_flag("--check", check), options).check
     }
-
-    command("root").check
   }
 }
--- a/src/Pure/General/ssh.scala	Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/General/ssh.scala	Wed Oct 12 22:06:06 2016 +0200
@@ -106,7 +106,7 @@
   /* channel */
 
   class Channel[C <: JSch_Channel] private[SSH](
-    val session: Session, val kind: String, val options: Options, val channel: C)
+    val session: Session, val kind: String, val channel: C)
   {
     override def toString: String = kind + " " + session.toString
 
@@ -118,9 +118,8 @@
 
   private val exec_wait_delay = Time.seconds(0.3)
 
-  class Exec private[SSH](
-    session: Session, kind: String, options: Options, channel: ChannelExec)
-    extends Channel[ChannelExec](session, kind, options, channel)
+  class Exec private[SSH](session: Session, kind: String, channel: ChannelExec)
+    extends Channel[ChannelExec](session, kind, channel)
   {
     def kill(signal: String) { channel.sendSignal(signal) }
 
@@ -134,8 +133,8 @@
     val stdout: InputStream = channel.getInputStream
     val stderr: InputStream = channel.getErrStream
 
-    // after preparing streams
-    channel.connect(connect_timeout(options))
+    // connect after preparing streams
+    channel.connect(connect_timeout(session.options))
 
     def result(
       progress_stdout: String => Unit = (_: String) => (),
@@ -204,11 +203,10 @@
     def is_dir: Boolean = attrs.isDir
   }
 
-  class Sftp private[SSH](
-    session: Session, kind: String, options: Options, channel: ChannelSftp)
-    extends Channel[ChannelSftp](session, kind, options, channel)
+  class Sftp private[SSH](session: Session, kind: String, channel: ChannelSftp)
+    extends Channel[ChannelSftp](session, kind, channel)
   {
-    channel.connect(connect_timeout(options))
+    channel.connect(connect_timeout(session.options))
 
     def home: String = channel.getHome()
 
@@ -268,8 +266,10 @@
 
   /* session */
 
-  class Session private[SSH](val session_options: Options, val session: JSch_Session)
+  class Session private[SSH](val options: Options, val session: JSch_Session)
   {
+    def update_options(new_options: Options): Session = new Session(new_options, session)
+
     override def toString: String =
       (if (session.getUserName == null) "" else session.getUserName + "@") +
       (if (session.getHost == null) "" else session.getHost) +
@@ -279,25 +279,24 @@
     def close() { session.disconnect }
 
     def execute(command: String,
-        options: Options = session_options,
         progress_stdout: String => Unit = (_: String) => (),
         progress_stderr: String => Unit = (_: String) => (),
         strict: Boolean = true): Process_Result =
-      exec(command, options).result(progress_stdout, progress_stderr, strict)
+      exec(command).result(progress_stdout, progress_stderr, strict)
 
-    def exec(command: String, options: Options = session_options): Exec =
+    def exec(command: String): Exec =
     {
       val kind = "exec"
       val channel = session.openChannel(kind).asInstanceOf[ChannelExec]
       channel.setCommand(command)
-      new Exec(this, kind, options, channel)
+      new Exec(this, kind, channel)
     }
 
-    def sftp(options: Options = session_options): Sftp =
+    def sftp(): Sftp =
     {
       val kind = "sftp"
       val channel = session.openChannel(kind).asInstanceOf[ChannelSftp]
-      new Sftp(this, kind, options, channel)
+      new Sftp(this, kind, channel)
     }
 
 
@@ -319,6 +318,8 @@
 
 class SSH private(val options: Options, val jsch: JSch)
 {
+  def update_options(new_options: Options): SSH = new SSH(new_options, jsch)
+
   def open_session(host: String, port: Int = SSH.default_port, user: String = ""): SSH.Session =
   {
     val session = jsch.getSession(if (user == "") null else user, host, port)
--- a/src/Pure/ROOT.scala	Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/ROOT.scala	Wed Oct 12 22:06:06 2016 +0200
@@ -15,6 +15,7 @@
   val space_explode = Library.space_explode _
   val split_lines = Library.split_lines _
   val cat_lines = Library.cat_lines _
+  val terminate_lines = Library.terminate_lines _
   val quote = Library.quote _
   val commas = Library.commas _
   val commas_quote = Library.commas_quote _
--- a/src/Pure/System/isabelle_system.scala	Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/System/isabelle_system.scala	Wed Oct 12 22:06:06 2016 +0200
@@ -317,9 +317,6 @@
   def pdf_viewer(arg: Path): Unit =
     bash("exec \"$PDF_VIEWER\" " + File.bash_path(arg) + " >/dev/null 2>/dev/null &")
 
-  def hg(cmd_line: String, cwd: JFile = null): Process_Result =
-    bash("\"${HG:-hg}\" " + cmd_line, cwd = cwd)
-
 
 
   /** Isabelle resources **/
--- a/src/Pure/Tools/build.scala	Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/Tools/build.scala	Wed Oct 12 22:06:06 2016 +0200
@@ -562,7 +562,7 @@
                     yield Sessions.write_heap_digest(path)
 
                 File.write_gzip(store.output_dir + Sessions.log_gz(name),
-                  Library.terminate_lines(
+                  terminate_lines(
                     session_sources_stamp(name) ::
                     input_heaps.map(INPUT_HEAP + _) :::
                     heap_stamp.toList.map(OUTPUT_HEAP + _) :::
@@ -575,7 +575,7 @@
                 (store.output_dir + Sessions.log_gz(name)).file.delete
 
                 File.write(store.output_dir + Sessions.log(name),
-                  Library.terminate_lines(process_result.out_lines))
+                  terminate_lines(process_result.out_lines))
                 progress.echo(name + " FAILED")
                 if (!process_result.interrupted) progress.echo(process_result_tail.out)