# HG changeset patch # User wenzelm # Date 1527969551 -7200 # Node ID 93d3c967802ef5c4ccb5d3e4278573a8b71b93a3 # Parent 29cbe9e8ecde1e92a7f68e300e85c246abb34f38 record active execution task and depend on it -- avoid new executions bumping into old ones; diff -r 29cbe9e8ecde -r 93d3c967802e src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Sat Jun 02 21:10:20 2018 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Sat Jun 02 21:59:11 2018 +0200 @@ -21,6 +21,7 @@ val group_of_task: task -> group val name_of_task: task -> string val pri_of_task: task -> int + val eq_task: task * task -> bool val str_of_task: task -> string val str_of_task_groups: task -> string val task_statistics: task -> Properties.T @@ -129,8 +130,11 @@ fun group_of_task (Task {group, ...}) = group; fun name_of_task (Task {name, ...}) = name; +fun id_of_task (Task {id, ...}) = id; fun pri_of_task (Task {pri, ...}) = the_default 0 pri; +fun eq_task (task1, task2) = id_of_task task1 = id_of_task task2; + fun str_of_task (Task {name, id, ...}) = if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")"; diff -r 29cbe9e8ecde -r 93d3c967802e src/Pure/PIDE/document.ML --- a/src/Pure/PIDE/document.ML Sat Jun 02 21:10:20 2018 +0200 +++ b/src/Pure/PIDE/document.ML Sat Jun 02 21:59:11 2018 +0200 @@ -497,7 +497,8 @@ if Symtab.defined required name orelse visible_node node orelse pending_result node then let fun body () = - (if forall finished_import deps then + (Execution.worker_task_active true; + if forall finished_import deps then iterate_entries (fn (_, opt_exec) => fn () => (case opt_exec of SOME exec => @@ -505,13 +506,19 @@ then SOME (Command.exec execution_id exec) else NONE | NONE => NONE)) node () - else ()) - handle exn => (Output.system_message (Runtime.exn_message exn); Exn.reraise exn); + else (); + Execution.worker_task_active false) + handle exn => + (Output.system_message (Runtime.exn_message exn); + Execution.worker_task_active false; + Exn.reraise exn); val future = (singleton o Future.forks) {name = "theory:" ^ name, group = SOME (Future.new_group NONE), - deps = Future.task_of delay_request' :: maps (the_list o #2 o #2) deps, + deps = + Future.task_of delay_request' :: Execution.active_tasks name @ + maps (the_list o #2 o #2) deps, pri = 0, interrupts = false} body; in (node, SOME (Future.task_of future)) end else (node, NONE)); diff -r 29cbe9e8ecde -r 93d3c967802e src/Pure/PIDE/execution.ML --- a/src/Pure/PIDE/execution.ML Sat Jun 02 21:10:20 2018 +0200 +++ b/src/Pure/PIDE/execution.ML Sat Jun 02 21:59:11 2018 +0200 @@ -10,6 +10,8 @@ val start: unit -> Document_ID.execution val discontinue: unit -> unit val is_running: Document_ID.execution -> bool + val active_tasks: string -> Future.task list + val worker_task_active: bool -> string -> unit val is_running_exec: Document_ID.exec -> bool val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool val snapshot: Document_ID.exec list -> Future.task list @@ -37,26 +39,27 @@ datatype state = State of {execution_id: Document_ID.execution, (*overall document execution*) + nodes: Future.task list Symtab.table, (*active nodes*) execs: execs}; (*running command execs*) -fun make_state (execution_id, execs) = - State {execution_id = execution_id, execs = execs}; +fun make_state (execution_id, nodes, execs) = + State {execution_id = execution_id, nodes = nodes, execs = execs}; local val state = - Synchronized.var "Execution.state" (make_state (Document_ID.none, init_execs)); + Synchronized.var "Execution.state" (make_state (Document_ID.none, Symtab.empty, init_execs)); in fun get_state () = let val State args = Synchronized.value state in args end; fun change_state_result f = - Synchronized.change_result state (fn (State {execution_id, execs}) => - let val (result, args') = f (execution_id, execs) + Synchronized.change_result state (fn (State {execution_id, nodes, execs}) => + let val (result, args') = f (execution_id, nodes, execs) in (result, make_state args') end); fun change_state f = - Synchronized.change state (fn (State {execution_id, execs}) => - make_state (f (execution_id, execs))); + Synchronized.change state (fn (State {execution_id, nodes, execs}) => + make_state (f (execution_id, nodes, execs))); end; @@ -68,26 +71,41 @@ fun start () = let val execution_id = Document_ID.make (); - val _ = change_state (apfst (K execution_id)); + val _ = change_state (fn (_, nodes, execs) => (execution_id, nodes, execs)); in execution_id end; -fun discontinue () = change_state (apfst (K Document_ID.none)); +fun discontinue () = + change_state (fn (_, nodes, execs) => (Document_ID.none, nodes, execs)); fun is_running execution_id = execution_id = #execution_id (get_state ()); +(* active nodes *) + +fun active_tasks node_name = + Symtab.lookup_list (#nodes (get_state ())) node_name; + +fun worker_task_active insert node_name = + change_state (fn (execution_id, nodes, execs) => + let + val nodes' = nodes + |> (if insert then Symtab.insert_list else Symtab.remove_list) + Task_Queue.eq_task (node_name, the (Future.worker_task ())); + in (execution_id, nodes', execs) end); + + (* running execs *) fun is_running_exec exec_id = Inttab.defined (#execs (get_state ())) exec_id; fun running execution_id exec_id groups = - change_state_result (fn (execution_id', execs) => + change_state_result (fn (execution_id', nodes, execs) => let val ok = execution_id = execution_id' andalso not (Inttab.defined execs exec_id); val execs' = execs |> ok ? Inttab.update (exec_id, (groups, [])); - in (ok, (execution_id', execs')) end); + in (ok, (execution_id', nodes, execs')) end); (* exec groups and tasks *) @@ -98,7 +116,7 @@ | NONE => []); fun snapshot exec_ids = - change_state_result (`(fn (_, execs) => Future.snapshot (maps (exec_groups execs) exec_ids))); + change_state_result (`(fn (_, _, execs) => Future.snapshot (maps (exec_groups execs) exec_ids))); fun join exec_ids = (case snapshot exec_ids of @@ -130,11 +148,12 @@ let val exec_id = the_default 0 (Position.parse_id pos); val group = Future.worker_subgroup (); - val _ = change_state (apsnd (fn execs => + val _ = change_state (fn (execution_id, nodes, execs) => (case Inttab.lookup execs exec_id of SOME (groups, prints) => - Inttab.update (exec_id, (group :: groups, prints)) execs - | NONE => raise Fail (unregistered exec_id)))); + let val execs' = Inttab.update (exec_id, (group :: groups, prints)) execs + in (execution_id, nodes, execs') end + | NONE => raise Fail (unregistered exec_id))); val future = (singleton o Future.forks) @@ -167,15 +186,17 @@ (* print *) fun print ({name, pos, pri}: params) e = - change_state (apsnd (fn execs => + change_state (fn (execution_id, nodes, execs) => let val exec_id = the_default 0 (Position.parse_id pos); val print = {name = name, pri = pri, body = e}; in (case Inttab.lookup execs exec_id of - SOME (groups, prints) => Inttab.update (exec_id, (groups, print :: prints)) execs + SOME (groups, prints) => + let val execs' = Inttab.update (exec_id, (groups, print :: prints)) execs + in (execution_id, nodes, execs') end | NONE => raise Fail (unregistered exec_id)) - end)); + end); fun fork_prints exec_id = (case Inttab.lookup (#execs (get_state ())) exec_id of @@ -192,7 +213,7 @@ (* cleanup *) fun purge exec_ids = - (change_state o apsnd) (fn execs => + change_state (fn (execution_id, nodes, execs) => let val execs' = fold Inttab.delete_safe exec_ids execs; val () = @@ -201,12 +222,12 @@ else groups |> List.app (fn group => if Task_Queue.is_canceled group then () else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id))); - in execs' end); + in (execution_id, nodes, execs') end); fun reset () = - change_state_result (fn (_, execs) => + change_state_result (fn (_, _, execs) => let val groups = Inttab.fold (append o #1 o #2) execs [] - in (groups, (Document_ID.none, init_execs)) end); + in (groups, (Document_ID.none, Symtab.empty, init_execs)) end); fun shutdown () = (Future.shutdown ();