author | wenzelm |
Thu, 06 Jun 2024 12:53:02 +0200 | |
changeset 80268 | 979f3893aa37 |
parent 79887 | 17220dc05991 |
permissions | -rw-r--r-- |
79873 | 1 |
/* Title: Pure/Build/database_progress.scala |
2 |
Author: Makarius |
|
3 |
||
4 |
System progress backed by shared database: local SQLite or remote PostgreSQL. |
|
5 |
*/ |
|
6 |
||
7 |
package isabelle |
|
8 |
||
9 |
||
10 |
import scala.collection.immutable.SortedMap |
|
11 |
||
12 |
||
13 |
object Database_Progress { |
|
14 |
/* SQL data model */ |
|
15 |
||
16 |
object private_data extends SQL.Data("isabelle_progress") { |
|
17 |
val database: Path = Path.explode("$ISABELLE_HOME_USER/progress.db") |
|
18 |
||
19 |
override lazy val tables: SQL.Tables = |
|
20 |
SQL.Tables(Base.table, Agents.table, Messages.table) |
|
21 |
||
22 |
object Base { |
|
23 |
val context_uuid = SQL.Column.string("context_uuid").make_primary_key |
|
24 |
val context = SQL.Column.long("context").make_primary_key |
|
25 |
val stopped = SQL.Column.bool("stopped") |
|
26 |
||
27 |
val table = make_table(List(context_uuid, context, stopped)) |
|
28 |
} |
|
29 |
||
30 |
object Agents { |
|
31 |
val agent_uuid = SQL.Column.string("agent_uuid").make_primary_key |
|
32 |
val context_uuid = SQL.Column.string("context_uuid").make_primary_key |
|
33 |
val kind = SQL.Column.string("kind") |
|
34 |
val hostname = SQL.Column.string("hostname") |
|
35 |
val java_pid = SQL.Column.long("java_pid") |
|
36 |
val java_start = SQL.Column.date("java_start") |
|
37 |
val start = SQL.Column.date("start") |
|
38 |
val stamp = SQL.Column.date("stamp") |
|
39 |
val stop = SQL.Column.date("stop") |
|
40 |
val seen = SQL.Column.long("seen") |
|
41 |
||
42 |
val table = |
|
43 |
make_table( |
|
44 |
List(agent_uuid, context_uuid, kind, hostname, java_pid, java_start, |
|
45 |
start, stamp, stop, seen), |
|
46 |
name = "agents") |
|
47 |
} |
|
48 |
||
49 |
object Messages { |
|
50 |
type T = SortedMap[Long, Progress.Message] |
|
51 |
val empty: T = SortedMap.empty |
|
52 |
||
53 |
val context = SQL.Column.long("context").make_primary_key |
|
54 |
val serial = SQL.Column.long("serial").make_primary_key |
|
55 |
val kind = SQL.Column.int("kind") |
|
56 |
val text = SQL.Column.string("text") |
|
57 |
val verbose = SQL.Column.bool("verbose") |
|
58 |
||
59 |
val table = make_table(List(context, serial, kind, text, verbose), name = "messages") |
|
60 |
} |
|
61 |
||
62 |
val channel: String = Base.table.name |
|
63 |
val channel_ping: SQL.Notification = SQL.Notification(channel, payload = "ping") |
|
64 |
val channel_output: SQL.Notification = SQL.Notification(channel, payload = "output") |
|
65 |
||
66 |
def read_progress_context(db: SQL.Database, context_uuid: String): Option[Long] = |
|
67 |
db.execute_query_statementO( |
|
68 |
Base.table.select(List(Base.context), |
|
69 |
sql = Base.context_uuid.where_equal(context_uuid)), _.long(Base.context)) |
|
70 |
||
71 |
def next_progress_context(db: SQL.Database): Long = |
|
72 |
db.execute_query_statementO( |
|
73 |
Base.table.select(List(Base.context.max)), _.long(Base.context)).getOrElse(0L) + 1L |
|
74 |
||
75 |
def read_progress_stopped(db: SQL.Database, context: Long): Boolean = |
|
76 |
db.execute_query_statementO( |
|
77 |
Base.table.select(List(Base.stopped), sql = Base.context.where_equal(context)), |
|
78 |
_.bool(Base.stopped) |
|
79 |
).getOrElse(false) |
|
80 |
||
81 |
def write_progress_stopped(db: SQL.Database, context: Long, stopped: Boolean): Unit = |
|
82 |
db.execute_statement( |
|
83 |
Base.table.update(List(Base.stopped), sql = Base.context.where_equal(context)), |
|
84 |
body = { stmt => stmt.bool(1) = stopped }) |
|
85 |
||
86 |
def update_agent( |
|
87 |
db: SQL.Database, |
|
88 |
agent_uuid: String, |
|
89 |
seen: Long, |
|
90 |
stop_now: Boolean = false |
|
91 |
): Unit = { |
|
92 |
val sql = Agents.agent_uuid.where_equal(agent_uuid) |
|
93 |
||
94 |
val stop = |
|
95 |
db.execute_query_statementO( |
|
96 |
Agents.table.select(List(Agents.stop), sql = sql), _.get_date(Agents.stop)).flatten |
|
97 |
||
98 |
db.execute_statement( |
|
99 |
Agents.table.update(List(Agents.stamp, Agents.stop, Agents.seen), sql = sql), |
|
100 |
body = { stmt => |
|
101 |
val now = db.now() |
|
102 |
stmt.date(1) = now |
|
103 |
stmt.date(2) = if (stop_now) Some(now) else stop |
|
104 |
stmt.long(3) = seen |
|
105 |
}) |
|
106 |
} |
|
107 |
||
108 |
def read_messages_serial(db: SQL.Database, context: Long): Long = |
|
109 |
db.execute_query_statementO( |
|
110 |
Messages.table.select( |
|
111 |
List(Messages.serial.max), sql = Base.context.where_equal(context)), |
|
112 |
_.long(Messages.serial) |
|
113 |
).getOrElse(0L) |
|
114 |
||
115 |
def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T = |
|
116 |
db.execute_query_statement( |
|
117 |
Messages.table.select( |
|
118 |
List(Messages.serial, Messages.kind, Messages.text, Messages.verbose), |
|
119 |
sql = |
|
120 |
SQL.where_and( |
|
121 |
Messages.context.ident + " = " + context, |
|
122 |
if (seen <= 0) "" else Messages.serial.ident + " > " + seen)), |
|
123 |
SortedMap.from[Long, Progress.Message], |
|
124 |
{ res => |
|
125 |
val serial = res.long(Messages.serial) |
|
126 |
val kind = Progress.Kind.fromOrdinal(res.int(Messages.kind)) |
|
127 |
val text = res.string(Messages.text) |
|
128 |
val verbose = res.bool(Messages.verbose) |
|
129 |
serial -> Progress.Message(kind, text, verbose = verbose) |
|
130 |
} |
|
131 |
) |
|
132 |
||
133 |
def write_messages( |
|
134 |
db: SQL.Database, |
|
135 |
context: Long, |
|
136 |
messages: List[(Long, Progress.Message)] |
|
137 |
): Unit = { |
|
138 |
db.execute_batch_statement(Messages.table.insert(), batch = |
|
139 |
for ((serial, message) <- messages) yield { (stmt: SQL.Statement) => |
|
140 |
stmt.long(1) = context |
|
141 |
stmt.long(2) = serial |
|
142 |
stmt.int(3) = message.kind.ordinal |
|
143 |
stmt.string(4) = message.text |
|
144 |
stmt.bool(5) = message.verbose |
|
145 |
}) |
|
146 |
} |
|
147 |
} |
|
148 |
} |
|
149 |
||
150 |
class Database_Progress( |
|
151 |
db: SQL.Database, |
|
152 |
base_progress: Progress, |
|
153 |
input_messages: Boolean = false, |
|
154 |
kind: String = "progress", |
|
155 |
hostname: String = Isabelle_System.hostname(), |
|
156 |
context_uuid: String = UUID.random_string(), |
|
157 |
timeout: Option[Time] = None, |
|
158 |
tick_expire: Int = 50) |
|
159 |
extends Progress { |
|
160 |
database_progress => |
|
161 |
||
162 |
override def now(): Date = db.now() |
|
163 |
override val start: Date = now() |
|
164 |
||
165 |
if (UUID.unapply(context_uuid).isEmpty) { |
|
166 |
error("Bad Database_Progress.context_uuid: " + quote(context_uuid)) |
|
167 |
} |
|
168 |
||
169 |
private var _tick: Long = 0 |
|
170 |
private var _agent_uuid: String = "" |
|
171 |
private var _context: Long = -1 |
|
172 |
private var _serial: Long = 0 |
|
173 |
private var _consumer: Consumer_Thread[Progress.Output] = null |
|
174 |
||
175 |
def agent_uuid: String = synchronized { _agent_uuid } |
|
176 |
||
177 |
private def init(): Unit = synchronized { |
|
178 |
db.listen(Database_Progress.private_data.channel) |
|
179 |
Database_Progress.private_data.transaction_lock(db, |
|
180 |
create = true, |
|
181 |
label = "Database_Progress.init" |
|
182 |
) { |
|
183 |
Database_Progress.private_data.read_progress_context(db, context_uuid) match { |
|
184 |
case Some(context) => |
|
185 |
_context = context |
|
186 |
_agent_uuid = UUID.random_string() |
|
187 |
case None => |
|
188 |
_context = Database_Progress.private_data.next_progress_context(db) |
|
189 |
_agent_uuid = context_uuid |
|
190 |
db.execute_statement(Database_Progress.private_data.Base.table.insert(), { stmt => |
|
191 |
stmt.string(1) = context_uuid |
|
192 |
stmt.long(2) = _context |
|
193 |
stmt.bool(3) = false |
|
194 |
}) |
|
195 |
} |
|
196 |
db.execute_statement(Database_Progress.private_data.Agents.table.insert(), { stmt => |
|
197 |
val java = ProcessHandle.current() |
|
198 |
val java_pid = java.pid |
|
199 |
val java_start = Date.instant(java.info.startInstant.get) |
|
200 |
||
201 |
stmt.string(1) = _agent_uuid |
|
202 |
stmt.string(2) = context_uuid |
|
203 |
stmt.string(3) = kind |
|
204 |
stmt.string(4) = hostname |
|
205 |
stmt.long(5) = java_pid |
|
206 |
stmt.date(6) = java_start |
|
207 |
stmt.date(7) = start |
|
208 |
stmt.date(8) = start |
|
209 |
stmt.date(9) = None |
|
210 |
stmt.long(10) = 0L |
|
211 |
}) |
|
212 |
} |
|
213 |
if (context_uuid == _agent_uuid) db.vacuum(Database_Progress.private_data.tables.list) |
|
214 |
||
215 |
def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = { |
|
216 |
val expired = synchronized { _tick += 1; _tick % tick_expire == 0 } |
|
217 |
val received = db.receive(n => n.channel == Database_Progress.private_data.channel) |
|
218 |
val ok = |
|
79887
17220dc05991
revert most parts of 0e79fa88cab6: somewhat ambitious attempt to move towards "editing" builds via added/canceled workers;
wenzelm
parents:
79884
diff
changeset
|
219 |
bulk_output.nonEmpty || expired || base_progress.stopped || |
79873 | 220 |
received.isEmpty || |
221 |
received.get.contains(Database_Progress.private_data.channel_ping) || |
|
222 |
input_messages && received.get.contains(Database_Progress.private_data.channel_output) |
|
223 |
if (ok) { |
|
224 |
sync_database { |
|
225 |
if (bulk_output.nonEmpty) { |
|
226 |
for (out <- bulk_output) { |
|
227 |
out match { |
|
228 |
case message: Progress.Message => |
|
229 |
if (do_output(message)) base_progress.output(message) |
|
230 |
case theory: Progress.Theory => base_progress.theory(theory) |
|
231 |
} |
|
232 |
} |
|
233 |
||
234 |
val messages = |
|
235 |
for ((out, i) <- bulk_output.zipWithIndex) |
|
236 |
yield (_serial + i + 1) -> out.message |
|
237 |
||
238 |
Database_Progress.private_data.write_messages(db, _context, messages) |
|
239 |
_serial = messages.last._1 |
|
240 |
||
241 |
db.send(Database_Progress.private_data.channel_output) |
|
242 |
} |
|
243 |
bulk_output.map(_ => Exn.Res(())) |
|
244 |
} |
|
245 |
} |
|
246 |
else Nil |
|
247 |
} |
|
248 |
||
249 |
_consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")( |
|
79884 | 250 |
bulk = _ => true, |
251 |
timeout = timeout, |
|
79873 | 252 |
consume = { bulk_output => |
253 |
val results = |
|
254 |
if (bulk_output.isEmpty) consume(Nil) |
|
255 |
else bulk_output.grouped(200).toList.flatMap(consume) |
|
256 |
(results, true) }) |
|
257 |
} |
|
258 |
||
259 |
def close(): Unit = synchronized { |
|
260 |
if (_context > 0) { |
|
261 |
_consumer.shutdown() |
|
262 |
_consumer = null |
|
263 |
||
264 |
Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.exit") { |
|
265 |
Database_Progress.private_data.update_agent(db, _agent_uuid, _serial, stop_now = true) |
|
266 |
} |
|
267 |
_context = 0 |
|
268 |
} |
|
269 |
db.close() |
|
270 |
} |
|
271 |
||
272 |
private def sync_context[A](body: => A): A = synchronized { |
|
273 |
if (_context < 0) throw new IllegalStateException("Database_Progress before init") |
|
274 |
if (_context == 0) throw new IllegalStateException("Database_Progress after exit") |
|
275 |
||
276 |
body |
|
277 |
} |
|
278 |
||
279 |
private def sync_database[A](body: => A): A = synchronized { |
|
280 |
Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") { |
|
79887
17220dc05991
revert most parts of 0e79fa88cab6: somewhat ambitious attempt to move towards "editing" builds via added/canceled workers;
wenzelm
parents:
79884
diff
changeset
|
281 |
val stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context) |
79873 | 282 |
|
79887
17220dc05991
revert most parts of 0e79fa88cab6: somewhat ambitious attempt to move towards "editing" builds via added/canceled workers;
wenzelm
parents:
79884
diff
changeset
|
283 |
if (stopped_db && !base_progress.stopped) base_progress.stop() |
17220dc05991
revert most parts of 0e79fa88cab6: somewhat ambitious attempt to move towards "editing" builds via added/canceled workers;
wenzelm
parents:
79884
diff
changeset
|
284 |
if (!stopped_db && base_progress.stopped) { |
79873 | 285 |
Database_Progress.private_data.write_progress_stopped(db, _context, true) |
286 |
db.send(Database_Progress.private_data.channel_ping) |
|
287 |
} |
|
288 |
||
289 |
val serial0 = _serial |
|
290 |
if (input_messages) { |
|
291 |
val messages = Database_Progress.private_data.read_messages(db, _context, seen = _serial) |
|
292 |
for ((message_serial, message) <- messages) { |
|
293 |
if (base_progress.do_output(message)) base_progress.output(message) |
|
294 |
_serial = _serial max message_serial |
|
295 |
} |
|
296 |
} |
|
297 |
else { |
|
298 |
_serial = _serial max Database_Progress.private_data.read_messages_serial(db, _context) |
|
299 |
} |
|
300 |
||
301 |
val res = body |
|
302 |
||
303 |
if (_serial != serial0) Database_Progress.private_data.update_agent(db, _agent_uuid, _serial) |
|
304 |
||
305 |
res |
|
306 |
} |
|
307 |
} |
|
308 |
||
309 |
private def sync(): Unit = sync_database {} |
|
310 |
||
311 |
override def output(message: Progress.Message): Unit = sync_context { _consumer.send(message) } |
|
312 |
override def theory(theory: Progress.Theory): Unit = sync_context { _consumer.send(theory) } |
|
313 |
||
314 |
override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit = |
|
315 |
base_progress.nodes_status(nodes_status) |
|
316 |
||
317 |
override def verbose: Boolean = base_progress.verbose |
|
318 |
||
319 |
override def stop(): Unit = sync_context { base_progress.stop(); sync() } |
|
320 |
override def stopped: Boolean = sync_context { base_progress.stopped } |
|
321 |
||
322 |
override def toString: String = super.toString + ": database " + db |
|
323 |
||
324 |
init() |
|
325 |
sync() |
|
326 |
} |