# HG changeset patch # User wenzelm # Date 1708809342 -3600 # Node ID aa77ebb2dc16ce4297d4384d7d2bb31ae47fe759 # Parent 80cb54976c1c2c6cabb9fbd207524989026070bf# Parent 5938158733bb8374d4eadd48adc0d3a24dfa3e39 merged diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/Build/build.scala --- a/src/Pure/Build/build.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/Build/build.scala Sat Feb 24 22:15:42 2024 +0100 @@ -39,7 +39,7 @@ fresh_build: Boolean = false, no_build: Boolean = false, session_setup: (String, Session) => Unit = (_, _) => (), - build_uuid: String = UUID.random().toString, + build_uuid: String = UUID.random_string(), jobs: Int = 0, master: Boolean = false ) { @@ -186,108 +186,105 @@ val build_options = store.options using(store.open_server()) { server => - using_optional(store.maybe_open_database_server(server = server)) { database_server => - - /* session selection and dependencies */ + /* session selection and dependencies */ - val full_sessions = - Sessions.load_structure(build_options, dirs = AFP.make_dirs(afp_root) ::: dirs, - select_dirs = select_dirs, infos = infos, augment_options = augment_options) - val full_sessions_selection = full_sessions.imports_selection(selection) + val full_sessions = + Sessions.load_structure(build_options, dirs = AFP.make_dirs(afp_root) ::: dirs, + select_dirs = select_dirs, infos = infos, augment_options = augment_options) + val full_sessions_selection = full_sessions.imports_selection(selection) - val build_deps = { - val deps0 = - Sessions.deps(full_sessions.selection(selection), progress = progress, - inlined_files = true, list_files = list_files, check_keywords = check_keywords - ).check_errors + val build_deps = { + val deps0 = + Sessions.deps(full_sessions.selection(selection), progress = progress, + inlined_files = true, list_files = list_files, check_keywords = check_keywords + ).check_errors - if (soft_build && !fresh_build) { - val outdated = - deps0.sessions_structure.build_topological_order.flatMap(name => - store.try_open_database(name, server = server) match { - case Some(db) => - using(db)(store.read_build(_, name)) match { - case Some(build) if build.ok => - val session_options = deps0.sessions_structure(name).options - val session_sources = deps0.sources_shasum(name) - if (Sessions.eq_sources(session_options, build.sources, session_sources)) { - None - } - else Some(name) - case _ => Some(name) - } - case None => Some(name) - }) + if (soft_build && !fresh_build) { + val outdated = + deps0.sessions_structure.build_topological_order.flatMap(name => + store.try_open_database(name, server = server) match { + case Some(db) => + using(db)(store.read_build(_, name)) match { + case Some(build) if build.ok => + val session_options = deps0.sessions_structure(name).options + val session_sources = deps0.sources_shasum(name) + if (Sessions.eq_sources(session_options, build.sources, session_sources)) { + None + } + else Some(name) + case _ => Some(name) + } + case None => Some(name) + }) - Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)), - progress = progress, inlined_files = true).check_errors - } - else deps0 + Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)), + progress = progress, inlined_files = true).check_errors } + else deps0 + } - /* check unknown files */ + /* check unknown files */ - if (check_unknown_files) { - val source_files = - (for { - (_, base) <- build_deps.session_bases.iterator - (path, _) <- base.session_sources.iterator - } yield path).toList - Mercurial.check_files(source_files)._2 match { - case Nil => - case unknown_files => - progress.echo_warning( - "Unknown files (not part of the underlying Mercurial repository):" + - unknown_files.map(File.standard_path).sorted.mkString("\n ", "\n ", "")) - } + if (check_unknown_files) { + val source_files = + (for { + (_, base) <- build_deps.session_bases.iterator + (path, _) <- base.session_sources.iterator + } yield path).toList + Mercurial.check_files(source_files)._2 match { + case Nil => + case unknown_files => + progress.echo_warning( + "Unknown files (not part of the underlying Mercurial repository):" + + unknown_files.map(File.standard_path).sorted.mkString("\n ", "\n ", "")) } + } - /* build process and results */ + /* build process and results */ - val clean_sessions = - if (clean_build) full_sessions.imports_descendants(full_sessions_selection) else Nil + val clean_sessions = + if (clean_build) full_sessions.imports_descendants(full_sessions_selection) else Nil - val build_context = - Context(store, build_deps, engine = engine, afp_root = afp_root, - build_hosts = build_hosts, hostname = hostname(build_options), - clean_sessions = clean_sessions, build_heap = build_heap, - numa_shuffling = numa_shuffling, fresh_build = fresh_build, - no_build = no_build, session_setup = session_setup, - jobs = max_jobs.getOrElse(if (build_hosts.nonEmpty) 0 else 1), master = true) + val build_context = + Context(store, build_deps, engine = engine, afp_root = afp_root, + build_hosts = build_hosts, hostname = hostname(build_options), + clean_sessions = clean_sessions, build_heap = build_heap, + numa_shuffling = numa_shuffling, fresh_build = fresh_build, + no_build = no_build, session_setup = session_setup, + jobs = max_jobs.getOrElse(if (build_hosts.nonEmpty) 0 else 1), master = true) - val results = engine.run_build_process(build_context, progress, server) + val results = engine.run_build_process(build_context, progress, server) - if (export_files) { - for (name <- full_sessions_selection.iterator if results(name).ok) { - val info = results.info(name) - if (info.export_files.nonEmpty) { - progress.echo("Exporting " + info.name + " ...") - for ((dir, prune, pats) <- info.export_files) { - Export.export_files(store, name, info.dir + dir, - progress = if (progress.verbose) progress else new Progress, - export_prune = prune, - export_patterns = pats) - } + if (export_files) { + for (name <- full_sessions_selection.iterator if results(name).ok) { + val info = results.info(name) + if (info.export_files.nonEmpty) { + progress.echo("Exporting " + info.name + " ...") + for ((dir, prune, pats) <- info.export_files) { + Export.export_files(store, name, info.dir + dir, + progress = if (progress.verbose) progress else new Progress, + export_prune = prune, + export_patterns = pats) } } } + } - val presentation_sessions = - results.sessions_ok.filter(name => browser_info.enabled(results.info(name))) - if (presentation_sessions.nonEmpty && !progress.stopped) { - Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions, - progress = progress, server = server) - } + val presentation_sessions = + results.sessions_ok.filter(name => browser_info.enabled(results.info(name))) + if (presentation_sessions.nonEmpty && !progress.stopped) { + Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions, + progress = progress, server = server) + } - if (results.unfinished.nonEmpty && (progress.verbose || !no_build)) { - progress.echo("Unfinished session(s): " + commas(results.unfinished)) - } + if (results.unfinished.nonEmpty && (progress.verbose || !no_build)) { + progress.echo("Unfinished session(s): " + commas(results.unfinished)) + } - results - } + results } } diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/Build/build_job.scala --- a/src/Pure/Build/build_job.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/Build/build_job.scala Sat Feb 24 22:15:42 2024 +0100 @@ -118,11 +118,9 @@ Future.thread("build", uninterruptible = true) { val info = session_background.sessions_structure(session_name) val options = Host.node_options(info.options, node_info) - val store = build_context.store using_optional(store.maybe_open_database_server(server = server)) { database_server => - store.clean_output(database_server, session_name, session_init = true) val session_sources = diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/Build/build_process.scala --- a/src/Pure/Build/build_process.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/Build/build_process.scala Sat Feb 24 22:15:42 2024 +0100 @@ -898,7 +898,7 @@ catch { case exn: Throwable => close(); throw exn } protected val (progress, worker_uuid) = synchronized { - if (_build_database.isEmpty) (build_progress, UUID.random().toString) + if (_build_database.isEmpty) (build_progress, UUID.random_string()) else { try { val db = store.open_build_database(Progress.private_data.database, server = server) diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/Concurrent/isabelle_thread.scala --- a/src/Pure/Concurrent/isabelle_thread.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/Concurrent/isabelle_thread.scala Sat Feb 24 22:15:42 2024 +0100 @@ -65,7 +65,7 @@ val thread = create(main, name = name, group = group, pri = pri, daemon = daemon, inherit_locals = inherit_locals) - thread.start + thread.start() thread } @@ -89,10 +89,10 @@ new Interrupt_Handler(handle, name) val interruptible: Interrupt_Handler = - Interrupt_Handler(_.raise_interrupt, name = "interruptible") + Interrupt_Handler(_.raise_interrupt(), name = "interruptible") val uninterruptible: Interrupt_Handler = - Interrupt_Handler(_.postpone_interrupt, name = "uninterruptible") + Interrupt_Handler(_.postpone_interrupt(), name = "uninterruptible") } class Interrupt_Handler private(handle: Isabelle_Thread => Unit, name: String) @@ -132,7 +132,7 @@ thread.setPriority(pri) thread.setDaemon(daemon) - override def run: Unit = main.run() + override def run(): Unit = main.run() def is_self: Boolean = Thread.currentThread == thread @@ -142,19 +142,19 @@ // synchronized, with concurrent changes private var interrupt_postponed: Boolean = false - def clear_interrupt: Boolean = synchronized { + def clear_interrupt(): Boolean = synchronized { val was_interrupted = isInterrupted || interrupt_postponed Exn.Interrupt.dispose() interrupt_postponed = false was_interrupted } - def raise_interrupt: Unit = synchronized { + def raise_interrupt(): Unit = synchronized { interrupt_postponed = false super.interrupt() } - def postpone_interrupt: Unit = synchronized { + def postpone_interrupt(): Unit = synchronized { interrupt_postponed = true Exn.Interrupt.dispose() } @@ -175,12 +175,12 @@ val old_handler = handler handler = new_handler try { - if (clear_interrupt) interrupt() + if (clear_interrupt()) interrupt() body } finally { handler = old_handler - if (clear_interrupt) interrupt() + if (clear_interrupt()) interrupt() } } } diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/General/http.scala --- a/src/Pure/General/http.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/General/http.scala Sat Feb 24 22:15:42 2024 +0100 @@ -114,7 +114,7 @@ connection.setRequestMethod("POST") connection.setDoOutput(true) - val boundary = UUID.random().toString + val boundary = UUID.random_string() connection.setRequestProperty( "Content-Type", "multipart/form-data; boundary=" + quote(boundary)) @@ -277,7 +277,7 @@ def server( port: Int = 0, - name: String = UUID.random().toString, + name: String = UUID.random_string(), services: List[Service] = isabelle_services ): Server = { val http_server = HttpServer.create(new InetSocketAddress(isabelle.Server.localhost, port), 0) diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/General/sql.scala --- a/src/Pure/General/sql.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/General/sql.scala Sat Feb 24 22:15:42 2024 +0100 @@ -14,7 +14,7 @@ import org.sqlite.SQLiteConfig import org.sqlite.jdbc4.JDBC4Connection -import org.postgresql.{PGConnection, PGNotification} +import org.postgresql.PGConnection import scala.collection.mutable @@ -383,6 +383,11 @@ } + /* notifications: IPC via database server */ + + sealed case class Notification(name: String, parameter: String) + + /* database */ trait Database extends AutoCloseable { @@ -581,6 +586,14 @@ execute_statement("CREATE VIEW " + table + " AS " + { table.query; table.body }) } } + + + /* notifications (PostgreSQL only) */ + + def listen(name: String): Unit = () + def unlisten(name: String = "*"): Unit = () + def send(name: String, parameter: String = ""): Unit = () + def receive(filter: Notification => Boolean): List[Notification] = Nil } @@ -664,7 +677,8 @@ password: String, database: String = "", server: SSH.Server = default_server, - server_close: Boolean = false + server_close: Boolean = false, + receiver_delay: Time = Time.seconds(0.5) ): Database = { init_jdbc @@ -680,7 +694,7 @@ if_proper(ssh, " via ssh " + quote(ssh.get.toString)) val connection = DriverManager.getConnection(url, user, password) - val db = new Database(connection, print, server, server_close) + val db = new Database(connection, print, server, server_close, receiver_delay) try { db.execute_statement("SET standard_conforming_strings = on") } catch { case exn: Throwable => db.close(); throw exn } @@ -736,7 +750,8 @@ val connection: Connection, print: String, server: SSH.Server, - server_close: Boolean + server_close: Boolean, + receiver_delay: Time ) extends SQL.Database { override def toString: String = print @@ -771,24 +786,86 @@ /* notifications: IPC via database server */ - // see https://www.postgresql.org/docs/current/sql-notify.html + /* + - see https://www.postgresql.org/docs/current/sql-notify.html + - self-notifications and repeated notifications are suppressed + - notifications are sorted by local system time (nano seconds) + */ - def listen(name: String): Unit = - execute_statement("LISTEN " + SQL.ident(name)) + private var _receiver_buffer: Option[Map[SQL.Notification, Long]] = None - def unlisten(name: String = "*"): Unit = - execute_statement("UNLISTEN " + (if (name == "*") name else SQL.ident(name))) + private lazy val _receiver_thread = + Isabelle_Thread.fork(name = "PostgreSQL.receiver", daemon = true, uninterruptible = true) { + val conn = the_postgresql_connection + val self_pid = conn.getBackendPID - def notify(name: String, payload: String = ""): Unit = - execute_statement("NOTIFY " + SQL.ident(name) + if_proper(payload, ", " + SQL.string(payload))) - - def get_notifications(): List[PGNotification] = - the_postgresql_connection.getNotifications() match { - case null => Nil - case array => array.toList + try { + while (true) { + Isabelle_Thread.interruptible { receiver_delay.sleep() } + Option(conn.getNotifications()) match { + case Some(array) if array.nonEmpty => + synchronized { + var received = _receiver_buffer.getOrElse(Map.empty) + for (a <- array.iterator if a.getPID != self_pid) { + val msg = SQL.Notification(a.getName, a.getParameter) + if (!received.isDefinedAt(msg)) { + val stamp = System.nanoTime() + received = received + (msg -> stamp) + } + } + _receiver_buffer = Some(received) + } + case _ => + } + } + } + catch { case Exn.Interrupt() => } } + private def receiver_shutdown(): Unit = synchronized { + if (_receiver_buffer.isDefined) { + _receiver_thread.interrupt() + Some(_receiver_thread) + } + else None + }.foreach(_.join()) - override def close(): Unit = { super.close(); if (server_close) server.close() } + private def synchronized_receiver[A](body: => A): A = synchronized { + if (_receiver_buffer.isEmpty) { + _receiver_buffer = Some(Map.empty) + _receiver_thread + } + body + } + + override def listen(name: String): Unit = synchronized_receiver { + execute_statement("LISTEN " + SQL.ident(name)) + } + + override def unlisten(name: String = "*"): Unit = synchronized_receiver { + execute_statement("UNLISTEN " + (if (name == "*") name else SQL.ident(name))) + } + + override def send(name: String, parameter: String = ""): Unit = synchronized_receiver { + execute_statement( + "NOTIFY " + SQL.ident(name) + if_proper(parameter, ", " + SQL.string(parameter))) + } + + override def receive(filter: SQL.Notification => Boolean = _ => true): List[SQL.Notification] = + synchronized { + val received = _receiver_buffer.getOrElse(Map.empty) + val filtered = received.keysIterator.filter(filter).toList + if (_receiver_buffer.isDefined && filtered.nonEmpty) { + _receiver_buffer = Some(received -- filtered) + filtered.map(msg => msg -> received(msg)).sortBy(_._2).map(_._1) + } + else Nil + } + + override def close(): Unit = { + receiver_shutdown() + super.close() + if (server_close) server.close() + } } } diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/General/uuid.scala --- a/src/Pure/General/uuid.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/General/uuid.scala Sat Feb 24 22:15:42 2024 +0100 @@ -11,6 +11,7 @@ type T = java.util.UUID def random(): T = java.util.UUID.randomUUID() + def random_string(): String = random().toString def unapply(s: String): Option[T] = try { Some(java.util.UUID.fromString(s)) } diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/ML/ml_heap.scala --- a/src/Pure/ML/ml_heap.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/ML/ml_heap.scala Sat Feb 24 22:15:42 2024 +0100 @@ -135,7 +135,13 @@ } } - def init_entry(db: SQL.Database, name: String, log_db: Option[Log_DB] = None): Unit = { + def init_entry( + db: SQL.Database, + name: String, + heap_size: Long, + heap_digest: Option[SHA1.Digest], + log_db: Option[Log_DB] + ): Unit = { clean_entry(db, name) for (table <- List(Size.table, Slices_Size.table)) { db.create_view(table) @@ -143,14 +149,14 @@ db.execute_statement(Base.table.insert(), body = { stmt => stmt.string(1) = name - stmt.long(2) = None - stmt.string(3) = None + stmt.long(2) = heap_size + stmt.string(3) = heap_digest.map(_.toString) stmt.string(4) = log_db.map(_.uuid) stmt.bytes(5) = log_db.map(_.content) }) } - def finish_entry( + def update_entry( db: SQL.Database, name: String, heap_size: Long, @@ -204,24 +210,37 @@ val slice_size = slice.bytes max Space.MiB(1).bytes val slices = (heap_size.toDouble / slice_size.toDouble).ceil.toInt + val step = if (slices == 0) 0L else (heap_size.toDouble / slices.toDouble).ceil.toLong + + def slice_content(i: Int): Bytes = { + val j = i + 1 + val offset = step * i + val limit = if (j < slices) step * j else heap_size + Bytes.read_file(session.the_heap, offset = offset, limit = limit) + .compress(cache = cache) + } try { - if (slices == 0 && log_db.isDefined) progress.echo("Storing " + session.log_db_name + " ...") + if (slices > 0) progress.echo("Storing " + session.name + " ...") - private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") { - private_data.init_entry(db, session.name, log_db = if (slices == 0) log_db else None) + // init entry: slice 0 + initial log_db + { + val (heap_size0, heap_digest0) = if (slices > 1) (0L, None) else (heap_size, heap_digest) + val log_db0 = if (slices <= 1) log_db else None + val content0 = if (slices > 0) Some(slice_content(0)) else None + + if (log_db0.isDefined) progress.echo("Storing " + session.log_db_name + " ...") + + private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") { + private_data.init_entry(db, session.name, heap_size0, heap_digest0, log_db0) + for (content <- content0) private_data.write_slice(db, session.name, 0, content) + } } - if (slices > 0) { - progress.echo("Storing " + session.name + " ...") - val step = (heap_size.toDouble / slices.toDouble).ceil.toLong - for (i <- 0 until slices) { - val j = i + 1 - val offset = step * i - val limit = if (j < slices) step * j else heap_size - val content = - Bytes.read_file(session.the_heap, offset = offset, limit = limit) - .compress(cache = cache) + // update entry: slice 1 ... + final log_db + if (slices > 1) { + for (i <- 1 until slices) { + val content = slice_content(i) private_data.transaction_lock(db, label = "ML_Heap.store2") { private_data.write_slice(db, session.name, i, content) } @@ -230,7 +249,7 @@ if (log_db.isDefined) progress.echo("Storing " + session.log_db_name + " ...") private_data.transaction_lock(db, label = "ML_Heap.store3") { - private_data.finish_entry(db, session.name, heap_size, heap_digest, log_db) + private_data.update_entry(db, session.name, heap_size, heap_digest, log_db) } } } diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/System/host.scala --- a/src/Pure/System/host.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/System/host.scala Sat Feb 24 22:15:42 2024 +0100 @@ -67,7 +67,8 @@ def node_options(options: Options, node: Node_Info): Options = { val threads_options = - if (node.rel_cpus.isEmpty) options else options.int.update("threads", node.rel_cpus.length) + if (node.rel_cpus.isEmpty) options + else options.int.update("threads", node.rel_cpus.length) node.numa_node match { case None if node.rel_cpus.isEmpty => diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/System/progress.scala Sat Feb 24 22:15:42 2024 +0100 @@ -274,7 +274,7 @@ output_stopped: Boolean = false, kind: String = "progress", hostname: String = Isabelle_System.hostname(), - context_uuid: String = UUID.random().toString, + context_uuid: String = UUID.random_string(), timeout: Option[Time] = None) extends Progress { database_progress => @@ -299,7 +299,7 @@ Progress.private_data.read_progress_context(db, context_uuid) match { case Some(context) => _context = context - _agent_uuid = UUID.random().toString + _agent_uuid = UUID.random_string() case None => _context = Progress.private_data.next_progress_context(db) _agent_uuid = context_uuid diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/System/system_channel.scala --- a/src/Pure/System/system_channel.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/System/system_channel.scala Sat Feb 24 22:15:42 2024 +0100 @@ -46,7 +46,7 @@ protected val server: ServerSocketChannel = ServerSocketChannel.open(protocol_family) def address: String - lazy val password: String = UUID.random().toString + lazy val password: String = UUID.random_string() override def toString: String = address diff -r 80cb54976c1c -r aa77ebb2dc16 src/Pure/Tools/server.scala --- a/src/Pure/Tools/server.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Pure/Tools/server.scala Sat Feb 24 22:15:42 2024 +0100 @@ -121,7 +121,7 @@ val socket: ServerSocket = new ServerSocket(port0, 50, Server.localhost) def port: Int = socket.getLocalPort def address: String = print_address(port) - val password: String = UUID.random().toString + val password: String = UUID.random_string() override def toString: String = print(port, password) diff -r 80cb54976c1c -r aa77ebb2dc16 src/Tools/VSCode/src/component_vscode_extension.scala --- a/src/Tools/VSCode/src/component_vscode_extension.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Tools/VSCode/src/component_vscode_extension.scala Sat Feb 24 22:15:42 2024 +0100 @@ -48,7 +48,7 @@ "name": "Isabelle", "scopeName": "source.isabelle", "fileTypes": ["thy"], - "uuid": """ + JSON.Format(UUID.random().toString) + """, + "uuid": """ + JSON.Format(UUID.random_string()) + """, "repository": { "comment": { "patterns": [ diff -r 80cb54976c1c -r aa77ebb2dc16 src/Tools/jEdit/src/main_plugin.scala --- a/src/Tools/jEdit/src/main_plugin.scala Sat Feb 24 11:29:30 2024 +0100 +++ b/src/Tools/jEdit/src/main_plugin.scala Sat Feb 24 22:15:42 2024 +0100 @@ -390,7 +390,7 @@ /* HTTP server */ - val http_root: String = "/" + UUID.random().toString + val http_root: String = "/" + UUID.random_string() val http_server: HTTP.Server = HTTP.server(services = Document_Model.Preview_Service :: HTTP.isabelle_services)