src/Pure/PIDE/execution.ML
author wenzelm
Mon Nov 25 20:49:20 2013 +0100 (2013-11-25 ago)
changeset 54646 2b38549374ba
parent 53375 78693e46a237
child 54678 87910da843d5
permissions -rw-r--r--
more robust status reports: avoid losing information about physical events (see also 28d207ba9340, 2bc5924b117f, 9edfd36a0355) -- NB: TTY and Proof General ignore Output.status and Output.report;
more defensive order of Markup.failed vs. Markup.finished, to allow breakdown of ML execution context (see also d7a1973b063c);
     1 (*  Title:      Pure/PIDE/execution.ML
     2     Author:     Makarius
     3 
     4 Global management of execution.  Unique running execution serves as
     5 barrier for further exploration of forked command execs.
     6 *)
     7 
(*Interface to the global execution state: one running document execution
  plus a table of command execs with the future groups forked for them.*)
signature EXECUTION =
sig
  (*make a fresh execution id, install it as the running one, and return it*)
  val start: unit -> Document_ID.execution
  (*replace the running execution id by Document_ID.none*)
  val discontinue: unit -> unit
  (*does the given id equal the currently stored execution id?*)
  val is_running: Document_ID.execution -> bool
  (*is the given exec id present in the table of execs?*)
  val is_running_exec: Document_ID.exec -> bool
  (*register exec with its groups, provided that the given execution is the
    current one; the result tells whether registration took place;
    Fail if the exec is already registered*)
  val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool
  (*groups stored in the table for the given exec*)
  val peek: Document_ID.exec -> Future.group list
  (*apply Future.cancel_group to all groups of the exec*)
  val cancel: Document_ID.exec -> unit
  (*apply Future.terminate to all groups of the exec*)
  val terminate: Document_ID.exec -> unit
  (*fork a future in a new worker subgroup that is added to the exec
    identified by pos (exec 0 if pos has no id), with status reports for the
    task; Fail if that exec is not registered*)
  val fork: {name: string, pos: Position.T, pri: int} -> (unit -> 'a) -> 'a future
  (*delete the given execs from the table*)
  val purge: Document_ID.exec list -> unit
  (*restore the initial state and return all groups of the previous table*)
  val reset: unit -> Future.group list
  (*Future.shutdown followed by reset; raises Par_Exn.make of the collected
    group status exceptions, if there are any*)
  val shutdown: unit -> unit
end;
    23 
    24 structure Execution: EXECUTION =
    25 struct
    26 
    27 (* global state *)
    28 
(*Global execution state: the id of the overall document execution, together
  with a table that maps exec ids to the future groups registered for them.*)
datatype state = State of
  {execution: Document_ID.execution,  (*overall document execution*)
   execs: Future.group list Inttab.table};  (*command execs with registered forks*)
    32 
    33 fun make_state (execution, execs) = State {execution = execution, execs = execs};
    34 fun map_state f (State {execution, execs}) = make_state (f (execution, execs));
    35 
    36 val init_state = make_state (Document_ID.none, Inttab.make [(Document_ID.none, [])]);
    37 val state = Synchronized.var "Execution.state" init_state;
    38 
    39 fun get_state () = let val State args = Synchronized.value state in args end;
    40 fun change_state f = Synchronized.change state (map_state f);
    41 
    42 
    43 (* unique running execution *)
    44 
    45 fun start () =
    46   let
    47     val execution_id = Document_ID.make ();
    48     val _ = change_state (apfst (K execution_id));
    49   in execution_id end;
    50 
    51 fun discontinue () = change_state (apfst (K Document_ID.none));
    52 
    53 fun is_running execution_id = execution_id = #execution (get_state ());
    54 
    55 
    56 (* execs *)
    57 
    58 fun is_running_exec exec_id =
    59   Inttab.defined (#execs (get_state ())) exec_id;
    60 
    61 fun running execution_id exec_id groups =
    62   Synchronized.change_result state
    63     (fn State {execution = execution_id', execs} =>
    64       let
    65         val continue = execution_id = execution_id';
    66         val execs' =
    67           if continue then
    68             Inttab.update_new (exec_id, groups) execs
    69               handle Inttab.DUP dup =>
    70                 raise Fail ("Execution already registered: " ^ Document_ID.print dup)
    71           else execs;
    72       in (continue, make_state (execution_id', execs')) end);
    73 
    74 fun peek exec_id =
    75   Inttab.lookup_list (#execs (get_state ())) exec_id;
    76 
    77 fun cancel exec_id = List.app Future.cancel_group (peek exec_id);
    78 fun terminate exec_id = List.app Future.terminate (peek exec_id);
    79 
    80 
    81 (* fork *)
    82 
    83 fun status task markups =
    84   let val props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]
    85   in Output.status (implode (map (Markup.markup_only o props) markups)) end;
    86 
(*Fork e as a future that belongs to the exec identified by pos.

  The whole setup runs under "uninterruptible", with pos installed as thread
  data.  A new worker subgroup is added to the table entry of the exec; Fail
  is raised if that exec is not registered.  The forked task reports its
  lifecycle via status messages in this order: running, joined, failed (only
  for exceptional results), finished.  The forking side reports "forked" for
  the new task.  The result is the forked future.*)
fun fork {name, pos, pri} e =
  uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
    let
      (*exec 0 serves as fallback if pos does not carry an id*)
      val exec_id = the_default 0 (Position.parse_id pos);
      val group = Future.worker_subgroup ();
      (*the new group is only added to an exec that is already in the table*)
      val _ = change_state (apsnd (fn execs =>
        if Inttab.defined execs exec_id
        then Inttab.cons_list (exec_id, group) execs
        else raise Fail ("Cannot fork from unregistered execution: " ^ Document_ID.print exec_id)));

      val future =
        (singleton o Future.forks)
          {name = name, group = SOME group, deps = [], pri = pri, interrupts = false}
          (fn () =>
            let
              val task = the (Future.worker_task ());
              val _ = status task [Markup.running];
              (*the outcome of e is captured as a value, so the reports below
                are issued for regular and exceptional results alike;
                NOTE(review): Future.interruptible_task presumably re-enables
                interrupts for e, given interrupts = false above -- confirm*)
              val result =
                Exn.capture (Future.interruptible_task e) ()
                |> Future.identify_result pos;
              val _ = status task [Markup.joined];
              val _ =
                (case result of
                  Exn.Res _ => ()
                | Exn.Exn exn =>
                   (status task [Markup.failed];
                    Output.report (Markup.markup_only Markup.bad);
                    (*no error messages for the fallback exec 0*)
                    if exec_id = 0 then ()
                    else List.app (Future.error_msg pos) (ML_Compiler.exn_messages_ids exn)));
              (*finished is reported last, after any failed status*)
              val _ = status task [Markup.finished];
            in Exn.release result end);

      val _ = status (Future.task_of future) [Markup.forked];
    in future end)) ();
   121 
   122 
   123 (* cleanup *)
   124 
(*Delete the given exec ids from the table of execs; ids without an entry
  are skipped by Inttab.delete_safe.

  NOTE(review): the sanity pass below folds over execs' (the table after
  deletion) and tests membership in that same table, so the "else" branch is
  never reached and the Fail cannot be raised as written.  The intention
  looks like a check that all groups of the deleted entries are canceled,
  which would mean folding over the original execs instead -- confirm
  against the callers before changing it, since purge would then start to
  raise for groups that are not canceled.*)
fun purge exec_ids =
  (change_state o apsnd) (fn execs =>
    let
      val execs' = fold Inttab.delete_safe exec_ids execs;
      val () =
        (execs', ()) |-> Inttab.fold (fn (exec_id, groups) => fn () =>
          if Inttab.defined execs' exec_id then ()
          else groups |> List.app (fn group =>
            if Task_Queue.is_canceled group then ()
            else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
    in execs' end);
   136 
   137 fun reset () =
   138   Synchronized.change_result state (fn State {execs, ...} =>
   139     let val groups = Inttab.fold (append o #2) execs []
   140     in (groups, init_state) end);
   141 
   142 fun shutdown () =
   143   (Future.shutdown ();
   144     (case maps Task_Queue.group_status (reset ()) of
   145       [] => ()
   146     | exns => raise Par_Exn.make exns));
   147 
   148 end;
   149