src/Pure/PIDE/execution.ML
author wenzelm
Thu Mar 27 17:12:40 2014 +0100 (2014-03-27 ago)
changeset 56303 4cc3f4db3447
parent 56297 3925634718fb
child 56304 40274e4f5ebf
permissions -rw-r--r--
clarified Isabelle/ML bootstrap, such that Execution does not require ML_Compiler;
wenzelm@52605
     1
(*  Title:      Pure/PIDE/execution.ML
wenzelm@52596
     2
    Author:     Makarius
wenzelm@52596
     3
wenzelm@52606
     4
Global management of execution.  Unique running execution serves as
wenzelm@53192
     5
barrier for further exploration of forked command execs.
wenzelm@52596
     6
*)
wenzelm@52596
     7
wenzelm@52605
     8
signature EXECUTION =
wenzelm@52596
     9
sig
wenzelm@52606
    10
  val start: unit -> Document_ID.execution
wenzelm@52606
    11
  val discontinue: unit -> unit
wenzelm@52606
    12
  val is_running: Document_ID.execution -> bool
wenzelm@52784
    13
  val is_running_exec: Document_ID.exec -> bool
wenzelm@53375
    14
  val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool
wenzelm@53192
    15
  val peek: Document_ID.exec -> Future.group list
wenzelm@52655
    16
  val cancel: Document_ID.exec -> unit
wenzelm@52655
    17
  val terminate: Document_ID.exec -> unit
wenzelm@56292
    18
  type params = {name: string, pos: Position.T, pri: int}
