more scalable write_messages via db.execute_batch_statement;
authorwenzelm
Sat, 19 Aug 2023 14:45:57 +0200
changeset 78539 15f55d02ba67
parent 78538 56e8458ba262
child 78540 a6d079e0575d
more scalable write_messages via db.execute_batch_statement;
src/Pure/System/progress.scala
--- a/src/Pure/System/progress.scala	Sat Aug 19 14:34:36 2023 +0200
+++ b/src/Pure/System/progress.scala	Sat Aug 19 14:45:57 2023 +0200
@@ -167,15 +167,16 @@
     def write_messages(
       db: SQL.Database,
       context: Long,
-      serial: Long,
-      message: Message
+      messages: List[(Long, Message)]
     ): Unit = {
-      db.execute_statement(Messages.table.insert(), body = { stmt =>
-        stmt.long(1) = context
-        stmt.long(2) = serial
-        stmt.int(3) = message.kind.id
-        stmt.string(4) = message.text
-        stmt.bool(5) = message.verbose
+      db.execute_batch_statement(Messages.table.insert(), batch = { stmt =>
+        for ((serial, message) <- messages.iterator) yield {
+          stmt.long(1) = context
+          stmt.long(2) = serial
+          stmt.int(3) = message.kind.id
+          stmt.string(4) = message.text
+          stmt.bool(5) = message.verbose
+        }
       })
     }
   }
@@ -324,14 +325,13 @@
     _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
       bulk = _ => true, timeout = timeout,
       consume = { bulk_output =>
-        sync_database {
-          _serial = _serial max Progress.private_data.read_messages_serial(db, _context)
-          val serial0 = _serial
+        val serial = _serial
+        val serial_db = Progress.private_data.read_messages_serial(db, _context)
 
+        _serial = _serial max serial_db
+
+        if (bulk_output.nonEmpty) {
           for (out <- bulk_output) {
-            _serial += 1L
-            Progress.private_data.write_messages(db, _context, _serial, out.message)
-
             out match {
               case message: Progress.Message =>
                 if (do_output(message)) base_progress.output(message)
@@ -339,10 +339,20 @@
             }
           }
 
-          if (_serial != serial0) Progress.private_data.update_agent(db, _agent_uuid, _serial)
+          val messages =
+            for ((out, i) <- bulk_output.zipWithIndex)
+              yield (_serial + i + 1) -> out.message
+
+          Progress.private_data.write_messages(db, _context, messages)
 
-          (bulk_output.map(_ => Exn.Res(())), true)
+          _serial = messages.last._1
         }
+
+        if (_serial != serial_db) {
+          Progress.private_data.update_agent(db, _agent_uuid, _serial)
+        }
+
+        (bulk_output.map(_ => Exn.Res(())), true)
       })
   }