302 Base.table.update(List(Base.stop), sql = SQL.where(Generic.sql(build_uuid = build_uuid))), |
302 Base.table.update(List(Base.stop), sql = SQL.where(Generic.sql(build_uuid = build_uuid))), |
303 body = { stmt => stmt.date(1) = db.now() }) |
303 body = { stmt => stmt.date(1) = db.now() }) |
304 |
304 |
305 def clean_build(db: SQL.Database): Unit = { |
305 def clean_build(db: SQL.Database): Unit = { |
306 val old = |
306 val old = |
307 db.using_statement( |
307 db.execute_query_statement( |
308 Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.defined)) |
308 Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.defined)), |
309 )(stmt => stmt.execute_query().iterator(_.string(Base.build_uuid)).toList) |
309 List.from[String], res => res.string(Base.build_uuid)) |
310 |
310 |
311 if (old.nonEmpty) { |
311 if (old.nonEmpty) { |
312 for (table <- List(Base.table, Sessions.table, Progress.table, Workers.table)) { |
312 for (table <- List(Base.table, Sessions.table, Progress.table, Workers.table)) { |
313 db.execute_statement(table.delete(sql = Generic.build_uuid.where_member(old))) |
313 db.execute_statement(table.delete(sql = Generic.build_uuid.where_member(old))) |
314 } |
314 } |
331 val table = make_table("sessions", |
331 val table = make_table("sessions", |
332 List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, build_uuid)) |
332 List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, build_uuid)) |
333 } |
333 } |
334 |
334 |
335 def read_sessions_domain(db: SQL.Database): Set[String] = |
335 def read_sessions_domain(db: SQL.Database): Set[String] = |
336 db.using_statement(Sessions.table.select(List(Sessions.name)))(stmt => |
336 db.execute_query_statement( |
337 Set.from(stmt.execute_query().iterator(_.string(Sessions.name)))) |
337 Sessions.table.select(List(Sessions.name)), |
|
338 Set.from[String], res => res.string(Sessions.name)) |
338 |
339 |
339 def read_sessions(db: SQL.Database, names: Iterable[String] = Nil): State.Sessions = |
340 def read_sessions(db: SQL.Database, names: Iterable[String] = Nil): State.Sessions = |
340 db.using_statement( |
341 db.execute_query_statement( |
341 Sessions.table.select(sql = if_proper(names, Sessions.name.where_member(names))) |
342 Sessions.table.select(sql = if_proper(names, Sessions.name.where_member(names))), |
342 ) { stmt => |
343 Map.from[String, Build_Job.Session_Context], |
343 Map.from(stmt.execute_query().iterator { res => |
344 { res => |
344 val name = res.string(Sessions.name) |
345 val name = res.string(Sessions.name) |
345 val deps = split_lines(res.string(Sessions.deps)) |
346 val deps = split_lines(res.string(Sessions.deps)) |
346 val ancestors = split_lines(res.string(Sessions.ancestors)) |
347 val ancestors = split_lines(res.string(Sessions.ancestors)) |
347 val sources_shasum = SHA1.fake_shasum(res.string(Sessions.sources)) |
348 val sources_shasum = SHA1.fake_shasum(res.string(Sessions.sources)) |
348 val timeout = Time.ms(res.long(Sessions.timeout)) |
349 val timeout = Time.ms(res.long(Sessions.timeout)) |
349 val old_time = Time.ms(res.long(Sessions.old_time)) |
350 val old_time = Time.ms(res.long(Sessions.old_time)) |
350 val old_command_timings_blob = res.bytes(Sessions.old_command_timings) |
351 val old_command_timings_blob = res.bytes(Sessions.old_command_timings) |
351 val build_uuid = res.string(Sessions.build_uuid) |
352 val build_uuid = res.string(Sessions.build_uuid) |
352 name -> Build_Job.Session_Context(name, deps, ancestors, sources_shasum, |
353 name -> Build_Job.Session_Context(name, deps, ancestors, sources_shasum, |
353 timeout, old_time, old_command_timings_blob, build_uuid) |
354 timeout, old_time, old_command_timings_blob, build_uuid) |
354 }) |
|
355 } |
355 } |
|
356 ) |
356 |
357 |
357 def update_sessions(db:SQL.Database, sessions: State.Sessions): Boolean = { |
358 def update_sessions(db:SQL.Database, sessions: State.Sessions): Boolean = { |
358 val old_sessions = read_sessions_domain(db) |
359 val old_sessions = read_sessions_domain(db) |
359 val insert = sessions.iterator.filterNot(p => old_sessions.contains(p._1)).toList |
360 val insert = sessions.iterator.filterNot(p => old_sessions.contains(p._1)).toList |
360 |
361 |
387 |
388 |
388 val table = make_table("progress", List(serial, kind, text, verbose, build_uuid)) |
389 val table = make_table("progress", List(serial, kind, text, verbose, build_uuid)) |
389 } |
390 } |
390 |
391 |
391 def read_progress(db: SQL.Database, seen: Long = 0, build_uuid: String = ""): Progress_Messages = |
392 def read_progress(db: SQL.Database, seen: Long = 0, build_uuid: String = ""): Progress_Messages = |
392 db.using_statement( |
393 db.execute_query_statement( |
393 Progress.table.select( |
394 Progress.table.select( |
394 sql = |
395 sql = |
395 SQL.where( |
396 SQL.where( |
396 SQL.and( |
397 SQL.and( |
397 if (seen <= 0) "" else Progress.serial.ident + " > " + seen, |
398 if (seen <= 0) "" else Progress.serial.ident + " > " + seen, |
398 Generic.sql(build_uuid = build_uuid)))) |
399 Generic.sql(build_uuid = build_uuid)))), |
399 ) { stmt => |
400 SortedMap.from[Long, isabelle.Progress.Message], |
400 SortedMap.from(stmt.execute_query().iterator { res => |
401 { res => |
401 val serial = res.long(Progress.serial) |
402 val serial = res.long(Progress.serial) |
402 val kind = isabelle.Progress.Kind(res.int(Progress.kind)) |
403 val kind = isabelle.Progress.Kind(res.int(Progress.kind)) |
403 val text = res.string(Progress.text) |
404 val text = res.string(Progress.text) |
404 val verbose = res.bool(Progress.verbose) |
405 val verbose = res.bool(Progress.verbose) |
405 serial -> isabelle.Progress.Message(kind, text, verbose = verbose) |
406 serial -> isabelle.Progress.Message(kind, text, verbose = verbose) |
406 }) |
|
407 } |
407 } |
|
408 ) |
408 |
409 |
409 def write_progress( |
410 def write_progress( |
410 db: SQL.Database, |
411 db: SQL.Database, |
411 message_serial: Long, |
412 message_serial: Long, |
412 message: isabelle.Progress.Message, |
413 message: isabelle.Progress.Message, |
437 |
438 |
438 val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")") |
439 val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")") |
439 } |
440 } |
440 |
441 |
441 def serial_max(db: SQL.Database): Long = |
442 def serial_max(db: SQL.Database): Long = |
442 db.using_statement( |
443 db.execute_query_statementO[Long]( |
443 Workers.table.select(List(Workers.serial_max)) |
444 Workers.table.select(List(Workers.serial_max)), |
444 )(stmt => stmt.execute_query().iterator(_.long(Workers.serial)).nextOption.getOrElse(0L)) |
445 res => res.long(Workers.serial) |
|
446 ).getOrElse(0L) |
445 |
447 |
446 def start_worker(db: SQL.Database, worker_uuid: String, build_uuid: String): Long = { |
448 def start_worker(db: SQL.Database, worker_uuid: String, build_uuid: String): Long = { |
447 def err(msg: String): Nothing = |
449 def err(msg: String): Nothing = |
448 error("Cannot start worker " + worker_uuid + if_proper(msg, "\n" + msg)) |
450 error("Cannot start worker " + worker_uuid + if_proper(msg, "\n" + msg)) |
449 |
451 |
450 val build_stop = { |
452 val build_stop = |
451 val sql = |
453 db.execute_query_statementO( |
452 Base.table.select(List(Base.stop), |
454 Base.table.select(List(Base.stop), |
453 sql = SQL.where(Generic.sql(build_uuid = build_uuid))) |
455 sql = SQL.where(Generic.sql(build_uuid = build_uuid))), |
454 db.using_statement(sql)(_.execute_query().iterator(_.get_date(Base.stop)).nextOption) |
456 res => res.get_date(Base.stop)) |
455 } |
457 |
456 build_stop match { |
458 build_stop match { |
457 case Some(None) => |
459 case Some(None) => |
458 case Some(Some(_)) => err("for already stopped build process " + build_uuid) |
460 case Some(Some(_)) => err("for already stopped build process " + build_uuid) |
459 case None => err("for unknown build process " + build_uuid) |
461 case None => err("for unknown build process " + build_uuid) |
460 } |
462 } |
501 |
503 |
502 val table = make_table("pending", List(name, deps, info)) |
504 val table = make_table("pending", List(name, deps, info)) |
503 } |
505 } |
504 |
506 |
505 def read_pending(db: SQL.Database): List[Entry] = |
507 def read_pending(db: SQL.Database): List[Entry] = |
506 db.using_statement(Pending.table.select(sql = SQL.order_by(List(Pending.name)))) { stmt => |
508 db.execute_query_statement( |
507 List.from( |
509 Pending.table.select(sql = SQL.order_by(List(Pending.name))), |
508 stmt.execute_query().iterator { res => |
510 List.from[Entry], |
509 val name = res.string(Pending.name) |
511 { res => |
510 val deps = res.string(Pending.deps) |
512 val name = res.string(Pending.name) |
511 val info = res.string(Pending.info) |
513 val deps = res.string(Pending.deps) |
512 Entry(name, split_lines(deps), info = JSON.Object.parse(info)) |
514 val info = res.string(Pending.info) |
513 }) |
515 Entry(name, split_lines(deps), info = JSON.Object.parse(info)) |
514 } |
516 }) |
515 |
517 |
516 def update_pending(db: SQL.Database, pending: State.Pending): Boolean = { |
518 def update_pending(db: SQL.Database, pending: State.Pending): Boolean = { |
517 val old_pending = read_pending(db) |
519 val old_pending = read_pending(db) |
518 val (delete, insert) = Library.symmetric_difference(old_pending, pending) |
520 val (delete, insert) = Library.symmetric_difference(old_pending, pending) |
519 |
521 |
544 |
546 |
545 val table = make_table("running", List(name, hostname, numa_node)) |
547 val table = make_table("running", List(name, hostname, numa_node)) |
546 } |
548 } |
547 |
549 |
548 def read_running(db: SQL.Database): List[Build_Job.Abstract] = |
550 def read_running(db: SQL.Database): List[Build_Job.Abstract] = |
549 db.using_statement(Running.table.select(sql = SQL.order_by(List(Running.name)))) { stmt => |
551 db.execute_query_statement( |
550 List.from( |
552 Running.table.select(sql = SQL.order_by(List(Running.name))), |
551 stmt.execute_query().iterator { res => |
553 List.from[Build_Job.Abstract], |
552 val name = res.string(Running.name) |
554 { res => |
553 val hostname = res.string(Running.hostname) |
555 val name = res.string(Running.name) |
554 val numa_node = res.get_int(Running.numa_node) |
556 val hostname = res.string(Running.hostname) |
555 Build_Job.Abstract(name, Host.Node_Info(hostname, numa_node)) |
557 val numa_node = res.get_int(Running.numa_node) |
556 }) |
558 Build_Job.Abstract(name, Host.Node_Info(hostname, numa_node)) |
557 } |
559 } |
|
560 ) |
558 |
561 |
559 def update_running(db: SQL.Database, running: State.Running): Boolean = { |
562 def update_running(db: SQL.Database, running: State.Running): Boolean = { |
560 val old_running = read_running(db) |
563 val old_running = read_running(db) |
561 val abs_running = running.valuesIterator.map(_.make_abstract).toList |
564 val abs_running = running.valuesIterator.map(_.make_abstract).toList |
562 |
565 |
597 make_table("results", |
600 make_table("results", |
598 List(name, hostname, numa_node, rc, out, err, timing_elapsed, timing_cpu, timing_gc)) |
601 List(name, hostname, numa_node, rc, out, err, timing_elapsed, timing_cpu, timing_gc)) |
599 } |
602 } |
600 |
603 |
601 def read_results_domain(db: SQL.Database): Set[String] = |
604 def read_results_domain(db: SQL.Database): Set[String] = |
602 db.using_statement(Results.table.select(List(Results.name)))(stmt => |
605 db.execute_query_statement( |
603 Set.from(stmt.execute_query().iterator(_.string(Results.name)))) |
606 Results.table.select(List(Results.name)), |
|
607 Set.from[String], res => res.string(Results.name)) |
604 |
608 |
605 def read_results(db: SQL.Database, names: List[String] = Nil): Map[String, Build_Job.Result] = |
609 def read_results(db: SQL.Database, names: List[String] = Nil): Map[String, Build_Job.Result] = |
606 db.using_statement( |
610 db.execute_query_statement( |
607 Results.table.select(sql = if_proper(names, Results.name.where_member(names))) |
611 Results.table.select(sql = if_proper(names, Results.name.where_member(names))), |
608 ) { stmt => |
612 Map.from[String, Build_Job.Result], |
609 Map.from(stmt.execute_query().iterator { res => |
613 { res => |
610 val name = res.string(Results.name) |
614 val name = res.string(Results.name) |
611 val hostname = res.string(Results.hostname) |
615 val hostname = res.string(Results.hostname) |
612 val numa_node = res.get_int(Results.numa_node) |
616 val numa_node = res.get_int(Results.numa_node) |
613 val rc = res.int(Results.rc) |
617 val rc = res.int(Results.rc) |
614 val out = res.string(Results.out) |
618 val out = res.string(Results.out) |
615 val err = res.string(Results.err) |
619 val err = res.string(Results.err) |
616 val timing = |
620 val timing = |
617 res.timing( |
621 res.timing( |
618 Results.timing_elapsed, |
622 Results.timing_elapsed, |
619 Results.timing_cpu, |
623 Results.timing_cpu, |
620 Results.timing_gc) |
624 Results.timing_gc) |
621 val node_info = Host.Node_Info(hostname, numa_node) |
625 val node_info = Host.Node_Info(hostname, numa_node) |
622 val process_result = |
626 val process_result = |
623 Process_Result(rc, |
627 Process_Result(rc, |
624 out_lines = split_lines(out), |
628 out_lines = split_lines(out), |
625 err_lines = split_lines(err), |
629 err_lines = split_lines(err), |
626 timing = timing) |
630 timing = timing) |
627 name -> Build_Job.Result(node_info, process_result) |
631 name -> Build_Job.Result(node_info, process_result) |
628 }) |
|
629 } |
632 } |
|
633 ) |
630 |
634 |
631 def update_results(db: SQL.Database, results: State.Results): Boolean = { |
635 def update_results(db: SQL.Database, results: State.Results): Boolean = { |
632 val old_results = read_results_domain(db) |
636 val old_results = read_results_domain(db) |
633 val insert = results.iterator.filterNot(p => old_results.contains(p._1)).toList |
637 val insert = results.iterator.filterNot(p => old_results.contains(p._1)).toList |
634 |
638 |