79873
|
1 |
/*  Title:      Pure/Build/database_progress.scala
    Author:     Makarius

System progress backed by shared database: local SQLite or remote PostgreSQL.
*/
|
|
6 |
|
|
7 |
package isabelle
|
|
8 |
|
|
9 |
|
|
10 |
import scala.collection.immutable.SortedMap
|
|
11 |
|
|
12 |
|
|
13 |
object Database_Progress {
  /* SQL data model */

  // Tables, notifications and query helpers for the "isabelle_progress" data set.
  object private_data extends SQL.Data("isabelle_progress") {
    // Default location of the progress database file.
    val database: Path = Path.explode("$ISABELLE_HOME_USER/progress.db")

    override lazy val tables: SQL.Tables =
      SQL.Tables(Base.table, Agents.table, Messages.table)

    // One row per progress context: its UUID, its numeric id and the shared "stopped" flag.
    object Base {
      val context_uuid = SQL.Column.string("context_uuid").make_primary_key
      val context = SQL.Column.long("context").make_primary_key
      val stopped = SQL.Column.bool("stopped")

      val table = make_table(List(context_uuid, context, stopped))
    }

    // One row per agent (participating process) attached to a context.
    object Agents {
      val agent_uuid = SQL.Column.string("agent_uuid").make_primary_key
      val context_uuid = SQL.Column.string("context_uuid").make_primary_key
      val kind = SQL.Column.string("kind")
      val hostname = SQL.Column.string("hostname")
      val java_pid = SQL.Column.long("java_pid")
      val java_start = SQL.Column.date("java_start")
      val start = SQL.Column.date("start")
      val stamp = SQL.Column.date("stamp")
      val stop = SQL.Column.date("stop")
      val seen = SQL.Column.long("seen")

      val table =
        make_table(
          List(agent_uuid, context_uuid, kind, hostname, java_pid, java_start,
            start, stamp, stop, seen),
          name = "agents")
    }

    // Progress messages of a context, keyed by (context, serial).
    object Messages {
      // Messages ordered by their serial number.
      type T = SortedMap[Long, Progress.Message]
      val empty: T = SortedMap.empty

      val context = SQL.Column.long("context").make_primary_key
      val serial = SQL.Column.long("serial").make_primary_key
      val kind = SQL.Column.int("kind")
      val text = SQL.Column.string("text")
      val verbose = SQL.Column.bool("verbose")

      val table = make_table(List(context, serial, kind, text, verbose), name = "messages")
    }

    // Notification channel, named after the Base table, with its two payloads.
    val channel: String = Base.table.name
    val channel_ping: SQL.Notification = SQL.Notification(channel, payload = "ping")
    val channel_output: SQL.Notification = SQL.Notification(channel, payload = "output")

    // Numeric context id stored for the given context UUID, if such a row exists.
    def read_progress_context(db: SQL.Database, context_uuid: String): Option[Long] =
      db.execute_query_statementO(
        Base.table.select(List(Base.context),
          sql = Base.context_uuid.where_equal(context_uuid)), _.long(Base.context))

    // Next free context id: one more than the maximum stored id, where an absent
    // query result counts as 0.
    def next_progress_context(db: SQL.Database): Long =
      db.execute_query_statementO(
        Base.table.select(List(Base.context.max)), _.long(Base.context)).getOrElse(0L) + 1L

    // Shared "stopped" flag of the context; false when the context has no row.
    def read_progress_stopped(db: SQL.Database, context: Long): Boolean =
      db.execute_query_statementO(
        Base.table.select(List(Base.stopped), sql = Base.context.where_equal(context)),
        _.bool(Base.stopped)
      ).getOrElse(false)

    // Overwrite the shared "stopped" flag of the context.
    def write_progress_stopped(db: SQL.Database, context: Long, stopped: Boolean): Unit =
      db.execute_statement(
        Base.table.update(List(Base.stopped), sql = Base.context.where_equal(context)),
        body = { stmt => stmt.bool(1) = stopped })

    // Refresh the row of an agent: "stamp" becomes the current database time and
    // "seen" the given serial. "stop" is set to the current time when stop_now
    // holds; otherwise the previously stored value is written back unchanged.
    def update_agent(
      db: SQL.Database,
      agent_uuid: String,
      seen: Long,
      stop_now: Boolean = false
    ): Unit = {
      val sql = Agents.agent_uuid.where_equal(agent_uuid)

      // Existing stop date (if any): the update below rewrites the column in any case.
      val stop =
        db.execute_query_statementO(
          Agents.table.select(List(Agents.stop), sql = sql), _.get_date(Agents.stop)).flatten

      db.execute_statement(
        Agents.table.update(List(Agents.stamp, Agents.stop, Agents.seen), sql = sql),
        body = { stmt =>
          val now = db.now()
          stmt.date(1) = now
          stmt.date(2) = if (stop_now) Some(now) else stop
          stmt.long(3) = seen
        })
    }

