record active execution task and depend on it -- avoid new executions bumping into old ones;
authorwenzelm
Sat Jun 02 21:59:11 2018 +0200 (14 months ago)
changeset 6835493d3c967802e
parent 68353 29cbe9e8ecde
child 68355 67a4db47e4f6
record active execution task and depend on it -- avoid new executions bumping into old ones;
src/Pure/Concurrent/task_queue.ML
src/Pure/PIDE/document.ML
src/Pure/PIDE/execution.ML
     1.1 --- a/src/Pure/Concurrent/task_queue.ML	Sat Jun 02 21:10:20 2018 +0200
     1.2 +++ b/src/Pure/Concurrent/task_queue.ML	Sat Jun 02 21:59:11 2018 +0200
     1.3 @@ -21,6 +21,7 @@
     1.4    val group_of_task: task -> group
     1.5    val name_of_task: task -> string
     1.6    val pri_of_task: task -> int
     1.7 +  val eq_task: task * task -> bool
     1.8    val str_of_task: task -> string
     1.9    val str_of_task_groups: task -> string
    1.10    val task_statistics: task -> Properties.T
    1.11 @@ -129,8 +130,11 @@
    1.12  
    1.13  fun group_of_task (Task {group, ...}) = group;
    1.14  fun name_of_task (Task {name, ...}) = name;
    1.15 +fun id_of_task (Task {id, ...}) = id;
    1.16  fun pri_of_task (Task {pri, ...}) = the_default 0 pri;
    1.17  
    1.18 +fun eq_task (task1, task2) = id_of_task task1 = id_of_task task2;
    1.19 +
    1.20  fun str_of_task (Task {name, id, ...}) =
    1.21    if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")";
    1.22  
     2.1 --- a/src/Pure/PIDE/document.ML	Sat Jun 02 21:10:20 2018 +0200
     2.2 +++ b/src/Pure/PIDE/document.ML	Sat Jun 02 21:59:11 2018 +0200
     2.3 @@ -497,7 +497,8 @@
     2.4              if Symtab.defined required name orelse visible_node node orelse pending_result node then
     2.5                let
     2.6                  fun body () =
     2.7 -                  (if forall finished_import deps then
     2.8 +                 (Execution.worker_task_active true;
     2.9 +                  if forall finished_import deps then
    2.10                      iterate_entries (fn (_, opt_exec) => fn () =>
    2.11                        (case opt_exec of
    2.12                          SOME exec =>
    2.13 @@ -505,13 +506,19 @@
    2.14                            then SOME (Command.exec execution_id exec)
    2.15                            else NONE
    2.16                        | NONE => NONE)) node ()
    2.17 -                   else ())
    2.18 -                   handle exn => (Output.system_message (Runtime.exn_message exn); Exn.reraise exn);
    2.19 +                  else ();
    2.20 +                  Execution.worker_task_active false)
    2.21 +                  handle exn =>
    2.22 +                   (Output.system_message (Runtime.exn_message exn);
    2.23 +                    Execution.worker_task_active false;
    2.24 +                    Exn.reraise exn);
    2.25                  val future =
    2.26                    (singleton o Future.forks)
    2.27                     {name = "theory:" ^ name,
    2.28                      group = SOME (Future.new_group NONE),
    2.29 -                    deps = Future.task_of delay_request' :: maps (the_list o #2 o #2) deps,
    2.30 +                    deps =
    2.31 +                      Future.task_of delay_request' :: Execution.active_tasks name @
    2.32 +                        maps (the_list o #2 o #2) deps,
    2.33                      pri = 0, interrupts = false} body;
    2.34                in (node, SOME (Future.task_of future)) end
    2.35              else (node, NONE));
     3.1 --- a/src/Pure/PIDE/execution.ML	Sat Jun 02 21:10:20 2018 +0200
     3.2 +++ b/src/Pure/PIDE/execution.ML	Sat Jun 02 21:59:11 2018 +0200
     3.3 @@ -10,6 +10,8 @@
     3.4    val start: unit -> Document_ID.execution
     3.5    val discontinue: unit -> unit
     3.6    val is_running: Document_ID.execution -> bool
     3.7 +  val active_tasks: string -> Future.task list
     3.8 +  val worker_task_active: bool -> string -> unit
     3.9    val is_running_exec: Document_ID.exec -> bool
    3.10    val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool
    3.11    val snapshot: Document_ID.exec list -> Future.task list
    3.12 @@ -37,26 +39,27 @@
    3.13  datatype state =
    3.14    State of
    3.15     {execution_id: Document_ID.execution,  (*overall document execution*)
    3.16 +    nodes: Future.task list Symtab.table,  (*active nodes*)
    3.17      execs: execs};  (*running command execs*)
    3.18  
    3.19 -fun make_state (execution_id, execs) =
    3.20 -  State {execution_id = execution_id, execs = execs};
    3.21 +fun make_state (execution_id, nodes, execs) =
    3.22 +  State {execution_id = execution_id, nodes = nodes, execs = execs};
    3.23  
    3.24  local
    3.25    val state =
    3.26 -    Synchronized.var "Execution.state" (make_state (Document_ID.none, init_execs));
    3.27 +    Synchronized.var "Execution.state" (make_state (Document_ID.none, Symtab.empty, init_execs));
    3.28  in
    3.29  
    3.30  fun get_state () = let val State args = Synchronized.value state in args end;
    3.31  
    3.32  fun change_state_result f =
    3.33 -  Synchronized.change_result state (fn (State {execution_id, execs}) =>
    3.34 -    let val (result, args') = f (execution_id, execs)
    3.35 +  Synchronized.change_result state (fn (State {execution_id, nodes, execs}) =>
    3.36 +    let val (result, args') = f (execution_id, nodes, execs)
    3.37      in (result, make_state args') end);
    3.38  
    3.39  fun change_state f =
    3.40 -  Synchronized.change state (fn (State {execution_id, execs}) =>
    3.41 -    make_state (f (execution_id, execs)));
    3.42 +  Synchronized.change state (fn (State {execution_id, nodes, execs}) =>
    3.43 +    make_state (f (execution_id, nodes, execs)));
    3.44  
    3.45  end;
    3.46  
    3.47 @@ -68,26 +71,41 @@
    3.48  fun start () =
    3.49    let
    3.50      val execution_id = Document_ID.make ();
    3.51 -    val _ = change_state (apfst (K execution_id));
    3.52 +    val _ = change_state (fn (_, nodes, execs) => (execution_id, nodes, execs));
    3.53    in execution_id end;
    3.54  
    3.55 -fun discontinue () = change_state (apfst (K Document_ID.none));
    3.56 +fun discontinue () =
    3.57 +  change_state (fn (_, nodes, execs) => (Document_ID.none, nodes, execs));
    3.58  
    3.59  fun is_running execution_id =
    3.60    execution_id = #execution_id (get_state ());
    3.61  
    3.62  
    3.63 +(* active nodes *)
    3.64 +
    3.65 +fun active_tasks node_name =
    3.66 +  Symtab.lookup_list (#nodes (get_state ())) node_name;
    3.67 +
    3.68 +fun worker_task_active insert node_name =
    3.69 +  change_state (fn (execution_id, nodes, execs) =>
    3.70 +    let
    3.71 +      val nodes' = nodes
    3.72 +        |> (if insert then Symtab.insert_list else Symtab.remove_list)
    3.73 +          Task_Queue.eq_task (node_name, the (Future.worker_task ()));
    3.74 +    in (execution_id, nodes', execs) end);
    3.75 +
    3.76 +
    3.77  (* running execs *)
    3.78  
    3.79  fun is_running_exec exec_id =
    3.80    Inttab.defined (#execs (get_state ())) exec_id;
    3.81  
    3.82  fun running execution_id exec_id groups =
    3.83 -  change_state_result (fn (execution_id', execs) =>
    3.84 +  change_state_result (fn (execution_id', nodes, execs) =>
    3.85      let
    3.86        val ok = execution_id = execution_id' andalso not (Inttab.defined execs exec_id);
    3.87        val execs' = execs |> ok ? Inttab.update (exec_id, (groups, []));
    3.88 -    in (ok, (execution_id', execs')) end);
    3.89 +    in (ok, (execution_id', nodes, execs')) end);
    3.90  
    3.91  
    3.92  (* exec groups and tasks *)
    3.93 @@ -98,7 +116,7 @@
    3.94    | NONE => []);
    3.95  
    3.96  fun snapshot exec_ids =
    3.97 -  change_state_result (`(fn (_, execs) => Future.snapshot (maps (exec_groups execs) exec_ids)));
    3.98 +  change_state_result (`(fn (_, _, execs) => Future.snapshot (maps (exec_groups execs) exec_ids)));
    3.99  
   3.100  fun join exec_ids =
   3.101    (case snapshot exec_ids of
   3.102 @@ -130,11 +148,12 @@
   3.103      let
   3.104        val exec_id = the_default 0 (Position.parse_id pos);
   3.105        val group = Future.worker_subgroup ();
   3.106 -      val _ = change_state (apsnd (fn execs =>
   3.107 +      val _ = change_state (fn (execution_id, nodes, execs) =>
   3.108          (case Inttab.lookup execs exec_id of
   3.109            SOME (groups, prints) =>
   3.110 -            Inttab.update (exec_id, (group :: groups, prints)) execs
   3.111 -        | NONE => raise Fail (unregistered exec_id))));
   3.112 +            let val execs' = Inttab.update (exec_id, (group :: groups, prints)) execs
   3.113 +            in (execution_id, nodes, execs') end
   3.114 +        | NONE => raise Fail (unregistered exec_id)));
   3.115  
   3.116        val future =
   3.117          (singleton o Future.forks)
   3.118 @@ -167,15 +186,17 @@
   3.119  (* print *)
   3.120  
   3.121  fun print ({name, pos, pri}: params) e =
   3.122 -  change_state (apsnd (fn execs =>
   3.123 +  change_state (fn (execution_id, nodes, execs) =>
   3.124      let
   3.125        val exec_id = the_default 0 (Position.parse_id pos);
   3.126        val print = {name = name, pri = pri, body = e};
   3.127      in
   3.128        (case Inttab.lookup execs exec_id of
   3.129 -        SOME (groups, prints) => Inttab.update (exec_id, (groups, print :: prints)) execs
   3.130 +        SOME (groups, prints) =>
   3.131 +          let val execs' = Inttab.update (exec_id, (groups, print :: prints)) execs
   3.132 +          in (execution_id, nodes, execs') end
   3.133        | NONE => raise Fail (unregistered exec_id))
   3.134 -    end));
   3.135 +    end);
   3.136  
   3.137  fun fork_prints exec_id =
   3.138    (case Inttab.lookup (#execs (get_state ())) exec_id of
   3.139 @@ -192,7 +213,7 @@
   3.140  (* cleanup *)
   3.141  
   3.142  fun purge exec_ids =
   3.143 -  (change_state o apsnd) (fn execs =>
   3.144 +  change_state (fn (execution_id, nodes, execs) =>
   3.145      let
   3.146        val execs' = fold Inttab.delete_safe exec_ids execs;
   3.147        val () =
   3.148 @@ -201,12 +222,12 @@
   3.149            else groups |> List.app (fn group =>
   3.150              if Task_Queue.is_canceled group then ()
   3.151              else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
   3.152 -    in execs' end);
   3.153 +    in (execution_id, nodes, execs') end);
   3.154  
   3.155  fun reset () =
   3.156 -  change_state_result (fn (_, execs) =>
   3.157 +  change_state_result (fn (_, _, execs) =>
   3.158      let val groups = Inttab.fold (append o #1 o #2) execs []
   3.159 -    in (groups, (Document_ID.none, init_execs)) end);
   3.160 +    in (groups, (Document_ID.none, Symtab.empty, init_execs)) end);
   3.161  
   3.162  fun shutdown () =
   3.163    (Future.shutdown ();