(*  Title:      Pure/PIDE/execution.ML

Global management of execution.  Unique running execution serves as
barrier for further exploration of forked command execs.
*)
signature EXECUTION =
sig
  (*install a fresh id as the unique running execution and return it*)
  val start: unit -> Document_ID.execution
  (*reset the running execution id to Document_ID.none*)
  val discontinue: unit -> unit
  (*check whether the given id is the currently running execution*)
  val is_running: Document_ID.execution -> bool
  (*check whether the given exec is registered in the table of execs*)
  val is_running_exec: Document_ID.exec -> bool
  (*register exec with its initial groups, provided that the given execution
    is the running one and the exec is not registered yet; result tells
    whether registration has happened*)
  val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool
  (*future groups currently recorded for the exec*)
  val peek: Document_ID.exec -> Future.group list
  (*cancel all future groups recorded for the exec*)
  val cancel: Document_ID.exec -> unit
  type params = {name: string, pos: Position.T, pri: int}
  (*fork a future within a fresh group that is recorded for the exec
    identified by the id of pos*)
  val fork: params -> (unit -> 'a) -> 'a future
  (*record a deferred print operation for the exec identified by the id of pos*)
  val print: params -> (unit -> unit) -> unit
  (*run the print operations recorded for the exec, directly or as forks*)
  val fork_prints: Document_ID.exec -> unit
  (*remove the given execs from the table of execs*)
  val purge: Document_ID.exec list -> unit
  (*reset to the initial state and return all groups recorded so far*)
  val reset: unit -> Future.group list
  (*reset and raise the collected exceptions of all recorded groups, if any*)
  val shutdown: unit -> unit
end;

structure Execution: EXECUTION =
struct

(* global state *)

(*deferred print operation: recorded by print, run by fork_prints*)
type print = {name: string, pri: int, body: unit -> unit};
type exec_state = Future.group list * print list;  (*active forks, prints*)
type state =
  Document_ID.execution *  (*overall document execution*)
  exec_state Inttab.table;  (*running command execs*)

(*no running execution; only the exec Document_ID.none is registered,
  without groups or prints*)
val init_state: state = (Document_ID.none, Inttab.make [(Document_ID.none, ([], []))]);
val state = Synchronized.var "Execution.state" init_state;

(*all access to the state goes through the synchronized variable*)
fun get_state () = Synchronized.value state;
fun change_state_result f = Synchronized.change_result state f;
fun change_state f = Synchronized.change state f;

(*error message for an exec that is missing from the table of execs*)
fun unregistered exec_id = "Unregistered execution: " ^ Document_ID.print exec_id;


(* unique running execution *)

(*make a fresh execution id, install it as the running one, and return it;
  the table of execs is left unchanged*)
fun start () =
  let
    val execution_id = Document_ID.make ();
    val _ = change_state (apfst (K execution_id));
  in execution_id end;

(*no execution is running afterwards; the table of execs is left unchanged*)
fun discontinue () = change_state (apfst (K Document_ID.none));

fun is_running execution_id = execution_id = #1 (get_state ());


(* execs *)

fun is_running_exec exec_id =
  Inttab.defined (#2 (get_state ())) exec_id;

(*atomically register exec_id with the given groups and no prints; this
  succeeds only if execution_id is the running execution and exec_id is
  not registered yet, otherwise the state is unchanged; the result tells
  whether registration has happened*)
fun running execution_id exec_id groups =
  change_state_result (fn (execution_id', execs) =>
    let
      val ok = execution_id = execution_id' andalso not (Inttab.defined execs exec_id);
      val execs' = execs |> ok ? Inttab.update (exec_id, (groups, []));
    in (ok, (execution_id', execs')) end);

(*groups recorded for exec_id;
  NOTE(review): the branch for an unregistered exec was missing from the
  text; the empty list is assumed here (cancel then becomes a no-op for
  unknown execs) -- confirm against the intended behaviour*)
fun peek exec_id =
  (case Inttab.lookup (#2 (get_state ())) exec_id of
    SOME (groups, _) => groups
  | NONE => []);

fun cancel exec_id = List.app Future.cancel_group (peek exec_id);


(* fork *)

(*emit the given markups as status output; the task name is added as
  property only if the multithreading trace level is at least 2*)
fun status task markups =
  let
    val props =
      if ! Multithreading.trace >= 2
      then [(Markup.taskN, Task_Queue.str_of_task task)] else [];
  in Output.status (implode (map (Markup.markup_only o Markup.properties props) markups)) end;

type params = {name: string, pos: Position.T, pri: int};

(*fork e as a future with the given name and priority:
   - the exec id is parsed from pos, with 0 as default if pos carries no id;
   - a fresh worker subgroup is recorded for that exec before forking;
     Fail is raised if the exec is not registered;
   - the forked body reports running / joined / failed / finished status
     around the evaluation of e; a failure additionally produces bad markup
     and, unless the exec id is 0, error messages at pos;
   - the result of e (value or exception) becomes the result of the future.
  The whole setup is wrapped into Thread_Attributes.uninterruptible and
  Position.setmp_thread_data pos.*)
fun fork ({name, pos, pri}: params) e =
  Thread_Attributes.uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
    let
      val exec_id = the_default 0 (Position.parse_id pos);
      val group = Future.worker_subgroup ();
      val _ = change_state (apsnd (fn execs =>
        (case Inttab.lookup execs exec_id of
          SOME (groups, prints) =>
            Inttab.update (exec_id, (group :: groups, prints)) execs
        | NONE => raise Fail (unregistered exec_id))));

      val future =
        (singleton o Future.forks)
          {name = name, group = SOME group, deps = [], pri = pri, interrupts = false}
          (fn () =>
            let
              val task = the (Future.worker_task ());
              val _ = status task [Markup.running];
              (*the future is forked with interrupts = false; only the
                evaluation of e itself goes through Future.interruptible_task*)
              val result =
                Exn.capture (Future.interruptible_task e) ()
                |> Future.identify_result pos
                |> Exn.map_exn Runtime.thread_context;
              val _ = status task [Markup.joined];
              val _ =
                (case result of
                  Exn.Exn exn =>
                   (status task [Markup.failed];
                    status task [Markup.finished];
                    Output.report [Markup.markup_only (Markup.bad ())];
                    if exec_id = 0 then ()
                    else List.app (Future.error_message pos) (Runtime.exn_messages exn))
                | Exn.Res _ =>
                    status task [Markup.finished])
            in Exn.release result end);

      val _ = status (Future.task_of future) [Markup.forked];
    in future end)) ();


(* print *)

(*record e as deferred print for the exec identified by pos (default 0);
  new prints are consed to the front, so the list is in reverse order of
  registration; Fail is raised if the exec is not registered*)
fun print ({name, pos, pri}: params) e =
  change_state (apsnd (fn execs =>
    let
      val exec_id = the_default 0 (Position.parse_id pos);
      val print = {name = name, pri = pri, body = e};
    in
      (case Inttab.lookup execs exec_id of
        SOME (groups, prints) => Inttab.update (exec_id, (groups, print :: prints)) execs
      | NONE => raise Fail (unregistered exec_id))
    end));

(*run the prints recorded for exec_id in order of registration: directly
  within the current thread if there are fewer than two prints or
  multithreading is disabled, otherwise as separate forks that use the
  position of the current thread; the recorded prints are not removed;
  Fail is raised if the exec is not registered*)
fun fork_prints exec_id =
  (case Inttab.lookup (#2 (get_state ())) exec_id of
    SOME (_, prints) =>
      if null prints orelse null (tl prints) orelse not (Multithreading.enabled ())
      then List.app (fn {body, ...} => body ()) (rev prints)
      else
        let val pos = Position.thread_data () in
          List.app (fn {name, pri, body} =>
            ignore (fork {name = name, pos = pos, pri = pri} body)) (rev prints)
        end
  | NONE => raise Fail (unregistered exec_id));


(* cleanup *)

(*remove exec_ids from the table of execs; ids that are not registered are
  ignored (Inttab.delete_safe);
  NOTE(review): assuming (x, y) |-> f = f x y, the check below folds over
  execs', i.e. over the entries that remain after deletion, so
  Inttab.defined execs' exec_id holds for every visited entry and the Fail
  branch looks unreachable; presumably the fold was meant to range over
  the original execs -- left unchanged here, since an effective check could
  start raising Fail for existing callers*)
fun purge exec_ids =
  (change_state o apsnd) (fn execs =>
    let
      val execs' = fold Inttab.delete_safe exec_ids execs;
      val () =
        (execs', ()) |-> Inttab.fold (fn (exec_id, (groups, _)) => fn () =>
          if Inttab.defined execs' exec_id then ()
          else groups |> List.app (fn group =>
            if Task_Queue.is_canceled group then ()
            else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
    in execs' end);

(*replace the whole state by init_state and return the groups of all execs
  that were registered before*)
fun reset () =
  change_state_result (fn (_, execs) =>
    let val groups = Inttab.fold (append o #1 o #2) execs []
    in (groups, init_state) end);

(*reset the state and raise the combined exceptions of all groups recorded
  so far, if there are any;
  NOTE(review): the first statement of this sequence was missing from the
  text; Future.shutdown () is assumed, to be confirmed*)
fun shutdown () =
  (Future.shutdown ();
    (case maps Task_Queue.group_status (reset ()) of
      [] => ()
    | exns => raise Par_Exn.make exns));

end;