# HG changeset patch # User wenzelm # Date 1476302766 -7200 # Node ID 54479f7b668552b1bc4be11e5dfc5b89d2f2814b # Parent 38c40744640038e9f3da31e152daa499494cd322# Parent 85ff21510ba9e788451154e24d0e152e6b56b4bc merged diff -r 38c407446400 -r 54479f7b6685 src/Pure/Admin/build_history.scala --- a/src/Pure/Admin/build_history.scala Wed Oct 12 20:38:47 2016 +0200 +++ b/src/Pure/Admin/build_history.scala Wed Oct 12 22:06:06 2016 +0200 @@ -78,7 +78,7 @@ "init_components " + File.bash_path(components_base_path) + " \"$ISABELLE_HOME/Admin/components/" + catalog + "\"") } - File.append(etc_settings, "\n" + Library.terminate_lines(component_settings)) + File.append(etc_settings, "\n" + terminate_lines(component_settings)) } @@ -147,7 +147,7 @@ List(ml_settings, thread_settings) ::: (if (more_settings.isEmpty) Nil else List(more_settings)) - File.append(etc_settings, "\n" + cat_lines(settings.map(Library.terminate_lines(_)))) + File.append(etc_settings, "\n" + cat_lines(settings.map(terminate_lines(_)))) ml_platform } @@ -287,7 +287,7 @@ Isabelle_System.mkdirs(log_path.dir) File.write_gzip(log_path, - Library.terminate_lines( + terminate_lines( Build_Log.Log_File.print_props(META_INFO_MARKER, meta_info) :: res.out_lines ::: ml_statistics.map(Build_Log.Log_File.print_props(Build_Log.ML_STATISTICS_MARKER, _)) ::: heap_sizes)) diff -r 38c407446400 -r 54479f7b6685 src/Pure/Admin/ci_profile.scala --- a/src/Pure/Admin/ci_profile.scala Wed Oct 12 20:38:47 2016 +0200 +++ b/src/Pure/Admin/ci_profile.scala Wed Oct 12 22:06:06 2016 +0200 @@ -72,7 +72,7 @@ final def hg_id(path: Path): String = - Isabelle_System.hg("id -i", path.file).out + Mercurial.repository(path).identify(options = "-i") final def print_section(title: String): Unit = println(s"\n=== $title ===\n") diff -r 38c407446400 -r 54479f7b6685 src/Pure/Admin/isabelle_cronjob.scala --- a/src/Pure/Admin/isabelle_cronjob.scala Wed Oct 12 20:38:47 2016 +0200 +++ b/src/Pure/Admin/isabelle_cronjob.scala Wed Oct 12 22:06:06 2016 +0200 @@ -7,6 +7,10 @@ package isabelle +import scala.annotation.tailrec +import scala.collection.mutable + + object Isabelle_Cronjob { /** file-system state: owned by main cronjob **/ @@ -16,79 +20,154 @@ val log_dir = main_dir + Path.explode("log") val main_state_file = run_dir + Path.explode("main.state") - val main_log = log_dir + Path.explode("main.log") + val main_log = log_dir + Path.explode("main.log") // owned by log service - /* managed repository clones */ + /* task logging */ + + sealed case class Logger_Task(name: String, body: Logger => Unit) + + class Log_Service private[Isabelle_Cronjob](progress: Progress) + { + private val thread: Consumer_Thread[String] = + Consumer_Thread.fork("cronjob: logger", daemon = true)( + consume = (text: String) => + { + File.append(main_log, text + "\n") // critical + progress.echo(text) + true + }) + + def shutdown() { thread.shutdown() } + + val hostname = Isabelle_System.hostname() + + def log(date: Date, task_name: String, msg: String): Unit = + thread.send( + "[" + Build_Log.print_date(date) + ", " + hostname + ", " + task_name + "]: " + msg) + + def start_logger(start_date: Date, task_name: String): Logger = + new Logger(this, start_date, task_name) + + def run_task(start_date: Date, task: Logger_Task) + { + val logger = start_logger(start_date, task.name) + val res = Exn.capture { task.body(logger) } + val end_date = Date.now() + val err = + res match { + case Exn.Res(_) => None + case Exn.Exn(exn) => Some(Exn.message(exn)) + } + logger.log_end(end_date, err) + } + + def fork_task(start_date: Date, task: Logger_Task): Task = + new Task(task.name, run_task(start_date, task)) + } + + class Logger private[Isabelle_Cronjob]( + val log_service: Log_Service, val start_date: Date, val task_name: String) + { + def log(date: Date, msg: String): Unit = log_service.log(date, task_name, msg) + + def log_end(end_date: Date, err: Option[String]) + { + val elapsed_time = end_date.time - start_date.time + val msg = + (if (err.isEmpty) "finished" else "ERROR " + err.get) + + (if (elapsed_time.seconds < 3.0) "" else ", elapsed time " + elapsed_time.message_hms) + log(end_date, msg) + } + + log(start_date, "started") + } + + class Task private[Isabelle_Cronjob](name: String, body: => Unit) + { + private val future: Future[Unit] = Future.thread("cronjob: " + name) { body } + def is_finished: Boolean = future.is_finished + } + + + + /** particular tasks **/ + + /* identify repository snapshots */ val isabelle_repos = main_dir + Path.explode("isabelle-build_history") val afp_repos = main_dir + Path.explode("AFP-build_history") - def pull_repos(root: Path): String = - { - val hg = Mercurial.repository(root) - hg.pull(options = "-q") - hg.identify("tip", options = "-i") - } + val isabelle_identify = + Logger_Task("isabelle_identify", logger => + { + def pull_repos(root: Path): String = + { + val hg = Mercurial.repository(root) + hg.pull(options = "-q") + hg.identify("tip", options = "-i") + } + + val isabelle_id = pull_repos(isabelle_repos) + val afp_id = pull_repos(afp_repos) + + val log_path = log_dir + Build_Log.log_path("isabelle_identify", logger.start_date) + Isabelle_System.mkdirs(log_path.dir) + File.write(log_path, + terminate_lines( + List("isabelle_identify: " + Build_Log.print_date(logger.start_date), + "", + "Isabelle version: " + isabelle_id, + "AFP version: " + afp_id))) + }) + + /** cronjob **/ def cronjob(progress: Progress) { - /* log */ - - val hostname = Isabelle_System.hostname() - - def log(date: Date, msg: String) - { - val text = "[" + Build_Log.print_date(date) + ", " + hostname + "]: " + msg - File.append(main_log, text + "\n") - progress.echo(text) - } - - - /* start */ - - val start_date = Date.now() + /* soft lock */ val still_running = try { Some(File.read(main_state_file)) } catch { case ERROR(_) => None } still_running match { + case None | Some("") => case Some(running) => error("Isabelle cronjob appears to be still running: " + running) - case None => - File.write(main_state_file, start_date + " " + hostname) - log(start_date, "start cronjob") + } + + val main_start_date = Date.now() + val log_service = new Log_Service(progress) + + File.write(main_state_file, main_start_date + " " + log_service.hostname) + + + /* parallel tasks */ + + def parallel_tasks(tasks: List[Logger_Task]) + { + @tailrec def await(running: List[Task]) + { + running.partition(_.is_finished) match { + case (Nil, Nil) => + case (Nil, _ :: _) => Thread.sleep(500); await(running) + case (_ :: _, remaining) => await(remaining) + } + } + val start_date = Date.now() + await(tasks.map(task => log_service.fork_task(start_date, task))) } - /* identify repository snapshots */ - - { - val pull_date = Date.now() - - val isabelle_id = pull_repos(isabelle_repos) - val afp_id = pull_repos(afp_repos) + /* main */ - val log_path = log_dir + Build_Log.log_path("isabelle_identify", pull_date) - Isabelle_System.mkdirs(log_path.dir) - File.write(log_path, - Library.terminate_lines( - List("isabelle_identify: " + Build_Log.print_date(pull_date), - "", - "Isabelle version: " + isabelle_id, - "AFP version: " + afp_id))) - } + log_service.run_task(main_start_date, + Logger_Task("isabelle_cronjob", _ => parallel_tasks(List(isabelle_identify)))) - - /* end */ - - val end_date = Date.now() - val elapsed_time = end_date.time - start_date.time - - log(end_date, "end cronjob, elapsed time " + elapsed_time.message_hms) + log_service.shutdown() main_state_file.file.delete } diff -r 38c407446400 -r 54479f7b6685 src/Pure/General/mercurial.scala --- a/src/Pure/General/mercurial.scala Wed Oct 12 20:38:47 2016 +0200 +++ b/src/Pure/General/mercurial.scala Wed Oct 12 22:06:06 2016 +0200 @@ -24,38 +24,64 @@ /* repository access */ - def repository(root: Path): Repository = new Repository(root) + def repository(root: Path, ssh: Option[SSH.Session] = None): Repository = + { + val hg = new Repository(root, ssh) + hg.command("root").check + hg + } - class Repository private[Mercurial](val root: Path) + def clone_repository( + source: String, dest: Path, options: String = "", ssh: Option[SSH.Session] = None): Repository = + { + val hg = new Repository(dest, ssh) + hg.command("clone", + File.bash_string(source) + " " + File.bash_string(dest.implode), options).check + hg + } + + class Repository private[Mercurial](val root: Path, ssh: Option[SSH.Session]) { - override def toString: String = root.toString + hg => + + override def toString: String = + ssh match { + case None => root.implode + case Some(session) => session.toString + ":" + root.implode + } - def command(cmd: String, cwd: JFile = null): Process_Result = - Isabelle_System.hg("--repository " + File.bash_path(root) + " --noninteractive " + cmd, - cwd = cwd) - + def command(name: String, args: String = "", options: String = ""): Process_Result = + { + val cmdline = + "\"${HG:-hg}\"" + + (if (name == "clone") "" else " --repository " + File.bash_string(root.implode)) + + " --noninteractive " + name + " " + options + " " + args + ssh match { + case None => Isabelle_System.bash(cmdline) + case Some(session) => session.execute(cmdline) + } + } def heads(template: String = "{node|short}\n", options: String = ""): List[String] = - command("heads " + options + opt_template(template)).check.out_lines + hg.command("heads", opt_template(template), options).check.out_lines def identify(rev: String = "", options: String = ""): String = - command("id " + options + opt_rev(rev)).check.out_lines.headOption getOrElse "" + hg.command("id", opt_rev(rev), options).check.out_lines.headOption getOrElse "" def manifest(rev: String = "", options: String = ""): List[String] = - command("manifest " + options + opt_rev(rev)).check.out_lines + hg.command("manifest", opt_rev(rev), options).check.out_lines def log(rev: String = "", template: String = "", options: String = ""): String = - command("log " + options + opt_rev(rev) + opt_template(template)).check.out + hg.command("log", opt_rev(rev) + opt_template(template), options).check.out def pull(remote: String = "", rev: String = "", options: String = ""): Unit = - command("pull " + options + opt_rev(rev) + optional(remote)).check + hg.command("pull", opt_rev(rev) + optional(remote), options).check - def update(rev: String = "", clean: Boolean = false, check: Boolean = false, options: String = "") + def update( + rev: String = "", clean: Boolean = false, check: Boolean = false, options: String = "") { - command("update " + options + - opt_rev(rev) + opt_flag("--clean", clean) + opt_flag("--check", check)).check + hg.command("update", + opt_rev(rev) + opt_flag("--clean", clean) + opt_flag("--check", check), options).check } - - command("root").check } } diff -r 38c407446400 -r 54479f7b6685 src/Pure/General/ssh.scala --- a/src/Pure/General/ssh.scala Wed Oct 12 20:38:47 2016 +0200 +++ b/src/Pure/General/ssh.scala Wed Oct 12 22:06:06 2016 +0200 @@ -106,7 +106,7 @@ /* channel */ class Channel[C <: JSch_Channel] private[SSH]( - val session: Session, val kind: String, val options: Options, val channel: C) + val session: Session, val kind: String, val channel: C) { override def toString: String = kind + " " + session.toString @@ -118,9 +118,8 @@ private val exec_wait_delay = Time.seconds(0.3) - class Exec private[SSH]( - session: Session, kind: String, options: Options, channel: ChannelExec) - extends Channel[ChannelExec](session, kind, options, channel) + class Exec private[SSH](session: Session, kind: String, channel: ChannelExec) + extends Channel[ChannelExec](session, kind, channel) { def kill(signal: String) { channel.sendSignal(signal) } @@ -134,8 +133,8 @@ val stdout: InputStream = channel.getInputStream val stderr: InputStream = channel.getErrStream - // after preparing streams - channel.connect(connect_timeout(options)) + // connect after preparing streams + channel.connect(connect_timeout(session.options)) def result( progress_stdout: String => Unit = (_: String) => (), @@ -204,11 +203,10 @@ def is_dir: Boolean = attrs.isDir } - class Sftp private[SSH]( - session: Session, kind: String, options: Options, channel: ChannelSftp) - extends Channel[ChannelSftp](session, kind, options, channel) + class Sftp private[SSH](session: Session, kind: String, channel: ChannelSftp) + extends Channel[ChannelSftp](session, kind, channel) { - channel.connect(connect_timeout(options)) + channel.connect(connect_timeout(session.options)) def home: String = channel.getHome() @@ -268,8 +266,10 @@ /* session */ - class Session private[SSH](val session_options: Options, val session: JSch_Session) + class Session private[SSH](val options: Options, val session: JSch_Session) { + def update_options(new_options: Options): Session = new Session(new_options, session) + override def toString: String = (if (session.getUserName == null) "" else session.getUserName + "@") + (if (session.getHost == null) "" else session.getHost) + @@ -279,25 +279,24 @@ def close() { session.disconnect } def execute(command: String, - options: Options = session_options, progress_stdout: String => Unit = (_: String) => (), progress_stderr: String => Unit = (_: String) => (), strict: Boolean = true): Process_Result = - exec(command, options).result(progress_stdout, progress_stderr, strict) + exec(command).result(progress_stdout, progress_stderr, strict) - def exec(command: String, options: Options = session_options): Exec = + def exec(command: String): Exec = { val kind = "exec" val channel = session.openChannel(kind).asInstanceOf[ChannelExec] channel.setCommand(command) - new Exec(this, kind, options, channel) + new Exec(this, kind, channel) } - def sftp(options: Options = session_options): Sftp = + def sftp(): Sftp = { val kind = "sftp" val channel = session.openChannel(kind).asInstanceOf[ChannelSftp] - new Sftp(this, kind, options, channel) + new Sftp(this, kind, channel) } @@ -319,6 +318,8 @@ class SSH private(val options: Options, val jsch: JSch) { + def update_options(new_options: Options): SSH = new SSH(new_options, jsch) + def open_session(host: String, port: Int = SSH.default_port, user: String = ""): SSH.Session = { val session = jsch.getSession(if (user == "") null else user, host, port) diff -r 38c407446400 -r 54479f7b6685 src/Pure/ROOT.scala --- a/src/Pure/ROOT.scala Wed Oct 12 20:38:47 2016 +0200 +++ b/src/Pure/ROOT.scala Wed Oct 12 22:06:06 2016 +0200 @@ -15,6 +15,7 @@ val space_explode = Library.space_explode _ val split_lines = Library.split_lines _ val cat_lines = Library.cat_lines _ + val terminate_lines = Library.terminate_lines _ val quote = Library.quote _ val commas = Library.commas _ val commas_quote = Library.commas_quote _ diff -r 38c407446400 -r 54479f7b6685 src/Pure/System/isabelle_system.scala --- a/src/Pure/System/isabelle_system.scala Wed Oct 12 20:38:47 2016 +0200 +++ b/src/Pure/System/isabelle_system.scala Wed Oct 12 22:06:06 2016 +0200 @@ -317,9 +317,6 @@ def pdf_viewer(arg: Path): Unit = bash("exec \"$PDF_VIEWER\" " + File.bash_path(arg) + " >/dev/null 2>/dev/null &") - def hg(cmd_line: String, cwd: JFile = null): Process_Result = - bash("\"${HG:-hg}\" " + cmd_line, cwd = cwd) - /** Isabelle resources **/ diff -r 38c407446400 -r 54479f7b6685 src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala Wed Oct 12 20:38:47 2016 +0200 +++ b/src/Pure/Tools/build.scala Wed Oct 12 22:06:06 2016 +0200 @@ -562,7 +562,7 @@ yield Sessions.write_heap_digest(path) File.write_gzip(store.output_dir + Sessions.log_gz(name), - Library.terminate_lines( + terminate_lines( session_sources_stamp(name) :: input_heaps.map(INPUT_HEAP + _) ::: heap_stamp.toList.map(OUTPUT_HEAP + _) ::: @@ -575,7 +575,7 @@ (store.output_dir + Sessions.log_gz(name)).file.delete File.write(store.output_dir + Sessions.log(name), - Library.terminate_lines(process_result.out_lines)) + terminate_lines(process_result.out_lines)) progress.echo(name + " FAILED") if (!process_result.interrupted) progress.echo(process_result_tail.out)