13 val is_running_exec: Document_ID.exec -> bool |
13 val is_running_exec: Document_ID.exec -> bool |
14 val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool |
14 val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool |
15 val peek: Document_ID.exec -> Future.group list |
15 val peek: Document_ID.exec -> Future.group list |
16 val cancel: Document_ID.exec -> unit |
16 val cancel: Document_ID.exec -> unit |
17 val terminate: Document_ID.exec -> unit |
17 val terminate: Document_ID.exec -> unit |
18 val fork: {name: string, pos: Position.T, pri: int} -> (unit -> 'a) -> 'a future |
18 type params = {name: string, pos: Position.T, pri: int} |
19 val print: (serial -> unit) -> unit |
19 val fork: params -> (unit -> 'a) -> 'a future |
20 val apply_prints: Document_ID.exec -> unit |
20 val print: params -> (serial -> unit) -> unit |
|
21 val fork_prints: Document_ID.exec -> unit |
21 val purge: Document_ID.exec list -> unit |
22 val purge: Document_ID.exec list -> unit |
22 val reset: unit -> Future.group list |
23 val reset: unit -> Future.group list |
23 val shutdown: unit -> unit |
24 val shutdown: unit -> unit |
24 end; |
25 end; |
25 |
26 |
26 structure Execution: EXECUTION = |
27 structure Execution: EXECUTION = |
27 struct |
28 struct |
28 |
29 |
29 (* global state *) |
30 (* global state *) |
30 |
31 |
31 type exec_state = Future.group list * (unit -> unit) list; (*active forks, prints*) |
32 type print = {name: string, pri: int, body: unit -> unit}; |
|
33 type exec_state = Future.group list * print list; (*active forks, prints*) |
32 type state = |
34 type state = |
33 Document_ID.execution * (*overall document execution*) |
35 Document_ID.execution * (*overall document execution*) |
34 exec_state Inttab.table; (*running command execs*) |
36 exec_state Inttab.table; (*running command execs*) |
35 |
37 |
36 val init_state: state = (Document_ID.none, Inttab.make [(Document_ID.none, ([], []))]); |
38 val init_state: state = (Document_ID.none, Inttab.make [(Document_ID.none, ([], []))]); |
86 |
88 |
87 fun status task markups = |
89 fun status task markups = |
88 let val props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)] |
90 let val props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)] |
89 in Output.status (implode (map (Markup.markup_only o props) markups)) end; |
91 in Output.status (implode (map (Markup.markup_only o props) markups)) end; |
90 |
92 |
91 fun fork {name, pos, pri} e = |
93 type params = {name: string, pos: Position.T, pri: int}; |
|
94 |
|
95 fun fork ({name, pos, pri}: params) e = |
92 uninterruptible (fn _ => Position.setmp_thread_data pos (fn () => |
96 uninterruptible (fn _ => Position.setmp_thread_data pos (fn () => |
93 let |
97 let |
94 val exec_id = the_default 0 (Position.parse_id pos); |
98 val exec_id = the_default 0 (Position.parse_id pos); |
95 val group = Future.worker_subgroup (); |
99 val group = Future.worker_subgroup (); |
96 val _ = change_state (apsnd (fn execs => |
100 val _ = change_state (apsnd (fn execs => |
126 in future end)) (); |
130 in future end)) (); |
127 |
131 |
128 |
132 |
129 (* print *) |
133 (* print *) |
130 |
134 |
131 fun print pr = |
135 fun print ({name, pos, pri}: params) pr = |
132 change_state (apsnd (fn execs => |
136 change_state (apsnd (fn execs => |
133 let |
137 let |
134 val exec_id = the_default 0 (Position.parse_id (Position.thread_data ())); |
138 val exec_id = the_default 0 (Position.parse_id pos); |
135 val i = serial (); |
139 val i = serial (); |
|
140 val print = {name = name, pri = pri, body = fn () => pr i}; |
136 in |
141 in |
137 (case Inttab.lookup execs exec_id of |
142 (case Inttab.lookup execs exec_id of |
138 SOME (groups, prints) => |
143 SOME (groups, prints) => Inttab.update (exec_id, (groups, print :: prints)) execs |
139 Inttab.update (exec_id, (groups, (fn () => pr i) :: prints)) execs |
|
140 | NONE => raise Fail (unregistered exec_id)) |
144 | NONE => raise Fail (unregistered exec_id)) |
141 end)); |
145 end)); |
142 |
146 |
143 fun apply_prints exec_id = |
147 fun fork_prints exec_id = |
144 (case Inttab.lookup (#2 (get_state ())) exec_id of |
148 (case Inttab.lookup (#2 (get_state ())) exec_id of |
145 SOME (_, prints) => |
149 SOME (_, prints) => |
146 if null prints orelse null (tl prints) orelse not (Multithreading.enabled ()) |
150 if null prints orelse null (tl prints) orelse not (Multithreading.enabled ()) |
147 then List.app (fn e => e ()) (rev prints) |
151 then List.app (fn {body, ...} => body ()) (rev prints) |
148 else |
152 else |
149 let |
153 let |
150 val pos = Position.thread_data (); |
154 val pos = Position.thread_data (); |
151 val pri = |
155 val pri = |
152 (case Future.worker_task () of |
156 (case Future.worker_task () of |
153 SOME task => Task_Queue.pri_of_task task |
157 SOME task => Task_Queue.pri_of_task task |
154 | NONE => 0); |
158 | NONE => 0); |
155 val futures = map (fork {name = "Execution.print", pos = pos, pri = pri}) (rev prints); |
159 in |
156 in ignore (Future.joins futures) end |
160 List.app (fn {name, pri, body} => |
|
161 ignore (fork {name = name, pos = pos, pri = pri} body)) (rev prints) |
|
162 end |
157 | NONE => raise Fail (unregistered exec_id)); |
163 | NONE => raise Fail (unregistered exec_id)); |
158 |
164 |
159 |
165 |
160 (* cleanup *) |
166 (* cleanup *) |
161 |
167 |