--- a/src/Pure/Admin/build_history.scala Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/Admin/build_history.scala Wed Oct 12 22:06:06 2016 +0200
@@ -78,7 +78,7 @@
"init_components " + File.bash_path(components_base_path) +
" \"$ISABELLE_HOME/Admin/components/" + catalog + "\"")
}
- File.append(etc_settings, "\n" + Library.terminate_lines(component_settings))
+ File.append(etc_settings, "\n" + terminate_lines(component_settings))
}
@@ -147,7 +147,7 @@
List(ml_settings, thread_settings) :::
(if (more_settings.isEmpty) Nil else List(more_settings))
- File.append(etc_settings, "\n" + cat_lines(settings.map(Library.terminate_lines(_))))
+ File.append(etc_settings, "\n" + cat_lines(settings.map(terminate_lines(_))))
ml_platform
}
@@ -287,7 +287,7 @@
Isabelle_System.mkdirs(log_path.dir)
File.write_gzip(log_path,
- Library.terminate_lines(
+ terminate_lines(
Build_Log.Log_File.print_props(META_INFO_MARKER, meta_info) :: res.out_lines :::
ml_statistics.map(Build_Log.Log_File.print_props(Build_Log.ML_STATISTICS_MARKER, _)) :::
heap_sizes))
--- a/src/Pure/Admin/ci_profile.scala Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/Admin/ci_profile.scala Wed Oct 12 22:06:06 2016 +0200
@@ -72,7 +72,7 @@
final def hg_id(path: Path): String =
- Isabelle_System.hg("id -i", path.file).out
+ Mercurial.repository(path).identify(options = "-i")
final def print_section(title: String): Unit =
println(s"\n=== $title ===\n")
--- a/src/Pure/Admin/isabelle_cronjob.scala Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/Admin/isabelle_cronjob.scala Wed Oct 12 22:06:06 2016 +0200
@@ -7,6 +7,10 @@
package isabelle
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+
object Isabelle_Cronjob
{
/** file-system state: owned by main cronjob **/
@@ -16,79 +20,154 @@
val log_dir = main_dir + Path.explode("log")
val main_state_file = run_dir + Path.explode("main.state")
- val main_log = log_dir + Path.explode("main.log")
+ val main_log = log_dir + Path.explode("main.log") // owned by log service
- /* managed repository clones */
+ /* task logging */
+
+ sealed case class Logger_Task(name: String, body: Logger => Unit)
+
+ class Log_Service private[Isabelle_Cronjob](progress: Progress)
+ {
+ private val thread: Consumer_Thread[String] =
+ Consumer_Thread.fork("cronjob: logger", daemon = true)(
+ consume = (text: String) =>
+ {
+ File.append(main_log, text + "\n") // critical
+ progress.echo(text)
+ true
+ })
+
+ def shutdown() { thread.shutdown() }
+
+ val hostname = Isabelle_System.hostname()
+
+ def log(date: Date, task_name: String, msg: String): Unit =
+ thread.send(
+ "[" + Build_Log.print_date(date) + ", " + hostname + ", " + task_name + "]: " + msg)
+
+ def start_logger(start_date: Date, task_name: String): Logger =
+ new Logger(this, start_date, task_name)
+
+ def run_task(start_date: Date, task: Logger_Task)
+ {
+ val logger = start_logger(start_date, task.name)
+ val res = Exn.capture { task.body(logger) }
+ val end_date = Date.now()
+ val err =
+ res match {
+ case Exn.Res(_) => None
+ case Exn.Exn(exn) => Some(Exn.message(exn))
+ }
+ logger.log_end(end_date, err)
+ }
+
+ def fork_task(start_date: Date, task: Logger_Task): Task =
+ new Task(task.name, run_task(start_date, task))
+ }
+
+ class Logger private[Isabelle_Cronjob](
+ val log_service: Log_Service, val start_date: Date, val task_name: String)
+ {
+ def log(date: Date, msg: String): Unit = log_service.log(date, task_name, msg)
+
+ def log_end(end_date: Date, err: Option[String])
+ {
+ val elapsed_time = end_date.time - start_date.time
+ val msg =
+ (if (err.isEmpty) "finished" else "ERROR " + err.get) +
+ (if (elapsed_time.seconds < 3.0) "" else ", elapsed time " + elapsed_time.message_hms)
+ log(end_date, msg)
+ }
+
+ log(start_date, "started")
+ }
+
+ class Task private[Isabelle_Cronjob](name: String, body: => Unit)
+ {
+ private val future: Future[Unit] = Future.thread("cronjob: " + name) { body }
+ def is_finished: Boolean = future.is_finished
+ }
+
+
+
+ /** particular tasks **/
+
+ /* identify repository snapshots */
val isabelle_repos = main_dir + Path.explode("isabelle-build_history")
val afp_repos = main_dir + Path.explode("AFP-build_history")
- def pull_repos(root: Path): String =
- {
- val hg = Mercurial.repository(root)
- hg.pull(options = "-q")
- hg.identify("tip", options = "-i")
- }
+ val isabelle_identify =
+ Logger_Task("isabelle_identify", logger =>
+ {
+ def pull_repos(root: Path): String =
+ {
+ val hg = Mercurial.repository(root)
+ hg.pull(options = "-q")
+ hg.identify("tip", options = "-i")
+ }
+
+ val isabelle_id = pull_repos(isabelle_repos)
+ val afp_id = pull_repos(afp_repos)
+
+ val log_path = log_dir + Build_Log.log_path("isabelle_identify", logger.start_date)
+ Isabelle_System.mkdirs(log_path.dir)
+ File.write(log_path,
+ terminate_lines(
+ List("isabelle_identify: " + Build_Log.print_date(logger.start_date),
+ "",
+ "Isabelle version: " + isabelle_id,
+ "AFP version: " + afp_id)))
+ })
+
+
/** cronjob **/
def cronjob(progress: Progress)
{
- /* log */
-
- val hostname = Isabelle_System.hostname()
-
- def log(date: Date, msg: String)
- {
- val text = "[" + Build_Log.print_date(date) + ", " + hostname + "]: " + msg
- File.append(main_log, text + "\n")
- progress.echo(text)
- }
-
-
- /* start */
-
- val start_date = Date.now()
+ /* soft lock */
val still_running =
try { Some(File.read(main_state_file)) }
catch { case ERROR(_) => None }
still_running match {
+ case None | Some("") =>
case Some(running) =>
error("Isabelle cronjob appears to be still running: " + running)
- case None =>
- File.write(main_state_file, start_date + " " + hostname)
- log(start_date, "start cronjob")
+ }
+
+ val main_start_date = Date.now()
+ val log_service = new Log_Service(progress)
+
+ File.write(main_state_file, main_start_date + " " + log_service.hostname)
+
+
+ /* parallel tasks */
+
+ def parallel_tasks(tasks: List[Logger_Task])
+ {
+ @tailrec def await(running: List[Task])
+ {
+ running.partition(_.is_finished) match {
+ case (Nil, Nil) =>
+ case (Nil, _ :: _) => Thread.sleep(500); await(running)
+ case (_ :: _, remaining) => await(remaining)
+ }
+ }
+ val start_date = Date.now()
+ await(tasks.map(task => log_service.fork_task(start_date, task)))
}
- /* identify repository snapshots */
-
- {
- val pull_date = Date.now()
-
- val isabelle_id = pull_repos(isabelle_repos)
- val afp_id = pull_repos(afp_repos)
+ /* main */
- val log_path = log_dir + Build_Log.log_path("isabelle_identify", pull_date)
- Isabelle_System.mkdirs(log_path.dir)
- File.write(log_path,
- Library.terminate_lines(
- List("isabelle_identify: " + Build_Log.print_date(pull_date),
- "",
- "Isabelle version: " + isabelle_id,
- "AFP version: " + afp_id)))
- }
+ log_service.run_task(main_start_date,
+ Logger_Task("isabelle_cronjob", _ => parallel_tasks(List(isabelle_identify))))
-
- /* end */
-
- val end_date = Date.now()
- val elapsed_time = end_date.time - start_date.time
-
- log(end_date, "end cronjob, elapsed time " + elapsed_time.message_hms)
+ log_service.shutdown()
main_state_file.file.delete
}
--- a/src/Pure/General/mercurial.scala Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/General/mercurial.scala Wed Oct 12 22:06:06 2016 +0200
@@ -24,38 +24,64 @@
/* repository access */
- def repository(root: Path): Repository = new Repository(root)
+ def repository(root: Path, ssh: Option[SSH.Session] = None): Repository =
+ {
+ val hg = new Repository(root, ssh)
+ hg.command("root").check
+ hg
+ }
- class Repository private[Mercurial](val root: Path)
+ def clone_repository(
+ source: String, dest: Path, options: String = "", ssh: Option[SSH.Session] = None): Repository =
+ {
+ val hg = new Repository(dest, ssh)
+ hg.command("clone",
+ File.bash_string(source) + " " + File.bash_string(dest.implode), options).check
+ hg
+ }
+
+ class Repository private[Mercurial](val root: Path, ssh: Option[SSH.Session])
{
- override def toString: String = root.toString
+ hg =>
+
+ override def toString: String =
+ ssh match {
+ case None => root.implode
+ case Some(session) => session.toString + ":" + root.implode
+ }
- def command(cmd: String, cwd: JFile = null): Process_Result =
- Isabelle_System.hg("--repository " + File.bash_path(root) + " --noninteractive " + cmd,
- cwd = cwd)
-
+ def command(name: String, args: String = "", options: String = ""): Process_Result =
+ {
+ val cmdline =
+ "\"${HG:-hg}\"" +
+ (if (name == "clone") "" else " --repository " + File.bash_string(root.implode)) +
+ " --noninteractive " + name + " " + options + " " + args
+ ssh match {
+ case None => Isabelle_System.bash(cmdline)
+ case Some(session) => session.execute(cmdline)
+ }
+ }
def heads(template: String = "{node|short}\n", options: String = ""): List[String] =
- command("heads " + options + opt_template(template)).check.out_lines
+ hg.command("heads", opt_template(template), options).check.out_lines
def identify(rev: String = "", options: String = ""): String =
- command("id " + options + opt_rev(rev)).check.out_lines.headOption getOrElse ""
+ hg.command("id", opt_rev(rev), options).check.out_lines.headOption getOrElse ""
def manifest(rev: String = "", options: String = ""): List[String] =
- command("manifest " + options + opt_rev(rev)).check.out_lines
+ hg.command("manifest", opt_rev(rev), options).check.out_lines
def log(rev: String = "", template: String = "", options: String = ""): String =
- command("log " + options + opt_rev(rev) + opt_template(template)).check.out
+ hg.command("log", opt_rev(rev) + opt_template(template), options).check.out
def pull(remote: String = "", rev: String = "", options: String = ""): Unit =
- command("pull " + options + opt_rev(rev) + optional(remote)).check
+ hg.command("pull", opt_rev(rev) + optional(remote), options).check
- def update(rev: String = "", clean: Boolean = false, check: Boolean = false, options: String = "")
+ def update(
+ rev: String = "", clean: Boolean = false, check: Boolean = false, options: String = "")
{
- command("update " + options +
- opt_rev(rev) + opt_flag("--clean", clean) + opt_flag("--check", check)).check
+ hg.command("update",
+ opt_rev(rev) + opt_flag("--clean", clean) + opt_flag("--check", check), options).check
}
-
- command("root").check
}
}
--- a/src/Pure/General/ssh.scala Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/General/ssh.scala Wed Oct 12 22:06:06 2016 +0200
@@ -106,7 +106,7 @@
/* channel */
class Channel[C <: JSch_Channel] private[SSH](
- val session: Session, val kind: String, val options: Options, val channel: C)
+ val session: Session, val kind: String, val channel: C)
{
override def toString: String = kind + " " + session.toString
@@ -118,9 +118,8 @@
private val exec_wait_delay = Time.seconds(0.3)
- class Exec private[SSH](
- session: Session, kind: String, options: Options, channel: ChannelExec)
- extends Channel[ChannelExec](session, kind, options, channel)
+ class Exec private[SSH](session: Session, kind: String, channel: ChannelExec)
+ extends Channel[ChannelExec](session, kind, channel)
{
def kill(signal: String) { channel.sendSignal(signal) }
@@ -134,8 +133,8 @@
val stdout: InputStream = channel.getInputStream
val stderr: InputStream = channel.getErrStream
- // after preparing streams
- channel.connect(connect_timeout(options))
+ // connect after preparing streams
+ channel.connect(connect_timeout(session.options))
def result(
progress_stdout: String => Unit = (_: String) => (),
@@ -204,11 +203,10 @@
def is_dir: Boolean = attrs.isDir
}
- class Sftp private[SSH](
- session: Session, kind: String, options: Options, channel: ChannelSftp)
- extends Channel[ChannelSftp](session, kind, options, channel)
+ class Sftp private[SSH](session: Session, kind: String, channel: ChannelSftp)
+ extends Channel[ChannelSftp](session, kind, channel)
{
- channel.connect(connect_timeout(options))
+ channel.connect(connect_timeout(session.options))
def home: String = channel.getHome()
@@ -268,8 +266,10 @@
/* session */
- class Session private[SSH](val session_options: Options, val session: JSch_Session)
+ class Session private[SSH](val options: Options, val session: JSch_Session)
{
+ def update_options(new_options: Options): Session = new Session(new_options, session)
+
override def toString: String =
(if (session.getUserName == null) "" else session.getUserName + "@") +
(if (session.getHost == null) "" else session.getHost) +
@@ -279,25 +279,24 @@
def close() { session.disconnect }
def execute(command: String,
- options: Options = session_options,
progress_stdout: String => Unit = (_: String) => (),
progress_stderr: String => Unit = (_: String) => (),
strict: Boolean = true): Process_Result =
- exec(command, options).result(progress_stdout, progress_stderr, strict)
+ exec(command).result(progress_stdout, progress_stderr, strict)
- def exec(command: String, options: Options = session_options): Exec =
+ def exec(command: String): Exec =
{
val kind = "exec"
val channel = session.openChannel(kind).asInstanceOf[ChannelExec]
channel.setCommand(command)
- new Exec(this, kind, options, channel)
+ new Exec(this, kind, channel)
}
- def sftp(options: Options = session_options): Sftp =
+ def sftp(): Sftp =
{
val kind = "sftp"
val channel = session.openChannel(kind).asInstanceOf[ChannelSftp]
- new Sftp(this, kind, options, channel)
+ new Sftp(this, kind, channel)
}
@@ -319,6 +318,8 @@
class SSH private(val options: Options, val jsch: JSch)
{
+ def update_options(new_options: Options): SSH = new SSH(new_options, jsch)
+
def open_session(host: String, port: Int = SSH.default_port, user: String = ""): SSH.Session =
{
val session = jsch.getSession(if (user == "") null else user, host, port)
--- a/src/Pure/ROOT.scala Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/ROOT.scala Wed Oct 12 22:06:06 2016 +0200
@@ -15,6 +15,7 @@
val space_explode = Library.space_explode _
val split_lines = Library.split_lines _
val cat_lines = Library.cat_lines _
+ val terminate_lines = Library.terminate_lines _
val quote = Library.quote _
val commas = Library.commas _
val commas_quote = Library.commas_quote _
--- a/src/Pure/System/isabelle_system.scala Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/System/isabelle_system.scala Wed Oct 12 22:06:06 2016 +0200
@@ -317,9 +317,6 @@
def pdf_viewer(arg: Path): Unit =
bash("exec \"$PDF_VIEWER\" " + File.bash_path(arg) + " >/dev/null 2>/dev/null &")
- def hg(cmd_line: String, cwd: JFile = null): Process_Result =
- bash("\"${HG:-hg}\" " + cmd_line, cwd = cwd)
-
/** Isabelle resources **/
--- a/src/Pure/Tools/build.scala Wed Oct 12 20:38:47 2016 +0200
+++ b/src/Pure/Tools/build.scala Wed Oct 12 22:06:06 2016 +0200
@@ -562,7 +562,7 @@
yield Sessions.write_heap_digest(path)
File.write_gzip(store.output_dir + Sessions.log_gz(name),
- Library.terminate_lines(
+ terminate_lines(
session_sources_stamp(name) ::
input_heaps.map(INPUT_HEAP + _) :::
heap_stamp.toList.map(OUTPUT_HEAP + _) :::
@@ -575,7 +575,7 @@
(store.output_dir + Sessions.log_gz(name)).file.delete
File.write(store.output_dir + Sessions.log(name),
- Library.terminate_lines(process_result.out_lines))
+ terminate_lines(process_result.out_lines))
progress.echo(name + " FAILED")
if (!process_result.interrupted) progress.echo(process_result_tail.out)