src/Pure/PIDE/execution.ML
author wenzelm
Sun Mar 10 14:19:30 2019 +0100 (4 months ago)
changeset 70070 be04e9a053a7
parent 68880 8b98db8fd183
permissions -rw-r--r--
markup and document markers for some meta data from "Dublin Core Metadata Element Set";
wenzelm@52605
     1
(*  Title:      Pure/PIDE/execution.ML
wenzelm@52596
     2
    Author:     Makarius
wenzelm@52596
     3
wenzelm@52606
     4
Global management of execution.  Unique running execution serves as
wenzelm@53192
     5
barrier for further exploration of forked command execs.
wenzelm@52596
     6
*)
wenzelm@52596
     7
wenzelm@52605
     8
signature EXECUTION =
sig
  (*unique running execution*)
  val start: unit -> Document_ID.execution
  val discontinue: unit -> unit
  val is_running: Document_ID.execution -> bool

  (*active nodes*)
  val active_tasks: string -> Future.task list
  val worker_task_active: bool -> string -> unit

  (*running execs*)
  val is_running_exec: Document_ID.exec -> bool
  val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool

  (*exec groups and tasks*)
  val snapshot: Document_ID.exec list -> Future.task list
  val join: Document_ID.exec list -> unit
  val peek: Document_ID.exec -> Future.group list
  val cancel: Document_ID.exec -> unit

  (*fork and print*)
  type params = {name: string, pos: Position.T, pri: int}
  val fork: params -> (unit -> 'a) -> 'a future
  val print: params -> (unit -> unit) -> unit
  val fork_prints: Document_ID.exec -> unit

  (*cleanup*)
  val purge: Document_ID.exec list -> unit
  val reset: unit -> Future.group list
  val shutdown: unit -> unit
end;
wenzelm@52596
    29
wenzelm@52605
    30
structure Execution: EXECUTION =
wenzelm@52596
    31
struct
wenzelm@52596
    32
wenzelm@52787
    33
(* global state *)
wenzelm@52787
    34
wenzelm@56292
    35
(*deferred print operation: task name, priority, and the body to run later*)
type print = {name: string, pri: int, body: unit -> unit};
wenzelm@68354
    36
(*table indexed by exec id: groups of forks registered for that exec, plus pending prints
  (both lists are extended at the front)*)
type execs = (Future.group list * print list) (*active forks, prints*) Inttab.table;
wenzelm@68354
    37
(*initial table: only Document_ID.none is registered, without groups or prints*)
val init_execs: execs = Inttab.empty |> Inttab.update (Document_ID.none, ([], []));
wenzelm@68354
    38
wenzelm@68354
    39
(*global execution state:
    execution_id -- overall document execution
    nodes        -- active nodes: tasks per node name
    execs        -- running command execs*)
datatype state = State of
  {execution_id: Document_ID.execution,
   nodes: Future.task list Symtab.table,
   execs: execs};
wenzelm@68354
    44
wenzelm@68355
    45
(*build a state value from the triple (execution id, active nodes, running execs)*)
fun make_state (id, active_nodes, running_execs) =
  State {execution_id = id, nodes = active_nodes, execs = running_execs};
wenzelm@52787
    47
wenzelm@68354
    48
local

(*the one synchronized variable holding the state; private to this local block*)
val state =
  Synchronized.var "Execution.state"
    (make_state (Document_ID.none, Symtab.empty, init_execs));

(*view a state as the triple that update functions operate on*)
fun dest_state (State {execution_id, nodes, execs}) = (execution_id, nodes, execs);

in

(*current state as a record with fields execution_id, nodes, execs*)
fun get_state () =
  (case Synchronized.value state of State args => args);

(*update the state via f on the triple; f also produces the result to return*)
fun change_state_result f =
  Synchronized.change_result state (fn st =>
    let val (result, args') = f (dest_state st)
    in (result, make_state args') end);

(*update the state via f on the triple*)
fun change_state f =
  Synchronized.change state (fn st => make_state (f (dest_state st)));

end;
wenzelm@56291
    65
wenzelm@56291
    66
(*error message for an exec id that is missing from the execs table*)
fun unregistered id = implode ["Unregistered execution: ", Document_ID.print id];
wenzelm@53192
    67
wenzelm@52604
    68
wenzelm@52606
    69
(* unique running execution *)
wenzelm@52604
    70
wenzelm@52606
    71
(*install a freshly made execution id as the current one and return it;
  nodes and execs are left as they are*)
fun start () =
  let val fresh_id = Document_ID.make ()
  in change_state (fn (_, nodes, execs) => (fresh_id, nodes, execs)); fresh_id end;
wenzelm@52604
    76
wenzelm@68355
    77
(*replace the current execution id by Document_ID.none; nodes and execs are kept*)
fun discontinue () =
  let fun clear_id (_, nodes, execs) = (Document_ID.none, nodes, execs)
  in change_state clear_id end;
wenzelm@52606
    79
wenzelm@68354
    80
(*does the given execution id coincide with the current one?*)
fun is_running execution_id =
  let val {execution_id = current_id, ...} = get_state ()
  in execution_id = current_id end;
wenzelm@52604
    82
wenzelm@52604
    83
wenzelm@68355
    84
(* active nodes *)
wenzelm@68355
    85
wenzelm@68355
    86
(*tasks currently recorded for the given node name*)
fun active_tasks node_name =
  let val {nodes, ...} = get_state ()
  in Symtab.lookup_list nodes node_name end;
wenzelm@68355
    88
wenzelm@68355
    89
(*record (insert = true) or drop (insert = false) the current worker task under
  node_name; "the" fails if Future.worker_task yields NONE*)
fun worker_task_active insert node_name =
  change_state (fn (execution_id, nodes, execs) =>
    let
      val entry = (node_name, the (Future.worker_task ()));
      val nodes' =
        if insert then Symtab.insert_list Task_Queue.eq_task entry nodes
        else Symtab.remove_list Task_Queue.eq_task entry nodes;
    in (execution_id, nodes', execs) end);
wenzelm@68355
    96
wenzelm@68355
    97
wenzelm@66374
    98
(* running execs *)
wenzelm@52604
    99
wenzelm@52784
   100
(*is the exec id present in the execs table?*)
fun is_running_exec exec_id =
  let val {execs, ...} = get_state ()
  in Inttab.defined execs exec_id end;
wenzelm@52784
   102
wenzelm@53375
   103
(*register exec_id with the given groups (and no prints), provided that execution_id is
  the current execution and exec_id is not yet registered; the result tells whether the
  registration happened*)
fun running execution_id exec_id groups =
  change_state_result (fn (current_id, nodes, execs) =>
    if execution_id = current_id andalso not (Inttab.defined execs exec_id)
    then (true, (current_id, nodes, Inttab.update (exec_id, (groups, [])) execs))
    else (false, (current_id, nodes, execs)));
wenzelm@53192
   109
wenzelm@66374
   110
wenzelm@66374
   111
(* exec groups and tasks *)
wenzelm@66374
   112
wenzelm@68354
   113
(*groups registered for exec_id; empty for an unknown exec*)
fun exec_groups (execs: execs) exec_id =
  the_default [] (Option.map #1 (Inttab.lookup execs exec_id));
wenzelm@53192
   117
wenzelm@68380
   118
(*Future.snapshot of all groups of the given execs, taken inside the state update
  (the state itself is returned unchanged); no state access for empty input*)
fun snapshot exec_ids =
  if null exec_ids then []
  else
    change_state_result (fn (args as (_, _, execs)) =>
      (Future.snapshot (maps (exec_groups execs) exec_ids), args));
wenzelm@66373
   122
wenzelm@66374
   123
(*repeat until the snapshot of exec_ids is empty: fork an identity future that depends
  on all tasks of the snapshot, and join it*)
fun join exec_ids =
  let
    val tasks = snapshot exec_ids;
  in
    if null tasks then ()
    else
      let
        val params =
          {name = "Execution.join", group = SOME (Future.new_group NONE),
            deps = tasks, pri = 0, interrupts = false};
        val barrier = singleton (Future.forks params) I;
        val _ = Future.join barrier;
      in join exec_ids end
  end;
wenzelm@66374
   131
wenzelm@68354
   132
(*groups of exec_id according to the current state*)
fun peek exec_id =
  let val {execs, ...} = get_state ()
  in exec_groups execs exec_id end;
wenzelm@66373
   133
wenzelm@53192
   134
(*cancel every group currently registered for exec_id*)
fun cancel exec_id = peek exec_id |> List.app Future.cancel_group;
wenzelm@53192
   135
wenzelm@53192
   136
wenzelm@53192
   137
(* fork *)
wenzelm@53192
   138
wenzelm@53192
   139
(*emit the given markups as one status message; each markup is decorated with the task
  name if Multithreading.trace is at least 2*)
fun status task markups =
  let
    val tracing = ! Multithreading.trace >= 2;
    val props = if tracing then [(Markup.taskN, Task_Queue.str_of_task task)] else [];
    fun render markup = Markup.markup_only (Markup.properties props markup);
  in markups |> map render |> implode |> Output.status end;
wenzelm@53192
   145
wenzelm@56292
   146
(*arguments shared by fork and print: task name, position (its id, if any, selects the
  exec), and priority*)
type params = {name: string, pos: Position.T, pri: int};
wenzelm@56292
   147
wenzelm@56292
   148
(*fork e as a future in a fresh worker subgroup that is registered under the exec id
  parsed from pos (default 0); raises Fail for an unregistered exec.  The forked task
  reports its lifecycle via status markup and re-raises the captured result of e at
  the end.*)
fun fork ({name, pos, pri}: params) e =
  Thread_Attributes.uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
    let
      val exec_id = the_default 0 (Position.parse_id pos);
      val group = Future.worker_subgroup ();

      (*add the new group in front of the groups of this exec*)
      fun register (execution_id, nodes, execs) =
        (case Inttab.lookup execs exec_id of
          NONE => raise Fail (unregistered exec_id)
        | SOME (groups, prints) =>
            (execution_id, nodes, Inttab.update (exec_id, (group :: groups, prints)) execs));
      val _ = change_state register;

      (*status and error output for the outcome of e; error messages are suppressed
        for exec id 0*)
      fun report_outcome task (Exn.Exn exn) =
            (status task [Markup.failed];
             status task [Markup.finished];
             Output.report [Markup.markup_only (Markup.bad ())];
             if exec_id = 0 then ()
             else List.app (Future.error_message pos) (Runtime.exn_messages exn))
        | report_outcome task (Exn.Res _) = status task [Markup.finished];

      fun body () =
        let
          val task = the (Future.worker_task ());
          val _ = status task [Markup.running];
          val result =
            Exn.capture (Future.interruptible_task e) ()
            |> Future.identify_result pos
            |> Exn.map_exn Runtime.thread_context;
          (*failures of the reporting itself are captured, and released only after
            the "joined" status*)
          val errors = Exn.capture (fn () => report_outcome task result) ();
          val _ = status task [Markup.joined];
        in Exn.release errors; Exn.release result end;

      val future =
        (singleton o Future.forks)
          {name = name, group = SOME group, deps = [], pri = pri, interrupts = false} body;
      val _ = status (Future.task_of future) [Markup.forked];
    in future end)) ();
wenzelm@52604
   187
wenzelm@53192
   188
wenzelm@56291
   189
(* print *)
wenzelm@56291
   190
wenzelm@56305
   191
(*store e as pending print of the exec whose id is parsed from pos (default 0), in
  front of the existing prints; raises Fail for an unregistered exec*)
fun print ({name, pos, pri}: params) e =
  let
    val entry = {name = name, pri = pri, body = e};
    fun add (execution_id, nodes, execs) =
      let val exec_id = the_default 0 (Position.parse_id pos) in
        (case Inttab.lookup execs exec_id of
          NONE => raise Fail (unregistered exec_id)
        | SOME (groups, prints) =>
            (execution_id, nodes, Inttab.update (exec_id, (groups, entry :: prints)) execs))
      end;
  in change_state add end;
wenzelm@56291
   203
wenzelm@56292
   204
(*run the pending prints of exec_id in reverse order of the stored list: as forks at the
  current thread position if Future.relevant holds for the prints, otherwise directly
  in the current thread; raises Fail for an unregistered exec*)
fun fork_prints exec_id =
  let
    val {execs, ...} = get_state ();
    val prints =
      (case Inttab.lookup execs exec_id of
        NONE => raise Fail (unregistered exec_id)
      | SOME (_, ps) => ps);
    val ordered = rev prints;
  in
    if Future.relevant prints then
      let
        val pos = Position.thread_data ();
        fun fork_print ({name, pri, body}: print) =
          ignore (fork {name = name, pos = pos, pri = pri} body);
      in List.app fork_print ordered end
    else List.app (fn ({body, ...}: print) => body ()) ordered
  end;
wenzelm@56291
   214
wenzelm@56291
   215
wenzelm@53192
   216
(* cleanup *)
wenzelm@53192
   217
wenzelm@53192
   218
(*delete the given exec ids from the execs table (ids that are absent are ignored)*)
fun purge exec_ids =
  let
    (*NOTE(review): this traverses the table after deletion and tests membership in that
      same table, so the "else" branch with the Fail looks unreachable -- confirm
      whether the table before deletion was meant to be traversed*)
    fun check_remaining remaining =
      Inttab.fold (fn (exec_id, (groups, _)) => fn () =>
        if Inttab.defined remaining exec_id then ()
        else
          List.app (fn group =>
            if Task_Queue.is_canceled group then ()
            else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id))
          groups) remaining ();
  in
    change_state (fn (execution_id, nodes, execs) =>
      let
        val remaining = fold Inttab.delete_safe exec_ids execs;
        val () = check_remaining remaining;
      in (execution_id, nodes, remaining) end)
  end;
wenzelm@53192
   229
wenzelm@53192
   230
(*return the groups of all registered execs and put the state back to its initial value
  (no execution id, no active nodes, init_execs)*)
fun reset () =
  let
    fun collect (_, (groups, _)) acc = groups @ acc;
    val initial = (Document_ID.none, Symtab.empty, init_execs);
  in change_state_result (fn (_, _, execs) => (Inttab.fold collect execs [], initial)) end;
wenzelm@53192
   234
wenzelm@53192
   235
(*shut down futures, reset the state, and raise the combined exceptions (if any) from
  the status of the groups that were registered*)
fun shutdown () =
  let
    val _ = Future.shutdown ();
    val exns = maps Task_Queue.group_status (reset ());
  in if null exns then () else raise Par_Exn.make exns end;
wenzelm@53192
   240
wenzelm@53192
   241
end;