    // Largest message serial stored for the context, or 0 without a query result.
    // The filter uses the Messages column of the table that is actually queried.
    def read_messages_serial(db: SQL.Database, context: Long): Long =
      db.execute_query_statementO(
        Messages.table.select(
          List(Messages.serial.max), sql = Messages.context.where_equal(context)),
        _.long(Messages.serial)
      ).getOrElse(0L)

    // Messages of the context as a map sorted by serial. For seen > 0 only
    // messages with a larger serial are selected; otherwise the serial condition
    // passed to SQL.where_and is the empty string.
    def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T =
      db.execute_query_statement(
        Messages.table.select(
          List(Messages.serial, Messages.kind, Messages.text, Messages.verbose),
          sql =
            SQL.where_and(
              Messages.context.ident + " = " + context,
              if (seen <= 0) "" else Messages.serial.ident + " > " + seen)),
        SortedMap.from[Long, Progress.Message],
        { res =>
          val serial = res.long(Messages.serial)
          val kind = Progress.Kind.fromOrdinal(res.int(Messages.kind))
          val text = res.string(Messages.text)
          val verbose = res.bool(Messages.verbose)
          serial -> Progress.Message(kind, text, verbose = verbose)
        }
      )

    // Insert the given (serial, message) pairs for the context as one batch.
    def write_messages(
      db: SQL.Database,
      context: Long,
      messages: List[(Long, Progress.Message)]
    ): Unit = {
      db.execute_batch_statement(Messages.table.insert(), batch =
        for ((serial, message) <- messages) yield { (stmt: SQL.Statement) =>
          stmt.long(1) = context
          stmt.long(2) = serial
          stmt.int(3) = message.kind.ordinal
          stmt.string(4) = message.text
          stmt.bool(5) = message.verbose
        })
    }
  }
}
|
|
149 |
|
|
150 |
/* Progress that forwards output to base_progress and records it in a shared database.
 *
 * Parameters:
 *   db              database connection used for all operations; closed by close()
 *   base_progress   local progress that receives messages, theories and stop requests
 *   input_messages  also replay messages found in the database onto base_progress
 *   output_stopped  publish a local stop of base_progress via the database
 *   kind, hostname  stored in the agents row of this instance
 *   context_uuid    identifies the progress context; must be a well-formed UUID
 *   timeout         passed to the consumer thread
 *   tick_expire     every tick_expire-th consume step forces a database sync
 */
class Database_Progress(
  db: SQL.Database,
  base_progress: Progress,
  input_messages: Boolean = false,
  output_stopped: Boolean = false,
  kind: String = "progress",
  hostname: String = Isabelle_System.hostname(),
  context_uuid: String = UUID.random_string(),
  timeout: Option[Time] = None,
  tick_expire: Int = 50)
