src/Pure/PIDE/exec.ML
changeset 52604 ff2f0818aebc
parent 52602 00170ef1dc39
equal deleted inserted replaced
52603:a44e9a1d5d8b 52604:ff2f0818aebc
     4 Global management of command execution fragments.
     4 Global management of command execution fragments.
     5 *)
     5 *)
     6 
     6 
     7 signature EXEC =
     7 signature EXEC =
     8 sig
     8 sig
     9   val running: Document_ID.exec -> unit
     9   type context  (*execution context; equality on contexts decides which one is current*)
       
    10   val no_context: context  (*initial value of the current context; also the value drop_context resets to*)
       
    11   val drop_context: context -> unit  (*reset the current context to no_context, if it is the given one*)
       
    12   val fresh_context: unit -> context  (*make a new context and install it as the current one*)
       
    13   val is_running: context -> bool  (*true iff the given context is the current one*)
       
    14   val running: context -> Document_ID.exec -> bool  (*register exec if context is current; result: context was current*)
    10   val finished: Document_ID.exec -> bool -> unit
    15   val finished: Document_ID.exec -> bool -> unit  (*bool = stable: remove the exec; otherwise mark it as canceled*)
    11   val is_stable: Document_ID.exec -> bool
    16   val is_stable: Document_ID.exec -> bool  (*futures not interrupted, and exec not marked as canceled*)
    12   val peek_running: Document_ID.exec -> Future.group option
    17   val peek_running: Document_ID.exec -> Future.group option  (*group of a registered running exec, if any*)
    13   val purge_unstable: unit -> unit
    18   val purge_canceled: unit -> unit  (*remove all execs that are marked as canceled*)
       
    19   val terminate_all: unit -> unit  (*cancel and terminate the groups of all running execs*)
    14 end;
    20 end;
    15 
    21 
    16 structure Exec: EXEC =
    22 structure Exec: EXEC =
    17 struct
    23 struct
    18 
    24 
    19 type status = Future.group option;  (*SOME group: running, NONE: canceled/unstable*)
    25 (* global state *)
    20 val execs = Synchronized.var "Exec.execs" (Inttab.empty: status Inttab.table);
       
    21 
    26 
    22 fun running exec_id =
    27 datatype context = Context of Document_ID.generic;  (*a context is a wrapped generic document id*)
       
    28 val no_context = Context Document_ID.none;  (*distinguished context made from Document_ID.none*)
       
    29 
       
    30 type status = Future.group option;  (*SOME group: running, NONE: canceled*)
       
    31 val state = Synchronized.var "Exec.state" (no_context, Inttab.empty: status Inttab.table);  (*(current context, table of registered execs); starts as no_context with an empty table*)
       
    32 
       
    33 
       
    34 (* unique execution context *)
       
    35 
       
    36 fun drop_context context =  (*reset the current context to no_context, but only if it equals the given one*)
       
    37   Synchronized.change state
       
    38     (apfst (fn context' => if context = context' then no_context else context'));  (*apfst: the table of execs is not touched*)
       
    39 
       
    40 fun fresh_context () =  (*make a context from Document_ID.make () and install it as the current one*)
    23   let
    41   let
    24     val group =
    42     val context = Context (Document_ID.make ());
    25       (case Future.worker_group () of
    43     val _ = Synchronized.change state (apfst (K context));  (*overwrites whatever context was current before*)
    26         SOME group => group
    44   in context end;
    27       | NONE => error ("Missing worker thread context for execution " ^ Document_ID.print exec_id));
    45 
    28   in Synchronized.change execs (Inttab.update_new (exec_id, SOME group)) end;
    46 fun is_running context = context = fst (Synchronized.value state);  (*true iff the given context is the current one when the state is read*)
       
    47 
       
    48 
       
    49 (* registered execs *)
       
    50 
       
    51 fun running context exec_id =  (*register exec_id as running, provided the given context is the current one; result: whether that was the case*)
       
    52   Synchronized.guarded_access state
       
    53     (fn (current_context, execs) =>
       
    54       let
       
    55         val cont = context = current_context;  (*continue only if the caller's context is still the current one*)
       
    56         val execs' =
       
    57           if cont then
       
    58             Inttab.update_new (exec_id, SOME (Future.the_worker_group ())) execs  (*status SOME group, i.e. running*)
       
    59               handle Inttab.DUP dup => error ("Duplicate execution " ^ Document_ID.print dup)  (*exec_id is already registered*)
       
    60           else execs;  (*outdated context: the table is left as it is*)
       
    61       in SOME (cont, (current_context, execs')) end);  (*the current context is passed through unchanged*)
    29 
    62 
    30 fun finished exec_id stable =
    63 fun finished exec_id stable =  (*conclude a registered exec: a stable one is removed, any other is kept with status NONE (canceled)*)
    31   Synchronized.change execs
    64   Synchronized.change state
    32     (if stable then Inttab.delete exec_id else Inttab.update (exec_id, NONE));
    65     (apsnd (fn execs =>  (*apsnd: the current context is not touched*)
       
    66       if not (Inttab.defined execs exec_id) then  (*exec_id must have been registered before*)
       
    67         error ("Attempt to finish unknown execution: " ^ Document_ID.print exec_id)
       
    68       else if stable then Inttab.delete exec_id execs
       
    69       else Inttab.update (exec_id, NONE) execs));
    33 
    70 
    34 fun is_stable exec_id =
    71 fun is_stable exec_id =  (*stable: results of the futures from Goal.peek_futures are not interrupted, and exec_id is not marked as canceled*)
    35   not (Par_Exn.is_interrupted (Future.join_results (Goal.peek_futures exec_id))) andalso
    72   not (Par_Exn.is_interrupted (Future.join_results (Goal.peek_futures exec_id))) andalso
    36   (case Inttab.lookup (Synchronized.value execs) exec_id of
    73   (case Inttab.lookup (snd (Synchronized.value state)) exec_id of
    37     NONE => true
    74     NONE => true  (*not registered in the table: counts as stable*)
    38   | SOME status => is_some status);
    75   | SOME status => is_some status);  (*registered: stable only with status SOME group*)
    39 
    76 
    40 fun peek_running exec_id =
    77 fun peek_running exec_id =  (*the group of exec_id, if it is registered with status SOME group; otherwise NONE*)
    41   (case Inttab.lookup (Synchronized.value execs) exec_id of
    78   (case Inttab.lookup (snd (Synchronized.value state)) exec_id of
    42     SOME (SOME group) => SOME group
    79     SOME (SOME group) => SOME group
    43   | _ => NONE);
    80   | _ => NONE);  (*covers both unregistered execs and those with status NONE*)
    44 
    81 
    45 fun purge_unstable () =
    82 fun purge_canceled () =  (*delete all table entries with status NONE (canceled)*)
    46   Synchronized.guarded_access execs
    83   Synchronized.guarded_access state
    47     (fn tab =>
    84     (fn (context, execs) =>
    48       let
    85       let
    49         val unstable = Inttab.fold (fn (exec_id, NONE) => cons exec_id | _ => I) tab [];
    86         val canceled = Inttab.fold (fn (exec_id, NONE) => cons exec_id | _ => I) execs [];  (*ids of all entries with status NONE*)
    50         val tab' = fold Inttab.delete unstable tab;
    87         val execs' = fold Inttab.delete canceled execs;
    51       in SOME ((), tab') end);
    88       in SOME ((), (context, execs')) end);  (*the current context is passed through unchanged*)
       
    89 
       
    90 fun terminate_all () =  (*cancel the groups of all execs registered as running, then terminate them*)
       
    91   let
       
    92     val groups =
       
    93       Inttab.fold (fn (_, SOME group) => cons group | _ => I)  (*entries with status NONE carry no group and are skipped*)
       
    94         (snd (Synchronized.value state)) [];  (*reads the table once; the state itself is not modified here*)
       
    95     val _ = List.app Future.cancel_group groups;  (*first pass: every group is canceled ...*)
       
    96     val _ = List.app Future.terminate groups;  (*... second pass: Future.terminate on each group*)
       
    97   in () end;
    52 
    98 
    53 end;
    99 end;
    54 
   100