maintain goal forks as part of global execution;
authorwenzelm
Sun Aug 25 20:32:26 2013 +0200 (2013-08-25)
changeset 5319204df1d236e1c
parent 53191 14ab2f821e1d
child 53193 2ddc5e788f7c
maintain goal forks as part of global execution;
tuned;
src/Pure/Isar/proof.ML
src/Pure/Isar/toplevel.ML
src/Pure/PIDE/command.ML
src/Pure/PIDE/execution.ML
src/Pure/PIDE/protocol.ML
src/Pure/ROOT.ML
src/Pure/System/isabelle_process.ML
src/Pure/System/session.ML
src/Pure/Thy/thy_info.ML
src/Pure/goal.ML
     1.1 --- a/src/Pure/Isar/proof.ML	Sun Aug 25 17:17:48 2013 +0200
     1.2 +++ b/src/Pure/Isar/proof.ML	Sun Aug 25 20:32:26 2013 +0200
     1.3 @@ -1170,7 +1170,7 @@
     1.4          val pos = Position.thread_data ();
     1.5          val props = Markup.command_timing :: (Markup.nameN, "by") :: Position.properties_of pos;
     1.6        in
     1.7 -        Goal.fork_params {name = "Proof.future_terminal_proof", pos = pos, pri = ~1}
     1.8 +        Execution.fork {name = "Proof.future_terminal_proof", pos = pos, pri = ~1}
     1.9            (fn () => ((), Timing.protocol props proof2 state'))
    1.10        end) |> snd |> done
    1.11    else proof1 state;
     2.1 --- a/src/Pure/Isar/toplevel.ML	Sun Aug 25 17:17:48 2013 +0200
     2.2 +++ b/src/Pure/Isar/toplevel.ML	Sun Aug 25 20:32:26 2013 +0200
     2.3 @@ -708,7 +708,7 @@
     2.4    let
     2.5      val st' =
     2.6        if Goal.future_enabled 1 andalso Keyword.is_diag (name_of tr) then
     2.7 -        (Goal.fork_params
     2.8 +        (Execution.fork
     2.9            {name = "Toplevel.diag", pos = pos_of tr,
    2.10              pri = priority (timing_estimate true (Thy_Syntax.atom tr))}
    2.11            (fn () => command tr st); st)
    2.12 @@ -736,7 +736,7 @@
    2.13  
    2.14              val future_proof =
    2.15                Proof.future_proof (fn state =>
    2.16 -                Goal.fork_params
    2.17 +                Execution.fork
    2.18                    {name = "Toplevel.future_proof", pos = pos_of head_tr, pri = priority estimate}
    2.19                    (fn () =>
    2.20                      let
     3.1 --- a/src/Pure/PIDE/command.ML	Sun Aug 25 17:17:48 2013 +0200
     3.2 +++ b/src/Pure/PIDE/command.ML	Sun Aug 25 20:32:26 2013 +0200
     3.3 @@ -131,7 +131,7 @@
     3.4  
     3.5  fun run int tr st =
     3.6    if Goal.future_enabled 1 andalso Keyword.is_diag (Toplevel.name_of tr) then
     3.7 -    (Goal.fork_params {name = "Toplevel.diag", pos = Toplevel.pos_of tr, pri = ~1}
     3.8 +    (Execution.fork {name = "Toplevel.diag", pos = Toplevel.pos_of tr, pri = ~1}
     3.9        (fn () => Toplevel.command_exception int tr st); ([], SOME st))
    3.10    else Toplevel.command_errors int tr st;
    3.11  
     4.1 --- a/src/Pure/PIDE/execution.ML	Sun Aug 25 17:17:48 2013 +0200
     4.2 +++ b/src/Pure/PIDE/execution.ML	Sun Aug 25 20:32:26 2013 +0200
     4.3 @@ -2,19 +2,23 @@
     4.4      Author:     Makarius
     4.5  
     4.6  Global management of execution.  Unique running execution serves as
     4.7 -barrier for further exploration of exec fragments.
     4.8 +barrier for further exploration of forked command execs.
     4.9  *)
    4.10  
    4.11  signature EXECUTION =
    4.12  sig
    4.13 -  val reset: unit -> unit
    4.14    val start: unit -> Document_ID.execution
    4.15    val discontinue: unit -> unit
    4.16    val is_running: Document_ID.execution -> bool
    4.17    val is_running_exec: Document_ID.exec -> bool
    4.18    val running: Document_ID.execution -> Document_ID.exec -> bool
    4.19 +  val peek: Document_ID.exec -> Future.group list
    4.20    val cancel: Document_ID.exec -> unit
    4.21    val terminate: Document_ID.exec -> unit
    4.22 +  val fork: {name: string, pos: Position.T, pri: int} -> (unit -> 'a) -> 'a future
    4.23 +  val purge: Document_ID.exec list -> unit
    4.24 +  val reset: unit -> Future.group list
    4.25 +  val shutdown: unit -> unit
    4.26  end;
    4.27  
    4.28  structure Execution: EXECUTION =
    4.29 @@ -22,50 +26,129 @@
    4.30  
    4.31  (* global state *)
    4.32  
    4.33 -type state = Document_ID.execution * Future.group Inttab.table;
    4.34 +datatype state = State of
    4.35 +  {execution: Document_ID.execution,  (*overall document execution*)
    4.36 +   execs: Future.group list Inttab.table};  (*command execs with registered forks*)
    4.37  
    4.38 -val init_state: state = (Document_ID.none, Inttab.empty);
    4.39 +fun make_state (execution, execs) = State {execution = execution, execs = execs};
    4.40 +fun map_state f (State {execution, execs}) = make_state (f (execution, execs));
    4.41 +
    4.42 +val init_state = make_state (Document_ID.none, Inttab.empty);
    4.43  val state = Synchronized.var "Execution.state" init_state;
    4.44  
    4.45 +fun get_state () = let val State args = Synchronized.value state in args end;
    4.46 +fun change_state f = Synchronized.change state (map_state f);
    4.47 +
    4.48  
    4.49  (* unique running execution *)
    4.50  
    4.51 -fun reset () = Synchronized.change state (K init_state);
    4.52 -
    4.53  fun start () =
    4.54    let
    4.55      val execution_id = Document_ID.make ();
    4.56 -    val _ = Synchronized.change state (apfst (K execution_id));
    4.57 +    val _ = change_state (apfst (K execution_id));
    4.58    in execution_id end;
    4.59  
    4.60 -fun discontinue () =
    4.61 -  Synchronized.change state (apfst (K Document_ID.none));
    4.62 +fun discontinue () = change_state (apfst (K Document_ID.none));
    4.63  
    4.64 -fun is_running execution_id = execution_id = fst (Synchronized.value state);
    4.65 +fun is_running execution_id = execution_id = #execution (get_state ());
    4.66  
    4.67  
    4.68 -(* registered execs *)
    4.69 +(* execs *)
    4.70  
    4.71  fun is_running_exec exec_id =
    4.72 -  Inttab.defined (snd (Synchronized.value state)) exec_id;
    4.73 +  Inttab.defined (#execs (get_state ())) exec_id;
    4.74  
    4.75  fun running execution_id exec_id =
    4.76    Synchronized.guarded_access state
    4.77 -    (fn (execution_id', execs) =>
    4.78 +    (fn State {execution = execution_id', execs} =>
    4.79        let
    4.80          val continue = execution_id = execution_id';
    4.81          val execs' =
    4.82            if continue then
    4.83 -            Inttab.update_new (exec_id, Future.the_worker_group ()) execs
    4.84 -              handle Inttab.DUP dup => error ("Duplicate execution " ^ Document_ID.print dup)
    4.85 +            Inttab.update_new (exec_id, [Future.the_worker_group ()]) execs
    4.86 +              handle Inttab.DUP dup =>
    4.87 +                error ("Execution already registered: " ^ Document_ID.print dup)
    4.88            else execs;
    4.89 -      in SOME (continue, (execution_id', execs')) end);
    4.90 +      in SOME (continue, make_state (execution_id', execs')) end);
    4.91 +
    4.92 +fun peek exec_id =
    4.93 +  Inttab.lookup_list (#execs (get_state ())) exec_id;
    4.94 +
    4.95 +fun cancel exec_id = List.app Future.cancel_group (peek exec_id);
    4.96 +fun terminate exec_id = List.app Future.terminate (peek exec_id);
    4.97 +
    4.98 +
    4.99 +(* fork *)
   4.100 +
   4.101 +local
   4.102 +
   4.103 +fun status task markups =
   4.104 +  let val props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]
   4.105 +  in Output.status (implode (map (Markup.markup_only o props) markups)) end;
   4.106 +
   4.107 +fun register exec =
   4.108 +  change_state (fn (execution, execs) => (execution, Inttab.cons_list exec execs));
   4.109 +
   4.110 +in
   4.111  
   4.112 -fun peek_list exec_id =
   4.113 -  the_list (Inttab.lookup (snd (Synchronized.value state)) exec_id);
   4.114 +fun fork {name, pos, pri} e =
   4.115 +  uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
   4.116 +    let
   4.117 +      val exec_id = the_default 0 (Position.parse_id pos);
   4.118 +      val group = Future.worker_subgroup ();
   4.119 +      val _ = register (exec_id, group);
   4.120  
   4.121 -fun cancel exec_id = List.app Future.cancel_group (peek_list exec_id);
   4.122 -fun terminate exec_id = List.app Future.terminate (peek_list exec_id);
   4.123 +      val future =
   4.124 +        (singleton o Future.forks)
   4.125 +          {name = name, group = SOME group, deps = [], pri = pri, interrupts = false}
   4.126 +          (fn () =>
   4.127 +            let
   4.128 +              val task = the (Future.worker_task ());
   4.129 +              val _ = status task [Markup.running];
   4.130 +              val result =
   4.131 +                Exn.capture (Future.interruptible_task e) ()
   4.132 +                |> Future.identify_result pos;
   4.133 +              val _ = status task [Markup.finished, Markup.joined];
   4.134 +              val _ =
   4.135 +                (case result of
   4.136 +                  Exn.Res _ => ()
   4.137 +                | Exn.Exn exn =>
   4.138 +                    if exec_id = 0 orelse Exn.is_interrupt exn then ()
   4.139 +                    else
   4.140 +                      (status task [Markup.failed];
   4.141 +                       Output.report (Markup.markup_only Markup.bad);
   4.142 +                       List.app (Future.error_msg pos) (ML_Compiler.exn_messages_ids exn)));
   4.143 +            in Exn.release result end);
   4.144 +      val _ = status (Future.task_of future) [Markup.forked];
   4.145 +    in future end)) ();
   4.146  
   4.147  end;
   4.148  
   4.149 +
   4.150 +(* cleanup *)
   4.151 +
   4.152 +fun purge exec_ids =
   4.153 +  (change_state o apsnd) (fn execs =>
   4.154 +    let
   4.155 +      val execs' = fold Inttab.delete_safe exec_ids execs;
   4.156 +      val () =
   4.157 +        (execs', ()) |-> Inttab.fold (fn (exec_id, groups) => fn () =>
   4.158 +          if Inttab.defined execs' exec_id then ()
   4.159 +          else groups |> List.app (fn group =>
   4.160 +            if Task_Queue.is_canceled group then ()
   4.161 +            else error ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
   4.162 +    in execs' end);
   4.163 +
   4.164 +fun reset () =
   4.165 +  Synchronized.change_result state (fn State {execs, ...} =>
   4.166 +    let val groups = Inttab.fold (append o #2) execs []
   4.167 +    in (groups, init_state) end);
   4.168 +
   4.169 +fun shutdown () =
   4.170 +  (Future.shutdown ();
   4.171 +    (case maps Task_Queue.group_status (reset ()) of
   4.172 +      [] => ()
   4.173 +    | exns => raise Par_Exn.make exns));
   4.174 +
   4.175 +end;
   4.176 +
     5.1 --- a/src/Pure/PIDE/protocol.ML	Sun Aug 25 17:17:48 2013 +0200
     5.2 +++ b/src/Pure/PIDE/protocol.ML	Sun Aug 25 20:32:26 2013 +0200
     5.3 @@ -67,7 +67,7 @@
     5.4  
     5.5          val (removed, assign_update, state') = Document.update old_id new_id edits state;
     5.6          val _ = List.app Execution.terminate removed;
     5.7 -        val _ = Goal.purge_futures removed;
     5.8 +        val _ = Execution.purge removed;
     5.9          val _ = List.app Isabelle_Process.reset_tracing removed;
    5.10  
    5.11          val _ =
     6.1 --- a/src/Pure/ROOT.ML	Sun Aug 25 17:17:48 2013 +0200
     6.2 +++ b/src/Pure/ROOT.ML	Sun Aug 25 20:32:26 2013 +0200
     6.3 @@ -185,6 +185,9 @@
     6.4  if ML_System.is_polyml then use "ML/ml_compiler_polyml.ML" else ();
     6.5  if ML_System.name = "polyml-5.5.1" then use "ML/exn_trace_polyml-5.5.1.ML" else ();
     6.6  
     6.7 +(*global execution*)
     6.8 +use "PIDE/document_id.ML";
     6.9 +use "PIDE/execution.ML";
    6.10  use "skip_proof.ML";
    6.11  use "goal.ML";
    6.12  
    6.13 @@ -257,7 +260,6 @@
    6.14  
    6.15  (*toplevel transactions*)
    6.16  use "Isar/proof_node.ML";
    6.17 -use "PIDE/document_id.ML";
    6.18  use "Isar/toplevel.ML";
    6.19  
    6.20  (*theory documents*)
    6.21 @@ -267,7 +269,6 @@
    6.22  use "Isar/outer_syntax.ML";
    6.23  use "General/graph_display.ML";
    6.24  use "Thy/present.ML";
    6.25 -use "PIDE/execution.ML";
    6.26  use "PIDE/command.ML";
    6.27  use "PIDE/query_operation.ML";
    6.28  use "Thy/thy_load.ML";
     7.1 --- a/src/Pure/System/isabelle_process.ML	Sun Aug 25 17:17:48 2013 +0200
     7.2 +++ b/src/Pure/System/isabelle_process.ML	Sun Aug 25 20:32:26 2013 +0200
     7.3 @@ -173,7 +173,7 @@
     7.4        | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
     7.5    in
     7.6      if continue then loop channel
     7.7 -    else (Future.shutdown (); Goal.reset_futures (); Execution.reset ())
     7.8 +    else (Future.shutdown (); Execution.reset (); ())
     7.9    end;
    7.10  
    7.11  end;
     8.1 --- a/src/Pure/System/session.ML	Sun Aug 25 17:17:48 2013 +0200
     8.2 +++ b/src/Pure/System/session.ML	Sun Aug 25 20:32:26 2013 +0200
     8.3 @@ -51,7 +51,7 @@
     8.4  (* finish *)
     8.5  
     8.6  fun finish () =
     8.7 - (Goal.shutdown_futures ();
     8.8 + (Execution.shutdown ();
     8.9    Thy_Info.finish ();
    8.10    Present.finish ();
    8.11    Outer_Syntax.check_syntax ();
     9.1 --- a/src/Pure/Thy/thy_info.ML	Sun Aug 25 17:17:48 2013 +0200
     9.2 +++ b/src/Pure/Thy/thy_info.ML	Sun Aug 25 20:32:26 2013 +0200
     9.3 @@ -176,7 +176,7 @@
     9.4  
     9.5  fun join_theory (Result {theory, id, ...}) =
     9.6    Exn.capture Thm.join_theory_proofs theory ::
     9.7 -  map Exn.Exn (maps Task_Queue.group_status (Goal.peek_futures id));
     9.8 +  map Exn.Exn (maps Task_Queue.group_status (Execution.peek id));
     9.9  
    9.10  
    9.11  datatype task =
    9.12 @@ -235,7 +235,7 @@
    9.13        |> map (fn future => Exn.capture (fn () => result_commit (Future.join future) ()) ());
    9.14  
    9.15      (* FIXME avoid global reset_futures (!??) *)
    9.16 -    val results4 = map Exn.Exn (maps Task_Queue.group_status (Goal.reset_futures ()));
    9.17 +    val results4 = map Exn.Exn (maps Task_Queue.group_status (Execution.reset ()));
    9.18  
    9.19      val _ = Par_Exn.release_all (results1 @ results2 @ results3 @ results4);
    9.20    in () end);
    10.1 --- a/src/Pure/goal.ML	Sun Aug 25 17:17:48 2013 +0200
    10.2 +++ b/src/Pure/goal.ML	Sun Aug 25 20:32:26 2013 +0200
    10.3 @@ -24,12 +24,6 @@
    10.4    val check_finished: Proof.context -> thm -> thm
    10.5    val finish: Proof.context -> thm -> thm
    10.6    val norm_result: thm -> thm
    10.7 -  val fork_params: {name: string, pos: Position.T, pri: int} -> (unit -> 'a) -> 'a future
    10.8 -  val fork: int -> (unit -> 'a) -> 'a future
    10.9 -  val peek_futures: int -> Future.group list
   10.10 -  val purge_futures: int list -> unit
   10.11 -  val reset_futures: unit -> Future.group list
   10.12 -  val shutdown_futures: unit -> unit
   10.13    val skip_proofs_enabled: unit -> bool
   10.14    val future_enabled: int -> bool
   10.15    val future_enabled_timing: Time.time -> bool
   10.16 @@ -111,86 +105,6 @@
   10.17    #> Drule.zero_var_indexes;
   10.18  
   10.19  
   10.20 -(* forked proofs *)
   10.21 -
   10.22 -local
   10.23 -
   10.24 -val forked_proofs =
   10.25 -  Synchronized.var "forked_proofs"
   10.26 -    (Inttab.empty: Future.group list Inttab.table);
   10.27 -
   10.28 -fun register_forked id future =
   10.29 -  Synchronized.change forked_proofs
   10.30 -    (Inttab.cons_list (id, Task_Queue.group_of_task (Future.task_of future)));
   10.31 -
   10.32 -fun status task markups =
   10.33 -  let val props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]
   10.34 -  in Output.status (implode (map (Markup.markup_only o props) markups)) end;
   10.35 -
   10.36 -in
   10.37 -
   10.38 -fun fork_params {name, pos, pri} e =
   10.39 -  uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
   10.40 -    let
   10.41 -      val id = the_default 0 (Position.parse_id pos);
   10.42 -
   10.43 -      val future =
   10.44 -        (singleton o Future.forks)
   10.45 -          {name = name, group = NONE, deps = [], pri = pri, interrupts = false}
   10.46 -          (fn () =>
   10.47 -            let
   10.48 -              val task = the (Future.worker_task ());
   10.49 -              val _ = status task [Markup.running];
   10.50 -              val result =
   10.51 -                Exn.capture (Future.interruptible_task e) ()
   10.52 -                |> Future.identify_result pos;
   10.53 -              val _ = status task [Markup.finished, Markup.joined];
   10.54 -              val _ =
   10.55 -                (case result of
   10.56 -                  Exn.Res _ => ()
   10.57 -                | Exn.Exn exn =>
   10.58 -                    if id = 0 orelse Exn.is_interrupt exn then ()
   10.59 -                    else
   10.60 -                      (status task [Markup.failed];
   10.61 -                       Output.report (Markup.markup_only Markup.bad);
   10.62 -                       List.app (Future.error_msg pos) (ML_Compiler.exn_messages_ids exn)));
   10.63 -            in Exn.release result end);
   10.64 -      val _ = status (Future.task_of future) [Markup.forked];
   10.65 -      val _ = register_forked id future;
   10.66 -    in future end)) ();
   10.67 -
   10.68 -fun fork pri e =
   10.69 -  fork_params {name = "Goal.fork", pos = Position.thread_data (), pri = pri} e;
   10.70 -
   10.71 -fun peek_futures id =
   10.72 -  Inttab.lookup_list (Synchronized.value forked_proofs) id;
   10.73 -
   10.74 -fun purge_futures ids =
   10.75 -  Synchronized.change forked_proofs (fn tab =>
   10.76 -    let
   10.77 -      val tab' = fold Inttab.delete_safe ids tab;
   10.78 -      val () =
   10.79 -        Inttab.fold (fn (id, groups) => fn () =>
   10.80 -          if Inttab.defined tab' id then ()
   10.81 -          else groups |> List.app (fn group =>
   10.82 -            if Task_Queue.is_canceled group then ()
   10.83 -            else raise Fail ("Attempt to purge valid execution " ^ string_of_int id))) tab ();
   10.84 -    in tab' end);
   10.85 -
   10.86 -fun reset_futures () =
   10.87 -  Synchronized.change_result forked_proofs (fn tab =>
   10.88 -    let val groups = Inttab.fold (fold cons o #2) tab []
   10.89 -    in (groups, Inttab.empty) end);
   10.90 -
   10.91 -fun shutdown_futures () =
   10.92 -  (Future.shutdown ();
   10.93 -    (case maps Task_Queue.group_status (reset_futures ()) of
   10.94 -      [] => ()
   10.95 -    | exns => raise Par_Exn.make exns));
   10.96 -
   10.97 -end;
   10.98 -
   10.99 -
  10.100  (* scheduling parameters *)
  10.101  
  10.102  fun skip_proofs_enabled () =
  10.103 @@ -307,9 +221,11 @@
  10.104              else err ("Proved a different theorem: " ^ string_of_term (Thm.prop_of res))
  10.105            end);
  10.106      val res =
  10.107 -      if immediate orelse schematic orelse not future orelse skip
  10.108 -      then result ()
  10.109 -      else future_result ctxt' (fork pri result) (Thm.term_of stmt);
  10.110 +      if immediate orelse schematic orelse not future orelse skip then result ()
  10.111 +      else
  10.112 +        future_result ctxt'
  10.113 +          (Execution.fork {name = "Goal.prove", pos = Position.thread_data (), pri = pri} result)
  10.114 +          (Thm.term_of stmt);
  10.115    in
  10.116      Conjunction.elim_balanced (length props) res
  10.117      |> map (Assumption.export false ctxt' ctxt)