extends Progress {
  database_progress =>

  // Time is taken from the database, not from the local clock.
  override def now(): Date = db.now()
  override val start: Date = now()

  if (UUID.unapply(context_uuid).isEmpty) {
    error("Bad Database_Progress.context_uuid: " + quote(context_uuid))
  }

  // Mutable state, accessed within synchronized sections of this object.
  private var _tick: Long = 0                // number of consume steps so far
  private var _agent_uuid: String = ""       // assigned by init()
  private var _context: Long = -1            // < 0: before init, 0: after close, > 0: active
  private var _serial: Long = 0              // last message serial written or seen
  private var _stopped_db: Boolean = false   // "stopped" flag as last read from the database
  private var _consumer: Consumer_Thread[Progress.Output] = null

  def agent_uuid: String = synchronized { _agent_uuid }

  // Register this instance in the database and fork the consumer thread.
  private def init(): Unit = synchronized {
    db.listen(Database_Progress.private_data.channel)
    Database_Progress.private_data.transaction_lock(db,
      create = true,
      label = "Database_Progress.init"
    ) {
      Database_Progress.private_data.read_progress_context(db, context_uuid) match {
        case Some(context) =>
          // Existing context: join it under a fresh agent UUID.
          _context = context
          _agent_uuid = UUID.random_string()
        case None =>
          // New context: this agent carries the context UUID itself.
          _context = Database_Progress.private_data.next_progress_context(db)
          _agent_uuid = context_uuid
          db.execute_statement(Database_Progress.private_data.Base.table.insert(), { stmt =>
            stmt.string(1) = context_uuid
            stmt.long(2) = _context
            stmt.bool(3) = false
          })
      }
      db.execute_statement(Database_Progress.private_data.Agents.table.insert(), { stmt =>
        val java = ProcessHandle.current()
        val java_pid = java.pid
        // NOTE(review): Optional.get fails if the JVM does not report a start instant.
        val java_start = Date.instant(java.info.startInstant.get)

        stmt.string(1) = _agent_uuid
        stmt.string(2) = context_uuid
        stmt.string(3) = kind
        stmt.string(4) = hostname
        stmt.long(5) = java_pid
        stmt.date(6) = java_start
        stmt.date(7) = start
        stmt.date(8) = start
        stmt.date(9) = None
        stmt.long(10) = 0L
      })
    }
    // Only the agent that created the context vacuums the tables.
    if (context_uuid == _agent_uuid) db.vacuum(Database_Progress.private_data.tables.list)

    // One consume step: decide whether a database sync is due; if so, forward the
    // pending output to base_progress and store it as messages in the database.
    def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = {
      val expired = synchronized { _tick += 1; _tick % tick_expire == 0 }
      val received = db.receive(n => n.channel == Database_Progress.private_data.channel)
      // Sync when there is output, the tick expired, a local stop needs publishing,
      // or the notifications ask for it. An undefined "received" always syncs.
      val ok =
        bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped ||
        received.forall(notifications =>
          notifications.contains(Database_Progress.private_data.channel_ping) ||
          input_messages &&
            notifications.contains(Database_Progress.private_data.channel_output))
      if (ok) {
        sync_database {
          if (bulk_output.nonEmpty) {
            for (out <- bulk_output) {
              out match {
                case message: Progress.Message =>
                  if (do_output(message)) base_progress.output(message)
                case theory: Progress.Theory => base_progress.theory(theory)
              }
            }

            // Consecutive serial numbers following the current _serial.
            val messages =
              for ((out, i) <- bulk_output.zipWithIndex)
                yield (_serial + i + 1) -> out.message

            Database_Progress.private_data.write_messages(db, _context, messages)
            _serial = messages.last._1  // non-empty: guarded by bulk_output.nonEmpty

            db.send(Database_Progress.private_data.channel_output)
          }
          bulk_output.map(_ => Exn.Res(()))
        }
      }
      else Nil
    }

    _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
      bulk = _ => true, timeout = timeout,
      consume = { bulk_output =>
        // Process the output in chunks of at most 200 entries; an empty bulk still
        // triggers one consume step.
        val results =
          if (bulk_output.isEmpty) consume(Nil)
          else bulk_output.grouped(200).toList.flatMap(consume)
        (results, true) })
  }

  // Shut down the consumer, mark the agent as stopped and close the database
  // connection. After that, sync_context operations fail with IllegalStateException.
  // NOTE(review): the consumer is shut down while holding this object's lock, and
  // its consume step synchronizes on the same object -- confirm that
  // Consumer_Thread.shutdown cannot block on a consume step waiting for the lock.
  def close(): Unit = synchronized {
    if (_context > 0) {
      _consumer.shutdown()
      _consumer = null

      Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.exit") {
        Database_Progress.private_data.update_agent(db, _agent_uuid, _serial, stop_now = true)
      }
      _context = 0
    }
    db.close()
  }

  // Run body under the object lock, provided that init() has happened and
  // close() has not.
  private def sync_context[A](body: => A): A = synchronized {
    if (_context < 0) throw new IllegalStateException("Database_Progress before init")
    if (_context == 0) throw new IllegalStateException("Database_Progress after exit")

    body
  }

  // Run body within a database transaction, after reconciling local state with
  // the database: the stopped flag first, then the message serial.
  private def sync_database[A](body: => A): A = synchronized {
    Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") {
      _stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)

      // Stop from the database is applied locally; a local stop is published only
      // with output_stopped, followed by a ping notification.
      if (_stopped_db && !base_progress.stopped) base_progress.stop()
      if (!_stopped_db && base_progress.stopped && output_stopped) {
        Database_Progress.private_data.write_progress_stopped(db, _context, true)
        db.send(Database_Progress.private_data.channel_ping)
      }

      val serial0 = _serial
      if (input_messages) {
        // Replay messages beyond _serial onto base_progress.
        val messages = Database_Progress.private_data.read_messages(db, _context, seen = _serial)
        for ((message_serial, message) <- messages) {
          if (base_progress.do_output(message)) base_progress.output(message)
          _serial = _serial max message_serial
        }
      }
      else {
        // No replay: only catch up with the largest serial in the database.
        _serial = _serial max Database_Progress.private_data.read_messages_serial(db, _context)
      }

      val res = body

      // Record progress of this agent only if the serial has moved (body included).
      if (_serial != serial0) Database_Progress.private_data.update_agent(db, _agent_uuid, _serial)

      res
    }
  }

  private def sync(): Unit = sync_database {}

  // Output is handed to the consumer thread, which forwards and stores it.
  override def output(message: Progress.Message): Unit = sync_context { _consumer.send(message) }
  override def theory(theory: Progress.Theory): Unit = sync_context { _consumer.send(theory) }

  override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit =
    base_progress.nodes_status(nodes_status)

  override def verbose: Boolean = base_progress.verbose

  override def stop(): Unit = sync_context { base_progress.stop(); sync() }
  override def stopped: Boolean = sync_context { base_progress.stopped }
  // Stopped locally, i.e. not as a consequence of the flag read from the database.
  override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db }

  override def toString: String = super.toString + ": database " + db

  init()
  sync()
}
|