src/Pure/PIDE/execution.ML
author wenzelm
Wed Mar 26 12:15:42 2014 +0100 (2014-03-26 ago)
changeset 56291 e79f76a48449
parent 54678 87910da843d5
child 56292 1a91a0da65ab
permissions -rw-r--r--
added Execution.print: accumulate print operations for some command execution, which are applied later at print time;
misc tuning;
     1 (*  Title:      Pure/PIDE/execution.ML
     2     Author:     Makarius
     3 
     4 Global management of execution.  Unique running execution serves as
     5 barrier for further exploration of forked command execs.
     6 *)
     7 
     8 signature EXECUTION =
     9 sig
    10   val start: unit -> Document_ID.execution  (*install and return a fresh execution id*)
    11   val discontinue: unit -> unit  (*reset the execution id to Document_ID.none*)
    12   val is_running: Document_ID.execution -> bool  (*is this the current execution id?*)
    13   val is_running_exec: Document_ID.exec -> bool  (*is this exec registered in the state?*)
    14   val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool  (*register exec with groups, if the execution is current*)
    15   val peek: Document_ID.exec -> Future.group list  (*groups of exec; empty if unregistered*)
    16   val cancel: Document_ID.exec -> unit  (*Future.cancel_group on all groups of exec*)
    17   val terminate: Document_ID.exec -> unit  (*Future.terminate on all groups of exec*)
    18   val fork: {name: string, pos: Position.T, pri: int} -> (unit -> 'a) -> 'a future  (*fork future, recorded for the exec id parsed from pos*)
    19   val print: (serial -> unit) -> unit  (*accumulate print operation for the exec of the thread position*)
    20   val apply_prints: Document_ID.exec -> unit  (*run the accumulated prints of exec*)
    21   val purge: Document_ID.exec list -> unit  (*remove execs from the state*)
    22   val reset: unit -> Future.group list  (*restore the initial state; return all recorded groups*)
    23   val shutdown: unit -> unit  (*Future.shutdown, then reset; raises collected group exceptions*)
    24 end;
    25 
    26 structure Execution: EXECUTION =
    27 struct
    28 
    29 (* global state *)
    30 
    31 (*per exec: groups of its forks, and its accumulated prints (most recent first)*)
    32 type exec_state = Future.group list * (unit -> unit) list;
    33 (*id of the overall document execution, and the table of running command execs*)
    34 type state = Document_ID.execution * exec_state Inttab.table;
    35 
    36 val init_state: state = (Document_ID.none, Inttab.make [(Document_ID.none, ([], []))]);  (*only the empty entry for Document_ID.none*)
    37 val state = Synchronized.var "Execution.state" init_state;  (*the global state variable*)
    38 
    39 fun get_state () = Synchronized.value state;  (*current value of the state variable*)
    40 fun change_state_result update = Synchronized.change_result state update;  (*update, with result*)
    41 fun change_state update = Synchronized.change state update;  (*update, without result*)
    42 
    43 fun unregistered id = "Unregistered execution: " ^ Document_ID.print id;  (*error message text*)
    44 
    45 
    46 (* unique running execution *)
    47 
    48 fun start () =  (*make a fresh execution id the unique running execution, and return it*)
    49   let
    50     val fresh_id = Document_ID.make ();
    51     val _ = change_state (fn (_, execs) => (fresh_id, execs));
    52   in fresh_id end;
    53 
    54 fun discontinue () = change_state (fn (_, execs) => (Document_ID.none, execs));  (*no running execution*)
    55 
    56 fun is_running execution_id = let val (current_id, _) = get_state () in execution_id = current_id end;
    57 
    58 
    59 (* execs *)
    60 
    61 fun is_running_exec exec_id =  (*is exec_id present in the table of execs?*)
    62   let val (_, execs) = get_state () in Inttab.defined execs exec_id end;
    63 
    64 (*register exec_id with its initial groups, provided that execution_id is still*)
    65 (*the current execution; the result tells whether the exec was registered*)
    66 fun running execution_id exec_id groups =
    67   change_state_result (fn (current_id, execs) =>
    68     if execution_id <> current_id then (false, (current_id, execs))
    69     else
    70       let
    71         val execs' = Inttab.update_new (exec_id, (groups, [])) execs
    72           handle Inttab.DUP dup =>
    73             raise Fail ("Execution already registered: " ^ Document_ID.print dup);
    74       in (true, (current_id, execs')) end);
    75 
    76 (*groups recorded for exec_id; empty for an unregistered exec*)
    77 fun peek exec_id =
    78   let val (_, execs) = get_state ()
    79   in (case Inttab.lookup execs exec_id of NONE => [] | SOME (groups, _) => groups) end;
    80 
    81 fun cancel exec_id = peek exec_id |> List.app Future.cancel_group;  (*all groups of the exec*)
    82 fun terminate exec_id = peek exec_id |> List.app Future.terminate;  (*all groups of the exec*)
    83 
    84 
    85 (* fork *)
    86 
    87 fun status task markups =  (*one status message with all markups, each tagged by the task*)
    88   let val task_props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)] in
    89     markups |> map (fn m => Markup.markup_only (task_props m)) |> implode |> Output.status end;
    90 
    91 fun fork {name, pos, pri} e =  (*fork e as a future whose new group is recorded for the exec of pos*)
    92   uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
    93     let
    94       val exec_id = the_default 0 (Position.parse_id pos);  (*0 if Position.parse_id yields NONE*)
    95       val group = Future.worker_subgroup ();
    96       val _ = change_state (apsnd (fn execs =>  (*record the new group; the exec must be registered*)
    97         (case Inttab.lookup execs exec_id of
    98           SOME (groups, prints) =>
    99             Inttab.update (exec_id, (group :: groups, prints)) execs
   100         | NONE => raise Fail (unregistered exec_id))));
   101 
   102       val future =
   103         (singleton o Future.forks)
   104           {name = name, group = SOME group, deps = [], pri = pri, interrupts = false}
   105           (fn () =>
   106             let
   107               val task = the (Future.worker_task ());
   108               val _ = status task [Markup.running];
   109               val result =  (*run e via Future.interruptible_task, capturing its outcome*)
   110                 Exn.capture (Future.interruptible_task e) ()
   111                 |> Future.identify_result pos;
   112               val _ = status task [Markup.joined];
   113               val _ =  (*status failed + finished on exception, otherwise just finished*)
   114                 (case result of
   115                   Exn.Exn exn =>
   116                    (status task [Markup.failed];
   117                     status task [Markup.finished];
   118                     Output.report (Markup.markup_only Markup.bad);
   119                     if exec_id = 0 then ()  (*no error messages for the default exec id 0*)
   120                     else List.app (Future.error_msg pos) (ML_Compiler.exn_messages_ids exn))
   121                 | Exn.Res _ =>
   122                     status task [Markup.finished])
   123             in Exn.release result end);  (*presumably returns the value or re-raises the captured exception*)
   124 
   125       val _ = status (Future.task_of future) [Markup.forked];  (*issued by the forking thread*)
   126     in future end)) ();
   127 
   128 
   129 (* print *)
   130 
   131 (*accumulate pr for the exec of the current thread position, with a fresh serial number*)
   132 fun print pr =
   133   change_state (fn (execution_id, execs) =>
   134     let
   135       val exec_id = the_default 0 (Position.parse_id (Position.thread_data ()));
   136       val i = serial ();
   137       val (groups, prints) =
   138         (case Inttab.lookup execs exec_id of
   139           SOME entry => entry
   140         | NONE => raise Fail (unregistered exec_id));
   141     in (execution_id, Inttab.update (exec_id, (groups, (fn () => pr i) :: prints)) execs) end);
   142 
   143 (*run the prints of exec_id in order of accumulation: directly, or as joined forks*)
   144 fun apply_prints exec_id =
   145   let
   146     val prints =
   147       (case Inttab.lookup (#2 (get_state ())) exec_id of
   148         SOME (_, prints) => rev prints
   149       | NONE => raise Fail (unregistered exec_id));
   150     fun direct () = List.app (fn pr => pr ()) prints;
   151     fun forked () =
   152       let
   153         val pos = Position.thread_data ();
   154         val pri = (case Future.worker_task () of SOME task => Task_Queue.pri_of_task task | NONE => 0);
   155         val futures = map (fork {name = "Execution.print", pos = pos, pri = pri}) prints;
   156       in ignore (Future.joins futures) end;
   157   in if length prints > 1 andalso Multithreading.enabled () then forked () else direct () end;
   158 
   159 
   160 (* cleanup *)
   161 
   162 fun purge exec_ids =  (*remove the given execs from the table of execs*)
   163   (change_state o apsnd) (fn execs =>
   164     let
   165       val execs' = fold Inttab.delete_safe exec_ids execs;  (*table without exec_ids*)
   166       val () =  (*NOTE(review): folds over execs', so the "defined execs'" test below looks always true -- confirm*)
   167         (execs', ()) |-> Inttab.fold (fn (exec_id, (groups, _)) => fn () =>
   168           if Inttab.defined execs' exec_id then ()
   169           else groups |> List.app (fn group =>  (*a removed exec should only have canceled groups*)
   170             if Task_Queue.is_canceled group then ()
   171             else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
   172     in execs' end);
   173 
   174 fun reset () =  (*back to init_state; the result collects the groups of all execs*)
   175   change_state_result (fn (_, execs) =>
   176     let fun add_groups (_, (groups, _)) rest = groups @ rest
   177     in (Inttab.fold add_groups execs [], init_state) end);
   178 
   179 fun shutdown () =  (*Future.shutdown, then reset; raise the collected group exceptions, if any*)
   180   let
   181     val _ = Future.shutdown ();
   182     val exns = maps Task_Queue.group_status (reset ());
   183   in if null exns then () else raise Par_Exn.make exns end;
   184 
   185 end;
   186