src/Pure/PIDE/execution.ML
author wenzelm
Sun Mar 10 14:19:30 2019 +0100 (4 months ago)
changeset 70070 be04e9a053a7
parent 68880 8b98db8fd183
permissions -rw-r--r--
markup and document markers for some meta data from "Dublin Core Metadata Element Set";
     1 (*  Title:      Pure/PIDE/execution.ML
     2     Author:     Makarius
     3 
     4 Global management of execution.  Unique running execution serves as
     5 barrier for further exploration of forked command execs.
     6 *)
     7 
     8 signature EXECUTION =
     9 sig
    10   val start: unit -> Document_ID.execution
    11   val discontinue: unit -> unit
    12   val is_running: Document_ID.execution -> bool
    13   val active_tasks: string -> Future.task list
    14   val worker_task_active: bool -> string -> unit
    15   val is_running_exec: Document_ID.exec -> bool
    16   val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool
    17   val snapshot: Document_ID.exec list -> Future.task list
    18   val join: Document_ID.exec list -> unit
    19   val peek: Document_ID.exec -> Future.group list
    20   val cancel: Document_ID.exec -> unit
    21   type params = {name: string, pos: Position.T, pri: int}
    22   val fork: params -> (unit -> 'a) -> 'a future
    23   val print: params -> (unit -> unit) -> unit
    24   val fork_prints: Document_ID.exec -> unit
    25   val purge: Document_ID.exec list -> unit
    26   val reset: unit -> Future.group list
    27   val shutdown: unit -> unit
    28 end;
    29 
    30 structure Execution: EXECUTION =
    31 struct
    32 
    33 (* global state *)
    34 
    35 type print = {name: string, pri: int, body: unit -> unit};
    36 type execs = (Future.group list * print list) (*active forks, prints*) Inttab.table;
    37 val init_execs: execs = Inttab.make [(Document_ID.none, ([], []))];
    38 
    39 datatype state =
    40   State of
    41    {execution_id: Document_ID.execution,  (*overall document execution*)
    42     nodes: Future.task list Symtab.table,  (*active nodes*)
    43     execs: execs};  (*running command execs*)
    44 
    45 fun make_state (execution_id, nodes, execs) =
    46   State {execution_id = execution_id, nodes = nodes, execs = execs};
    47 
    48 local
    49   val state =
    50     Synchronized.var "Execution.state" (make_state (Document_ID.none, Symtab.empty, init_execs));
    51 in
    52 
    53 fun get_state () = let val State args = Synchronized.value state in args end;
    54 
    55 fun change_state_result f =
    56   Synchronized.change_result state (fn (State {execution_id, nodes, execs}) =>
    57     let val (result, args') = f (execution_id, nodes, execs)
    58     in (result, make_state args') end);
    59 
    60 fun change_state f =
    61   Synchronized.change state (fn (State {execution_id, nodes, execs}) =>
    62     make_state (f (execution_id, nodes, execs)));
    63 
    64 end;
    65 
    66 fun unregistered exec_id = "Unregistered execution: " ^ Document_ID.print exec_id;
    67 
    68 
    69 (* unique running execution *)
    70 
    71 fun start () =
    72   let
    73     val execution_id = Document_ID.make ();
    74     val _ = change_state (fn (_, nodes, execs) => (execution_id, nodes, execs));
    75   in execution_id end;
    76 
    77 fun discontinue () =
    78   change_state (fn (_, nodes, execs) => (Document_ID.none, nodes, execs));
    79 
    80 fun is_running execution_id =
    81   execution_id = #execution_id (get_state ());
    82 
    83 
    84 (* active nodes *)
    85 
    86 fun active_tasks node_name =
    87   Symtab.lookup_list (#nodes (get_state ())) node_name;
    88 
    89 fun worker_task_active insert node_name =
    90   change_state (fn (execution_id, nodes, execs) =>
    91     let
    92       val nodes' = nodes
    93         |> (if insert then Symtab.insert_list else Symtab.remove_list)
    94           Task_Queue.eq_task (node_name, the (Future.worker_task ()));
    95     in (execution_id, nodes', execs) end);
    96 
    97 
    98 (* running execs *)
    99 
   100 fun is_running_exec exec_id =
   101   Inttab.defined (#execs (get_state ())) exec_id;
   102 
   103 fun running execution_id exec_id groups =
   104   change_state_result (fn (execution_id', nodes, execs) =>
   105     let
   106       val ok = execution_id = execution_id' andalso not (Inttab.defined execs exec_id);
   107       val execs' = execs |> ok ? Inttab.update (exec_id, (groups, []));
   108     in (ok, (execution_id', nodes, execs')) end);
   109 
   110 
   111 (* exec groups and tasks *)
   112 
   113 fun exec_groups (execs: execs) exec_id =
   114   (case Inttab.lookup execs exec_id of
   115     SOME (groups, _) => groups
   116   | NONE => []);
   117 
   118 fun snapshot [] = []
   119   | snapshot exec_ids =
   120       change_state_result
   121         (`(fn (_, _, execs) => Future.snapshot (maps (exec_groups execs) exec_ids)));
   122 
   123 fun join exec_ids =
   124   (case snapshot exec_ids of
   125     [] => ()
   126   | tasks =>
   127       ((singleton o Future.forks)
   128         {name = "Execution.join", group = SOME (Future.new_group NONE),
   129           deps = tasks, pri = 0, interrupts = false} I
   130       |> Future.join; join exec_ids));
   131 
   132 fun peek exec_id = exec_groups (#execs (get_state ())) exec_id;
   133 
   134 fun cancel exec_id = List.app Future.cancel_group (peek exec_id);
   135 
   136 
   137 (* fork *)
   138 
   139 fun status task markups =
   140   let
   141     val props =
   142       if ! Multithreading.trace >= 2
   143       then [(Markup.taskN, Task_Queue.str_of_task task)] else [];
   144   in Output.status (implode (map (Markup.markup_only o Markup.properties props) markups)) end;
   145 
   146 type params = {name: string, pos: Position.T, pri: int};
   147 
   148 fun fork ({name, pos, pri}: params) e =
   149   Thread_Attributes.uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
   150     let
   151       val exec_id = the_default 0 (Position.parse_id pos);
   152       val group = Future.worker_subgroup ();
   153       val _ = change_state (fn (execution_id, nodes, execs) =>
   154         (case Inttab.lookup execs exec_id of
   155           SOME (groups, prints) =>
   156             let val execs' = Inttab.update (exec_id, (group :: groups, prints)) execs
   157             in (execution_id, nodes, execs') end
   158         | NONE => raise Fail (unregistered exec_id)));
   159 
   160       val future =
   161         (singleton o Future.forks)
   162           {name = name, group = SOME group, deps = [], pri = pri, interrupts = false}
   163           (fn () =>
   164             let
   165               val task = the (Future.worker_task ());
   166               val _ = status task [Markup.running];
   167               val result =
   168                 Exn.capture (Future.interruptible_task e) ()
   169                 |> Future.identify_result pos
   170                 |> Exn.map_exn Runtime.thread_context;
   171               val errors =
   172                 Exn.capture (fn () =>
   173                   (case result of
   174                     Exn.Exn exn =>
   175                      (status task [Markup.failed];
   176                       status task [Markup.finished];
   177                       Output.report [Markup.markup_only (Markup.bad ())];
   178                       if exec_id = 0 then ()
   179                       else List.app (Future.error_message pos) (Runtime.exn_messages exn))
   180                   | Exn.Res _ =>
   181                       status task [Markup.finished])) ();
   182               val _ = status task [Markup.joined];
   183             in Exn.release errors; Exn.release result end);
   184 
   185       val _ = status (Future.task_of future) [Markup.forked];
   186     in future end)) ();
   187 
   188 
   189 (* print *)
   190 
   191 fun print ({name, pos, pri}: params) e =
   192   change_state (fn (execution_id, nodes, execs) =>
   193     let
   194       val exec_id = the_default 0 (Position.parse_id pos);
   195       val print = {name = name, pri = pri, body = e};
   196     in
   197       (case Inttab.lookup execs exec_id of
   198         SOME (groups, prints) =>
   199           let val execs' = Inttab.update (exec_id, (groups, print :: prints)) execs
   200           in (execution_id, nodes, execs') end
   201       | NONE => raise Fail (unregistered exec_id))
   202     end);
   203 
   204 fun fork_prints exec_id =
   205   (case Inttab.lookup (#execs (get_state ())) exec_id of
   206     SOME (_, prints) =>
   207       if Future.relevant prints then
   208         let val pos = Position.thread_data () in
   209           List.app (fn {name, pri, body} =>
   210             ignore (fork {name = name, pos = pos, pri = pri} body)) (rev prints)
   211         end
   212       else List.app (fn {body, ...} => body ()) (rev prints)
   213   | NONE => raise Fail (unregistered exec_id));
   214 
   215 
   216 (* cleanup *)
   217 
   218 fun purge exec_ids =
   219   change_state (fn (execution_id, nodes, execs) =>
   220     let
   221       val execs' = fold Inttab.delete_safe exec_ids execs;
   222       val () =
   223         (execs', ()) |-> Inttab.fold (fn (exec_id, (groups, _)) => fn () =>
   224           if Inttab.defined execs' exec_id then ()
   225           else groups |> List.app (fn group =>
   226             if Task_Queue.is_canceled group then ()
   227             else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
   228     in (execution_id, nodes, execs') end);
   229 
   230 fun reset () =
   231   change_state_result (fn (_, _, execs) =>
   232     let val groups = Inttab.fold (append o #1 o #2) execs []
   233     in (groups, (Document_ID.none, Symtab.empty, init_execs)) end);
   234 
   235 fun shutdown () =
   236   (Future.shutdown ();
   237     (case maps Task_Queue.group_status (reset ()) of
   238       [] => ()
   239     | exns => raise Par_Exn.make exns));
   240 
   241 end;