--- a/src/Pure/PIDE/execution.ML Sat Jun 02 21:10:20 2018 +0200
+++ b/src/Pure/PIDE/execution.ML Sat Jun 02 21:59:11 2018 +0200
@@ -10,6 +10,8 @@
val start: unit -> Document_ID.execution
val discontinue: unit -> unit
val is_running: Document_ID.execution -> bool
+ val active_tasks: string -> Future.task list
+ val worker_task_active: bool -> string -> unit
val is_running_exec: Document_ID.exec -> bool
val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool
val snapshot: Document_ID.exec list -> Future.task list
@@ -37,26 +39,27 @@
datatype state =
State of
{execution_id: Document_ID.execution, (*overall document execution*)
+ nodes: Future.task list Symtab.table, (*active nodes*)
execs: execs}; (*running command execs*)
-fun make_state (execution_id, execs) =
- State {execution_id = execution_id, execs = execs};
+fun make_state (execution_id, nodes, execs) =
+ State {execution_id = execution_id, nodes = nodes, execs = execs};
local
val state =
- Synchronized.var "Execution.state" (make_state (Document_ID.none, init_execs));
+ Synchronized.var "Execution.state" (make_state (Document_ID.none, Symtab.empty, init_execs));
in
fun get_state () = let val State args = Synchronized.value state in args end;
fun change_state_result f =
- Synchronized.change_result state (fn (State {execution_id, execs}) =>
- let val (result, args') = f (execution_id, execs)
+ Synchronized.change_result state (fn (State {execution_id, nodes, execs}) =>
+ let val (result, args') = f (execution_id, nodes, execs)
in (result, make_state args') end);
fun change_state f =
- Synchronized.change state (fn (State {execution_id, execs}) =>
- make_state (f (execution_id, execs)));
+ Synchronized.change state (fn (State {execution_id, nodes, execs}) =>
+ make_state (f (execution_id, nodes, execs)));
end;
@@ -68,26 +71,41 @@
fun start () =
let
val execution_id = Document_ID.make ();
- val _ = change_state (apfst (K execution_id));
+ val _ = change_state (fn (_, nodes, execs) => (execution_id, nodes, execs));
in execution_id end;
-fun discontinue () = change_state (apfst (K Document_ID.none));
+fun discontinue () =
+ change_state (fn (_, nodes, execs) => (Document_ID.none, nodes, execs));
fun is_running execution_id =
execution_id = #execution_id (get_state ());
+(* active nodes *)
+
+fun active_tasks node_name =
+ Symtab.lookup_list (#nodes (get_state ())) node_name;
+
+fun worker_task_active insert node_name =
+ change_state (fn (execution_id, nodes, execs) =>
+ let
+ val nodes' = nodes
+ |> (if insert then Symtab.insert_list else Symtab.remove_list)
+ Task_Queue.eq_task (node_name, the (Future.worker_task ()));
+ in (execution_id, nodes', execs) end);
+
+
(* running execs *)
fun is_running_exec exec_id =
Inttab.defined (#execs (get_state ())) exec_id;
fun running execution_id exec_id groups =
- change_state_result (fn (execution_id', execs) =>
+ change_state_result (fn (execution_id', nodes, execs) =>
let
val ok = execution_id = execution_id' andalso not (Inttab.defined execs exec_id);
val execs' = execs |> ok ? Inttab.update (exec_id, (groups, []));
- in (ok, (execution_id', execs')) end);
+ in (ok, (execution_id', nodes, execs')) end);
(* exec groups and tasks *)
@@ -98,7 +116,7 @@
| NONE => []);
fun snapshot exec_ids =
- change_state_result (`(fn (_, execs) => Future.snapshot (maps (exec_groups execs) exec_ids)));
+ change_state_result (`(fn (_, _, execs) => Future.snapshot (maps (exec_groups execs) exec_ids)));
fun join exec_ids =
(case snapshot exec_ids of
@@ -130,11 +148,12 @@
let
val exec_id = the_default 0 (Position.parse_id pos);
val group = Future.worker_subgroup ();
- val _ = change_state (apsnd (fn execs =>
+ val _ = change_state (fn (execution_id, nodes, execs) =>
(case Inttab.lookup execs exec_id of
SOME (groups, prints) =>
- Inttab.update (exec_id, (group :: groups, prints)) execs
- | NONE => raise Fail (unregistered exec_id))));
+ let val execs' = Inttab.update (exec_id, (group :: groups, prints)) execs
+ in (execution_id, nodes, execs') end
+ | NONE => raise Fail (unregistered exec_id)));
val future =
(singleton o Future.forks)
@@ -167,15 +186,17 @@
(* print *)
fun print ({name, pos, pri}: params) e =
- change_state (apsnd (fn execs =>
+ change_state (fn (execution_id, nodes, execs) =>
let
val exec_id = the_default 0 (Position.parse_id pos);
val print = {name = name, pri = pri, body = e};
in
(case Inttab.lookup execs exec_id of
- SOME (groups, prints) => Inttab.update (exec_id, (groups, print :: prints)) execs
+ SOME (groups, prints) =>
+ let val execs' = Inttab.update (exec_id, (groups, print :: prints)) execs
+ in (execution_id, nodes, execs') end
| NONE => raise Fail (unregistered exec_id))
- end));
+ end);
fun fork_prints exec_id =
(case Inttab.lookup (#execs (get_state ())) exec_id of
@@ -192,7 +213,7 @@
(* cleanup *)
fun purge exec_ids =
- (change_state o apsnd) (fn execs =>
+ change_state (fn (execution_id, nodes, execs) =>
let
val execs' = fold Inttab.delete_safe exec_ids execs;
val () =
@@ -201,12 +222,12 @@
else groups |> List.app (fn group =>
if Task_Queue.is_canceled group then ()
else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
- in execs' end);
+ in (execution_id, nodes, execs') end);
fun reset () =
- change_state_result (fn (_, execs) =>
+ change_state_result (fn (_, _, execs) =>
let val groups = Inttab.fold (append o #1 o #2) execs []
- in (groups, (Document_ID.none, init_execs)) end);
+ in (groups, (Document_ID.none, Symtab.empty, init_execs)) end);
fun shutdown () =
(Future.shutdown ();