src/Pure/PIDE/execution.ML
author wenzelm
Thu Mar 27 17:12:40 2014 +0100 (2014-03-27 ago)
changeset 56303 4cc3f4db3447
parent 56297 3925634718fb
child 56304 40274e4f5ebf
permissions -rw-r--r--
clarified Isabelle/ML bootstrap, such that Execution does not require ML_Compiler;
     1 (*  Title:      Pure/PIDE/execution.ML
     2     Author:     Makarius
     3 
     4 Global management of execution.  Unique running execution serves as
     5 barrier for further exploration of forked command execs.
     6 *)
     7 
     8 signature EXECUTION =
     9 sig
    10   val start: unit -> Document_ID.execution
    11   val discontinue: unit -> unit
    12   val is_running: Document_ID.execution -> bool
    13   val is_running_exec: Document_ID.exec -> bool
    14   val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool
    15   val peek: Document_ID.exec -> Future.group list
    16   val cancel: Document_ID.exec -> unit
    17   val terminate: Document_ID.exec -> unit
    18   type params = {name: string, pos: Position.T, pri: int}
    19   val fork: params -> (unit -> 'a) -> 'a future
    20   val print: params -> (serial -> unit) -> unit
    21   val print_report: string -> unit
    22   val fork_prints: Document_ID.exec -> unit
    23   val purge: Document_ID.exec list -> unit
    24   val reset: unit -> Future.group list
    25   val shutdown: unit -> unit
    26 end;
    27 
    28 structure Execution: EXECUTION =
    29 struct
    30 
    31 (* global state *)
    32 
    33 type print = {name: string, pri: int, body: unit -> unit};
    34 type exec_state = Future.group list * print list;  (*active forks, prints*)
    35 type state =
    36   Document_ID.execution * (*overall document execution*)
    37   exec_state Inttab.table;  (*running command execs*)
    38 
    39 val init_state: state = (Document_ID.none, Inttab.make [(Document_ID.none, ([], []))]);
    40 val state = Synchronized.var "Execution.state" init_state;
    41 
    42 fun get_state () = Synchronized.value state;
    43 fun change_state_result f = Synchronized.change_result state f;
    44 fun change_state f = Synchronized.change state f;
    45 
    46 fun unregistered exec_id = "Unregistered execution: " ^ Document_ID.print exec_id;
    47 
    48 
    49 (* unique running execution *)
    50 
    51 fun start () =
    52   let
    53     val execution_id = Document_ID.make ();
    54     val _ = change_state (apfst (K execution_id));
    55   in execution_id end;
    56 
    57 fun discontinue () = change_state (apfst (K Document_ID.none));
    58 
    59 fun is_running execution_id = execution_id = #1 (get_state ());
    60 
    61 
    62 (* execs *)
    63 
    64 fun is_running_exec exec_id =
    65   Inttab.defined (#2 (get_state ())) exec_id;
    66 
    67 fun running execution_id exec_id groups =
    68   change_state_result (fn (execution_id', execs) =>
    69     let
    70       val continue = execution_id = execution_id';
    71       val execs' =
    72         if continue then
    73           Inttab.update_new (exec_id, (groups, [])) execs
    74             handle Inttab.DUP dup =>
    75               raise Fail ("Execution already registered: " ^ Document_ID.print dup)
    76         else execs;
    77     in (continue, (execution_id', execs')) end);
    78 
    79 fun peek exec_id =
    80   (case Inttab.lookup (#2 (get_state ())) exec_id of
    81     SOME (groups, _) => groups
    82   | NONE => []);
    83 
    84 fun cancel exec_id = List.app Future.cancel_group (peek exec_id);
    85 fun terminate exec_id = List.app Future.terminate (peek exec_id);
    86 
    87 
    88 (* fork *)
    89 
    90 fun status task markups =
    91   let
    92     val props =
    93       if ! Multithreading.trace >= 2
    94       then [(Markup.taskN, Task_Queue.str_of_task task)] else [];
    95   in Output.status (implode (map (Markup.markup_only o Markup.properties props) markups)) end;
    96 
    97 type params = {name: string, pos: Position.T, pri: int};
    98 
    99 fun fork ({name, pos, pri}: params) e =
   100   uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
   101     let
   102       val exec_id = the_default 0 (Position.parse_id pos);
   103       val group = Future.worker_subgroup ();
   104       val _ = change_state (apsnd (fn execs =>
   105         (case Inttab.lookup execs exec_id of
   106           SOME (groups, prints) =>
   107             Inttab.update (exec_id, (group :: groups, prints)) execs
   108         | NONE => raise Fail (unregistered exec_id))));
   109 
   110       val future =
   111         (singleton o Future.forks)
   112           {name = name, group = SOME group, deps = [], pri = pri, interrupts = false}
   113           (fn () =>
   114             let
   115               val task = the (Future.worker_task ());
   116               val _ = status task [Markup.running];
   117               val result =
   118                 Exn.capture (Future.interruptible_task e) ()
   119                 |> Future.identify_result pos;
   120               val _ = status task [Markup.joined];
   121               val _ =
   122                 (case result of
   123                   Exn.Exn exn =>
   124                    (status task [Markup.failed];
   125                     status task [Markup.finished];
   126                     Output.report (Markup.markup_only Markup.bad);
   127                     if exec_id = 0 then ()
   128                     else List.app (Future.error_msg pos) (Runtime.exn_messages_ids exn))
   129                 | Exn.Res _ =>
   130                     status task [Markup.finished])
   131             in Exn.release result end);
   132 
   133       val _ = status (Future.task_of future) [Markup.forked];
   134     in future end)) ();
   135 
   136 
   137 (* print *)
   138 
   139 fun print ({name, pos, pri}: params) pr =
   140   change_state (apsnd (fn execs =>
   141     let
   142       val exec_id = the_default 0 (Position.parse_id pos);
   143       val i = serial ();
   144       val print = {name = name, pri = pri, body = fn () => pr i};
   145     in
   146       (case Inttab.lookup execs exec_id of
   147         SOME (groups, prints) => Inttab.update (exec_id, (groups, print :: prints)) execs
   148       | NONE => raise Fail (unregistered exec_id))
   149     end));
   150 
   151 fun print_report s =
   152   if s = "" orelse not (Multithreading.enabled ()) then Output.direct_report s
   153   else
   154     let
   155       val body = YXML.parse_body s  (*sharable closure!*)
   156       val params = {name = "", pos = Position.thread_data (), pri = 0};
   157     in print params (fn _ => Output.direct_report (YXML.string_of_body body)) end;
   158 
   159 fun fork_prints exec_id =
   160   (case Inttab.lookup (#2 (get_state ())) exec_id of
   161     SOME (_, prints) =>
   162       if null prints orelse null (tl prints) orelse not (Multithreading.enabled ())
   163       then List.app (fn {body, ...} => body ()) (rev prints)
   164       else
   165         let val pos = Position.thread_data () in
   166           List.app (fn {name, pri, body} =>
   167             ignore (fork {name = name, pos = pos, pri = pri} body)) (rev prints)
   168         end
   169   | NONE => raise Fail (unregistered exec_id));
   170 
   171 
   172 (* cleanup *)
   173 
   174 fun purge exec_ids =
   175   (change_state o apsnd) (fn execs =>
   176     let
   177       val execs' = fold Inttab.delete_safe exec_ids execs;
   178       val () =
   179         (execs', ()) |-> Inttab.fold (fn (exec_id, (groups, _)) => fn () =>
   180           if Inttab.defined execs' exec_id then ()
   181           else groups |> List.app (fn group =>
   182             if Task_Queue.is_canceled group then ()
   183             else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
   184     in execs' end);
   185 
   186 fun reset () =
   187   change_state_result (fn (_, execs) =>
   188     let val groups = Inttab.fold (append o #1 o #2) execs []
   189     in (groups, init_state) end);
   190 
   191 fun shutdown () =
   192   (Future.shutdown ();
   193     (case maps Task_Queue.group_status (reset ()) of
   194       [] => ()
   195     | exns => raise Par_Exn.make exns));
   196 
   197 end;
   198