record active execution task and depend on it -- avoid new executions bumping into old ones;
authorwenzelm
Sat, 02 Jun 2018 21:59:11 +0200
changeset 68354 93d3c967802e
parent 68353 29cbe9e8ecde
child 68355 67a4db47e4f6
record active execution task and depend on it -- avoid new executions bumping into old ones;
src/Pure/Concurrent/task_queue.ML
src/Pure/PIDE/document.ML
src/Pure/PIDE/execution.ML
--- a/src/Pure/Concurrent/task_queue.ML	Sat Jun 02 21:10:20 2018 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Sat Jun 02 21:59:11 2018 +0200
@@ -21,6 +21,7 @@
   val group_of_task: task -> group
   val name_of_task: task -> string
   val pri_of_task: task -> int
+  val eq_task: task * task -> bool
   val str_of_task: task -> string
   val str_of_task_groups: task -> string
   val task_statistics: task -> Properties.T
@@ -129,8 +130,11 @@
 
 fun group_of_task (Task {group, ...}) = group;
 fun name_of_task (Task {name, ...}) = name;
+fun id_of_task (Task {id, ...}) = id;
 fun pri_of_task (Task {pri, ...}) = the_default 0 pri;
 
+fun eq_task (task1, task2) = id_of_task task1 = id_of_task task2;
+
 fun str_of_task (Task {name, id, ...}) =
   if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")";
 
--- a/src/Pure/PIDE/document.ML	Sat Jun 02 21:10:20 2018 +0200
+++ b/src/Pure/PIDE/document.ML	Sat Jun 02 21:59:11 2018 +0200
@@ -497,7 +497,8 @@
             if Symtab.defined required name orelse visible_node node orelse pending_result node then
               let
                 fun body () =
-                  (if forall finished_import deps then
+                 (Execution.worker_task_active true;
+                  if forall finished_import deps then
                     iterate_entries (fn (_, opt_exec) => fn () =>
                       (case opt_exec of
                         SOME exec =>
@@ -505,13 +506,19 @@
                           then SOME (Command.exec execution_id exec)
                           else NONE
                       | NONE => NONE)) node ()
-                   else ())
-                   handle exn => (Output.system_message (Runtime.exn_message exn); Exn.reraise exn);
+                  else ();
+                  Execution.worker_task_active false)
+                  handle exn =>
+                   (Output.system_message (Runtime.exn_message exn);
+                    Execution.worker_task_active false;
+                    Exn.reraise exn);
                 val future =
                   (singleton o Future.forks)
                    {name = "theory:" ^ name,
                     group = SOME (Future.new_group NONE),
-                    deps = Future.task_of delay_request' :: maps (the_list o #2 o #2) deps,
+                    deps =
+                      Future.task_of delay_request' :: Execution.active_tasks name @
+                        maps (the_list o #2 o #2) deps,
                     pri = 0, interrupts = false} body;
               in (node, SOME (Future.task_of future)) end
             else (node, NONE));
--- a/src/Pure/PIDE/execution.ML	Sat Jun 02 21:10:20 2018 +0200
+++ b/src/Pure/PIDE/execution.ML	Sat Jun 02 21:59:11 2018 +0200
@@ -10,6 +10,8 @@
   val start: unit -> Document_ID.execution
   val discontinue: unit -> unit
   val is_running: Document_ID.execution -> bool
+  val active_tasks: string -> Future.task list
+  val worker_task_active: bool -> string -> unit
   val is_running_exec: Document_ID.exec -> bool
   val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool
   val snapshot: Document_ID.exec list -> Future.task list
@@ -37,26 +39,27 @@
 datatype state =
   State of
    {execution_id: Document_ID.execution,  (*overall document execution*)
+    nodes: Future.task list Symtab.table,  (*active nodes*)
     execs: execs};  (*running command execs*)
 
-fun make_state (execution_id, execs) =
-  State {execution_id = execution_id, execs = execs};
+fun make_state (execution_id, nodes, execs) =
+  State {execution_id = execution_id, nodes = nodes, execs = execs};
 
 local
   val state =
-    Synchronized.var "Execution.state" (make_state (Document_ID.none, init_execs));
+    Synchronized.var "Execution.state" (make_state (Document_ID.none, Symtab.empty, init_execs));
 in
 
 fun get_state () = let val State args = Synchronized.value state in args end;
 
 fun change_state_result f =
