# HG changeset patch # User wenzelm # Date 1692291700 -7200 # Node ID af37e1b4dce0c4db9881ad3b1c78f0d3ca0c8bf2 # Parent 879e1ba3868be0207d043762bd653431927d202d more scalable Database_Progress via asynchronous Consumer_Thread.fork_bulk; diff -r 879e1ba3868b -r af37e1b4dce0 src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala Thu Aug 17 16:15:25 2023 +0200 +++ b/src/Pure/System/progress.scala Thu Aug 17 19:01:40 2023 +0200 @@ -139,12 +139,12 @@ }) } - def next_messages_serial(db: SQL.Database, context: Long): Long = + def read_messages_serial(db: SQL.Database, context: Long): Long = db.execute_query_statementO( Messages.table.select( List(Messages.serial.max), sql = Base.context.where_equal(context)), _.long(Messages.serial) - ).getOrElse(0L) + 1L + ).getOrElse(0L) def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T = db.execute_query_statement( @@ -268,7 +268,8 @@ output_stopped: Boolean = false, kind: String = "progress", hostname: String = Isabelle_System.hostname(), - context_uuid: String = UUID.random().toString) + context_uuid: String = UUID.random().toString, + timeout: Option[Time] = None) extends Progress { database_progress => @@ -276,6 +277,7 @@ private var _context: Long = -1 private var _serial: Long = 0 private var _stopped_db: Boolean = false + private var _consumer: Consumer_Thread[Progress.Output] = null def agent_uuid: String = synchronized { _agent_uuid } @@ -313,10 +315,37 @@ }) } if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list) + + _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")( + bulk = _ => true, timeout = timeout, + consume = { bulk_output => + sync_database { + _serial = _serial max Progress.private_data.read_messages_serial(db, _context) + val serial0 = _serial + + for (out <- bulk_output) { + _serial += 1L + Progress.private_data.write_messages(db, _context, _serial, out.message) + + out match { + case message: Progress.Message => + if (do_output(message)) base_progress.output(message) + case theory: Progress.Theory => base_progress.theory(theory) + } + } + + if (_serial != serial0) Progress.private_data.update_agent(db, _agent_uuid, _serial) + + (bulk_output.map(_ => Exn.Res(())), true) + } + }) } def exit(close: Boolean = false): Unit = synchronized { if (_context > 0) { + _consumer.shutdown() + _consumer = null + Progress.private_data.transaction_lock(db, label = "Database_Progress.exit") { Progress.private_data.update_agent(db, _agent_uuid, _serial, stop_now = true) } @@ -325,10 +354,14 @@ if (close) db.close() } - private def sync_database[A](body: => A): A = synchronized { + private def sync_context[A](body: => A): A = synchronized { if (_context < 0) throw new IllegalStateException("Database_Progress before init") if (_context == 0) throw new IllegalStateException("Database_Progress after exit") + body + } + + private def sync_database[A](body: => A): A = { Progress.private_data.transaction_lock(db, label = "Database_Progress.sync") { _stopped_db = Progress.private_data.read_progress_stopped(db, _context) @@ -348,32 +381,17 @@ } } - def sync(): Unit = sync_database {} - - private def output_database(out: Progress.Output): Unit = - sync_database { - _serial = _serial max Progress.private_data.next_messages_serial(db, _context) - - Progress.private_data.write_messages(db, _context, _serial, out.message) + private def sync(): Unit = sync_database {} - out match { - case message: Progress.Message => - if (do_output(message)) base_progress.output(message) - case theory: Progress.Theory => base_progress.theory(theory) - } - - Progress.private_data.update_agent(db, _agent_uuid, _serial) - } - - override def output(message: Progress.Message): Unit = output_database(message) - override def theory(theory: Progress.Theory): Unit = output_database(theory) + override def output(message: Progress.Message): Unit = sync_context { _consumer.send(message) } + override def theory(theory: Progress.Theory): Unit = sync_context { _consumer.send(theory) } override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit = base_progress.nodes_status(nodes_status) override def verbose: Boolean = base_progress.verbose - override def stop(): Unit = synchronized { base_progress.stop(); sync() } + override def stop(): Unit = sync_context { base_progress.stop(); sync() } override def stopped: Boolean = sync_database { base_progress.stopped } override def stopped_local: Boolean = sync_database { base_progress.stopped && !_stopped_db } diff -r 879e1ba3868b -r af37e1b4dce0 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Thu Aug 17 16:15:25 2023 +0200 +++ b/src/Pure/Tools/build_process.scala Thu Aug 17 19:01:40 2023 +0200 @@ -885,7 +885,8 @@ output_stopped = build_context.master, hostname = hostname, context_uuid = build_uuid, - kind = "build_process") + kind = "build_process", + timeout = Some(build_delay)) (progress, progress.agent_uuid) } catch { case exn: Throwable => close(); throw exn } @@ -929,7 +930,6 @@ _build_database match { case None => body case Some(db) => - progress.asInstanceOf[Database_Progress].sync() Build_Process.private_data.transaction_lock(db, label = label) { _state = Build_Process.private_data.pull_database(db, worker_uuid, hostname, _state) val res = body