wenzelm@56292
    19
  val fork: params -> (unit -> 'a) -> 'a future
wenzelm@56292
    20
  val print: params -> (serial -> unit) -> unit
wenzelm@56297
    21
  val print_report: string -> unit
wenzelm@56292
    22
  val fork_prints: Document_ID.exec -> unit
wenzelm@53192
    23
  val purge: Document_ID.exec list -> unit
wenzelm@53192
    24
  val reset: unit -> Future.group list
wenzelm@53192
    25
  val shutdown: unit -> unit
wenzelm@52596
    26
end;
wenzelm@52596
    27
wenzelm@52605
    28
structure Execution: EXECUTION =
wenzelm@52596
    29
struct
wenzelm@52596
    30
wenzelm@52787
    31
(* global state *)
wenzelm@52787
    32
wenzelm@56292
    33
type print = {name: string, pri: int, body: unit -> unit};
wenzelm@56292
    34
type exec_state = Future.group list * print list;  (*active forks, prints*)
wenzelm@56291
    35
type state =
wenzelm@56291
    36
  Document_ID.execution * (*overall document execution*)
wenzelm@56291
    37
  exec_state Inttab.table;  (*running command execs*)
wenzelm@52787
    38
wenzelm@56291
    39
val init_state: state = (Document_ID.none, Inttab.make [(Document_ID.none, ([], []))]);
wenzelm@52787
    40
val state = Synchronized.var "Execution.state" init_state;
wenzelm@52602
    41
wenzelm@56291
    42
fun get_state () = Synchronized.value state;
wenzelm@56291
    43
fun change_state_result f = Synchronized.change_result state f;
wenzelm@56291
    44
fun change_state f = Synchronized.change state f;
wenzelm@56291
    45
wenzelm@56291
    46
fun unregistered exec_id = "Unregistered execution: " ^ Document_ID.print exec_id;
wenzelm@53192
    47
wenzelm@52604
    48
wenzelm@52606
    49
(* unique running execution *)
wenzelm@52604
    50
wenzelm@52606
    51
fun start () =
wenzelm@52602
    52
  let
wenzelm@52606
    53
    val execution_id = Document_ID.make ();
wenzelm@53192
    54
    val _ = change_state (apfst (K execution_id));
wenzelm@52606
    55
  in execution_id end;
wenzelm@52604
    56
wenzelm@53192
    57
fun discontinue () = change_state (apfst (K Document_ID.none));
wenzelm@52606
    58
wenzelm@56291
    59
fun is_running execution_id = execution_id = #1 (get_state ());
wenzelm@52604
    60
wenzelm@52604
    61
wenzelm@53192
    62
(* execs *)
wenzelm@52604
    63
wenzelm@52784
    64
fun is_running_exec exec_id =
wenzelm@56291
    65
  Inttab.defined (#2 (get_state ())) exec_id;
wenzelm@52784
    66
wenzelm@53375
    67
fun running execution_id exec_id groups =
wenzelm@56291
    68
  change_state_result (fn (execution_id', execs) =>
wenzelm@56291
    69
    let
wenzelm@56291
    70
      val continue = execution_id = execution_id';
wenzelm@56291
    71
      val execs' =
wenzelm@56291
    72
        if continue then
wenzelm@56291
    73
          Inttab.update_new (exec_id, (groups, [])) execs
wenzelm@56291
    74
            handle Inttab.DUP dup =>
wenzelm@56291
    75
              raise Fail ("Execution already registered: " ^ Document_ID.print dup)
wenzelm@56291
    76
        else execs;
wenzelm@56291
    77
    in (continue, (execution_id', execs')) end);
wenzelm@53192
    78
wenzelm@53192
    79
fun peek exec_id =
wenzelm@56291
    80
  (case Inttab.lookup (#2 (get_state ())) exec_id of
wenzelm@56291
    81
    SOME (groups, _) => groups
wenzelm@56291
    82
  | NONE => []);
wenzelm@53192
    83
wenzelm@53192
    84
fun cancel exec_id = List.app Future.cancel_group (peek exec_id);
wenzelm@53192
    85
fun terminate exec_id = List.app Future.terminate (peek exec_id);
wenzelm@53192
    86
wenzelm@53192
    87
wenzelm@53192
    88
(* fork *)
wenzelm@53192
    89
wenzelm@53192
    90
fun status task markups =
wenzelm@56296
    91
  let
wenzelm@56296
    92
    val props =
wenzelm@56296
    93
      if ! Multithreading.trace >= 2
wenzelm@56296
    94
      then [(Markup.taskN, Task_Queue.str_of_task task)] else [];
wenzelm@56296
    95
  in Output.status (implode (map (Markup.markup_only o Markup.properties props) markups)) end;
wenzelm@53192
    96
wenzelm@56292
    97
type params = {name: string, pos: Position.T, pri: int};
wenzelm@56292
    98
wenzelm@56292
    99
fun fork ({name, pos, pri}: params) e =
wenzelm@53192
   100
  uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
wenzelm@53192
   101
    let
wenzelm@53192
   102
      val exec_id = the_default 0 (Position.parse_id pos);
wenzelm@53192
   103
      val group = Future.worker_subgroup ();
wenzelm@53375
   104
      val _ = change_state (apsnd (fn execs =>
wenzelm@56291
   105
        (case Inttab.lookup execs exec_id of
wenzelm@56291
   106
          SOME (groups, prints) =>
wenzelm@56291
   107
            Inttab.update (exec_id, (group :: groups, prints)) execs
wenzelm@56291
   108
        | NONE => raise Fail (unregistered exec_id))));
wenzelm@52655
   109
wenzelm@53192
   110
      val future =
wenzelm@53192
   111
        (singleton o Future.forks)
wenzelm@53192
   112
          {name = name, group = SOME group, deps = [], pri = pri, interrupts = false}
wenzelm@53192
   113
          (fn () =>
wenzelm@53192
   114
            let
wenzelm@53192
   115
              val task = the (Future.worker_task ());
wenzelm@53192
   116
              val _ = status task [Markup.running];
wenzelm@53192
   117
              val result =
wenzelm@53192
   118
                Exn.capture (Future.interruptible_task e) ()
wenzelm@53192
   119
                |> Future.identify_result pos;
wenzelm@54646
   120
              val _ = status task [Markup.joined];
wenzelm@53192
   121
              val _ =
wenzelm@53192
   122
                (case result of
wenzelm@54678
   123
                  Exn.Exn exn =>
wenzelm@54646
   124
                   (status task [Markup.failed];
wenzelm@54678
   125
                    status task [Markup.finished];
wenzelm@54646
   126
                    Output.report (Markup.markup_only Markup.bad);
wenzelm@54646
   127
                    if exec_id = 0 then ()
wenzelm@56303
   128
                    else List.app (Future.error_msg pos) (Runtime.exn_messages_ids exn))
wenzelm@54678
   129
                | Exn.Res _ =>
wenzelm@54678
   130
                    status task [Markup.finished])
wenzelm@53192
   131
            in Exn.release result end);
wenzelm@54646
   132
wenzelm@53192
   133
      val _ = status (Future.task_of future) [Markup.forked];
wenzelm@53192
   134
    in future end)) ();
wenzelm@52604
   135
wenzelm@53192
   136
wenzelm@56291
   137
(* print *)
wenzelm@56291
   138
wenzelm@56292
   139
fun print ({name, pos, pri}: params) pr =
wenzelm@56291
   140
  change_state (apsnd (fn execs =>
wenzelm@56291
   141
    let
wenzelm@56292
   142
      val exec_id = the_default 0 (Position.parse_id pos);
wenzelm@56291
   143
      val i = serial ();
wenzelm@56292
   144
      val print = {name = name, pri = pri, body = fn () => pr i};
wenzelm@56291
   145
    in
wenzelm@56291
   146
      (case Inttab.lookup execs exec_id of
wenzelm@56292
   147
        SOME (groups, prints) => Inttab.update (exec_id, (groups, print :: prints)) execs
wenzelm@56291
   148
      | NONE => raise Fail (unregistered exec_id))
wenzelm@56291
   149
    end));
wenzelm@56291
   150
wenzelm@56297
   151
fun print_report s =
wenzelm@56297
   152
  if s = "" orelse not (Multithreading.enabled ()) then Output.direct_report s
wenzelm@56297
   153
  else
wenzelm@56297
   154
    let
wenzelm@56297
   155
      val body = YXML.parse_body s  (*sharable closure!*)
wenzelm@56297
   156
      val params = {name = "", pos = Position.thread_data (), pri = 0};
wenzelm@56297
   157
    in print params (fn _ => Output.direct_report (YXML.string_of_body body)) end;
wenzelm@56297
   158
wenzelm@56292
   159
fun fork_prints exec_id =
wenzelm@56291
   160
  (case Inttab.lookup (#2 (get_state ())) exec_id of
wenzelm@56291
   161
    SOME (_, prints) =>
wenzelm@56291
   162
      if null prints orelse null (tl prints) orelse not (Multithreading.enabled ())
wenzelm@56292
   163
      then List.app (fn {body, ...} => body ()) (rev prints)
wenzelm@56291
   164
      else
wenzelm@56296
   165
        let val pos = Position.thread_data () in
wenzelm@56292
   166
          List.app (fn {name, pri, body} =>
wenzelm@56292
   167
            ignore (fork {name = name, pos = pos, pri = pri} body)) (rev prints)
wenzelm@56292
   168
        end
wenzelm@56291
   169
  | NONE => raise Fail (unregistered exec_id));
wenzelm@56291
   170
wenzelm@56291
   171
wenzelm@53192
   172
(* cleanup *)
wenzelm@53192
   173
wenzelm@53192
   174
fun purge exec_ids =
wenzelm@53192
   175
  (change_state o apsnd) (fn execs =>
wenzelm@53192
   176
    let
wenzelm@53192
   177
      val execs' = fold Inttab.delete_safe exec_ids execs;
wenzelm@53192
   178
      val () =
wenzelm@56291
   179
        (execs', ()) |-> Inttab.fold (fn (exec_id, (groups, _)) => fn () =>
wenzelm@53192
   180
          if Inttab.defined execs' exec_id then ()
wenzelm@53192
   181
          else groups |> List.app (fn group =>
wenzelm@53192
   182
            if Task_Queue.is_canceled group then ()
wenzelm@53375
   183
            else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
wenzelm@53192
   184
    in execs' end);
wenzelm@53192
   185
wenzelm@53192
   186
fun reset () =
wenzelm@56291
   187
  change_state_result (fn (_, execs) =>
wenzelm@56291
   188
    let val groups = Inttab.fold (append o #1 o #2) execs []
wenzelm@53192
   189
    in (groups, init_state) end);
wenzelm@53192
   190
wenzelm@53192
   191
fun shutdown () =
wenzelm@53192
   192
  (Future.shutdown ();
wenzelm@53192
   193
    (case maps Task_Queue.group_status (reset ()) of
wenzelm@53192
   194
      [] => ()
wenzelm@53192
   195
    | exns => raise Par_Exn.make exns));
wenzelm@53192
   196
wenzelm@53192
   197
end;
wenzelm@53192
   198