-  Synchronized.change_result state (fn (State {execution_id, execs}) =>
-    let val (result, args') = f (execution_id, execs)
+  Synchronized.change_result state (fn (State {execution_id, nodes, execs}) =>
+    let val (result, args') = f (execution_id, nodes, execs)
     in (result, make_state args') end);
 
 fun change_state f =
-  Synchronized.change state (fn (State {execution_id, execs}) =>
-    make_state (f (execution_id, execs)));
+  Synchronized.change state (fn (State {execution_id, nodes, execs}) =>
+    make_state (f (execution_id, nodes, execs)));
 
 end;
 
@@ -68,26 +71,41 @@
 fun start () =
   let
     val execution_id = Document_ID.make ();
-    val _ = change_state (apfst (K execution_id));
+    val _ = change_state (fn (_, nodes, execs) => (execution_id, nodes, execs));
   in execution_id end;
 
-fun discontinue () = change_state (apfst (K Document_ID.none));
+fun discontinue () =
+  change_state (fn (_, nodes, execs) => (Document_ID.none, nodes, execs));
 
 fun is_running execution_id =
   execution_id = #execution_id (get_state ());
 
 
+(* active nodes *)
+
+fun active_tasks node_name =
+  Symtab.lookup_list (#nodes (get_state ())) node_name;
+
+fun worker_task_active insert node_name =
+  change_state (fn (execution_id, nodes, execs) =>
+    let
+      val nodes' = nodes
+        |> (if insert then Symtab.insert_list else Symtab.remove_list)
+          Task_Queue.eq_task (node_name, the (Future.worker_task ()));
+    in (execution_id, nodes', execs) end);
+
+
 (* running execs *)
 
 fun is_running_exec exec_id =
   Inttab.defined (#execs (get_state ())) exec_id;
 
 fun running execution_id exec_id groups =
-  change_state_result (fn (execution_id', execs) =>
+  change_state_result (fn (execution_id', nodes, execs) =>
     let
       val ok = execution_id = execution_id' andalso not (Inttab.defined execs exec_id);
       val execs' = execs |> ok ? Inttab.update (exec_id, (groups, []));
-    in (ok, (execution_id', execs')) end);
+    in (ok, (execution_id', nodes, execs')) end);
 
 
 (* exec groups and tasks *)
@@ -98,7 +116,7 @@
   | NONE => []);
 
 fun snapshot exec_ids =
-  change_state_result (`(fn (_, execs) => Future.snapshot (maps (exec_groups execs) exec_ids)));
+  change_state_result (`(fn (_, _, execs) => Future.snapshot (maps (exec_groups execs) exec_ids)));
 
 fun join exec_ids =
   (case snapshot exec_ids of
@@ -130,11 +148,12 @@
     let
       val exec_id = the_default 0 (Position.parse_id pos);
       val group = Future.worker_subgroup ();
-      val _ = change_state (apsnd (fn execs =>
+      val _ = change_state (fn (execution_id, nodes, execs) =>
         (case Inttab.lookup execs exec_id of
           SOME (groups, prints) =>
-            Inttab.update (exec_id, (group :: groups, prints)) execs
-        | NONE => raise Fail (unregistered exec_id))));
+            let val execs' = Inttab.update (exec_id, (group :: groups, prints)) execs
+            in (execution_id, nodes, execs') end
+        | NONE => raise Fail (unregistered exec_id)));
 
       val future =
         (singleton o Future.forks)
@@ -167,15 +186,17 @@
 (* print *)
 
 fun print ({name, pos, pri}: params) e =
-  change_state (apsnd (fn execs =>
+  change_state (fn (execution_id, nodes, execs) =>
     let
       val exec_id = the_default 0 (Position.parse_id pos);
       val print = {name = name, pri = pri, body = e};
     in
       (case Inttab.lookup execs exec_id of
-        SOME (groups, prints) => Inttab.update (exec_id, (groups, print :: prints)) execs
+        SOME (groups, prints) =>
+          let val execs' = Inttab.update (exec_id, (groups, print :: prints)) execs
+          in (execution_id, nodes, execs') end
       | NONE => raise Fail (unregistered exec_id))
-    end));
+    end);
 
 fun fork_prints exec_id =
   (case Inttab.lookup (#execs (get_state ())) exec_id of
@@ -192,7 +213,7 @@
 (* cleanup *)
 
 fun purge exec_ids =
-  (change_state o apsnd) (fn execs =>
+  change_state (fn (execution_id, nodes, execs) =>
     let
       val execs' = fold Inttab.delete_safe exec_ids execs;
       val () =
@@ -201,12 +222,12 @@
           else groups |> List.app (fn group =>
             if Task_Queue.is_canceled group then ()
             else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
-    in execs' end);
+    in (execution_id, nodes, execs') end);
 
 fun reset () =
-  change_state_result (fn (_, execs) =>
+  change_state_result (fn (_, _, execs) =>
     let val groups = Inttab.fold (append o #1 o #2) execs []
-    in (groups, (Document_ID.none, init_execs)) end);
+    in (groups, (Document_ID.none, Symtab.empty, init_execs)) end);
 
 fun shutdown () =
   (Future.shutdown ();