maintain goal forks as part of global execution;
authorwenzelm
Sun, 25 Aug 2013 20:32:26 +0200
changeset 53192 04df1d236e1c
parent 53191 14ab2f821e1d
child 53193 2ddc5e788f7c
maintain goal forks as part of global execution; tuned;
src/Pure/Isar/proof.ML
src/Pure/Isar/toplevel.ML
src/Pure/PIDE/command.ML
src/Pure/PIDE/execution.ML
src/Pure/PIDE/protocol.ML
src/Pure/ROOT.ML
src/Pure/System/isabelle_process.ML
src/Pure/System/session.ML
src/Pure/Thy/thy_info.ML
src/Pure/goal.ML
--- a/src/Pure/Isar/proof.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/Isar/proof.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -1170,7 +1170,7 @@
         val pos = Position.thread_data ();
         val props = Markup.command_timing :: (Markup.nameN, "by") :: Position.properties_of pos;
       in
-        Goal.fork_params {name = "Proof.future_terminal_proof", pos = pos, pri = ~1}
+        Execution.fork {name = "Proof.future_terminal_proof", pos = pos, pri = ~1}
           (fn () => ((), Timing.protocol props proof2 state'))
       end) |> snd |> done
   else proof1 state;
--- a/src/Pure/Isar/toplevel.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/Isar/toplevel.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -708,7 +708,7 @@
   let
     val st' =
       if Goal.future_enabled 1 andalso Keyword.is_diag (name_of tr) then
-        (Goal.fork_params
+        (Execution.fork
           {name = "Toplevel.diag", pos = pos_of tr,
             pri = priority (timing_estimate true (Thy_Syntax.atom tr))}
           (fn () => command tr st); st)
@@ -736,7 +736,7 @@
 
             val future_proof =
               Proof.future_proof (fn state =>
-                Goal.fork_params
+                Execution.fork
                   {name = "Toplevel.future_proof", pos = pos_of head_tr, pri = priority estimate}
                   (fn () =>
                     let
--- a/src/Pure/PIDE/command.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/PIDE/command.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -131,7 +131,7 @@
 
 fun run int tr st =
   if Goal.future_enabled 1 andalso Keyword.is_diag (Toplevel.name_of tr) then
-    (Goal.fork_params {name = "Toplevel.diag", pos = Toplevel.pos_of tr, pri = ~1}
+    (Execution.fork {name = "Toplevel.diag", pos = Toplevel.pos_of tr, pri = ~1}
       (fn () => Toplevel.command_exception int tr st); ([], SOME st))
   else Toplevel.command_errors int tr st;
 
--- a/src/Pure/PIDE/execution.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/PIDE/execution.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -2,19 +2,23 @@
     Author:     Makarius
 
 Global management of execution.  Unique running execution serves as
-barrier for further exploration of exec fragments.
+barrier for further exploration of forked command execs.
 *)
 
 signature EXECUTION =
 sig
-  val reset: unit -> unit
   val start: unit -> Document_ID.execution
   val discontinue: unit -> unit
   val is_running: Document_ID.execution -> bool
   val is_running_exec: Document_ID.exec -> bool
   val running: Document_ID.execution -> Document_ID.exec -> bool
+  val peek: Document_ID.exec -> Future.group list
   val cancel: Document_ID.exec -> unit
   val terminate: Document_ID.exec -> unit
+  val fork: {name: string, pos: Position.T, pri: int} -> (unit -> 'a) -> 'a future
+  val purge: Document_ID.exec list -> unit
+  val reset: unit -> Future.group list
+  val shutdown: unit -> unit
 end;
 
 structure Execution: EXECUTION =
@@ -22,50 +26,129 @@
 
 (* global state *)
 
-type state = Document_ID.execution * Future.group Inttab.table;
+datatype state = State of
+  {execution: Document_ID.execution,  (*overall document execution*)
+   execs: Future.group list Inttab.table};  (*command execs with registered forks*)
 
-val init_state: state = (Document_ID.none, Inttab.empty);
+fun make_state (execution, execs) = State {execution = execution, execs = execs};
+fun map_state f (State {execution, execs}) = make_state (f (execution, execs));
+
+val init_state = make_state (Document_ID.none, Inttab.empty);
 val state = Synchronized.var "Execution.state" init_state;
 
+fun get_state () = let val State args = Synchronized.value state in args end;
+fun change_state f = Synchronized.change state (map_state f);
+
 
 (* unique running execution *)
 
-fun reset () = Synchronized.change state (K init_state);
-
 fun start () =
   let
     val execution_id = Document_ID.make ();
-    val _ = Synchronized.change state (apfst (K execution_id));
+    val _ = change_state (apfst (K execution_id));
   in execution_id end;
 
-fun discontinue () =
-  Synchronized.change state (apfst (K Document_ID.none));
+fun discontinue () = change_state (apfst (K Document_ID.none));
 
-fun is_running execution_id = execution_id = fst (Synchronized.value state);
+fun is_running execution_id = execution_id = #execution (get_state ());
 
 
-(* registered execs *)
+(* execs *)
 
 fun is_running_exec exec_id =
-  Inttab.defined (snd (Synchronized.value state)) exec_id;
+  Inttab.defined (#execs (get_state ())) exec_id;
 
 fun running execution_id exec_id =
   Synchronized.guarded_access state
-    (fn (execution_id', execs) =>
+    (fn State {execution = execution_id', execs} =>
       let
         val continue = execution_id = execution_id';
         val execs' =
           if continue then
-            Inttab.update_new (exec_id, Future.the_worker_group ()) execs
-              handle Inttab.DUP dup => error ("Duplicate execution " ^ Document_ID.print dup)
+            Inttab.update_new (exec_id, [Future.the_worker_group ()]) execs
+              handle Inttab.DUP dup =>
+                error ("Execution already registered: " ^ Document_ID.print dup)
           else execs;
-      in SOME (continue, (execution_id', execs')) end);
+      in SOME (continue, make_state (execution_id', execs')) end);
+
+fun peek exec_id =
+  Inttab.lookup_list (#execs (get_state ())) exec_id;
+
+fun cancel exec_id = List.app Future.cancel_group (peek exec_id);
+fun terminate exec_id = List.app Future.terminate (peek exec_id);
+
+
+(* fork *)
+
+local
+
+fun status task markups =
+  let val props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]
+  in Output.status (implode (map (Markup.markup_only o props) markups)) end;
+
+fun register exec =
+  change_state (fn (execution, execs) => (execution, Inttab.cons_list exec execs));
+
+in
 
-fun peek_list exec_id =
-  the_list (Inttab.lookup (snd (Synchronized.value state)) exec_id);
+fun fork {name, pos, pri} e =
+  uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
+    let
+      val exec_id = the_default 0 (Position.parse_id pos);
+      val group = Future.worker_subgroup ();
+      val _ = register (exec_id, group);
 
-fun cancel exec_id = List.app Future.cancel_group (peek_list exec_id);
-fun terminate exec_id = List.app Future.terminate (peek_list exec_id);
+      val future =
+        (singleton o Future.forks)
+          {name = name, group = SOME group, deps = [], pri = pri, interrupts = false}
+          (fn () =>
+            let
+              val task = the (Future.worker_task ());
+              val _ = status task [Markup.running];
+              val result =
+                Exn.capture (Future.interruptible_task e) ()
+                |> Future.identify_result pos;
+              val _ = status task [Markup.finished, Markup.joined];
+              val _ =
+                (case result of
+                  Exn.Res _ => ()
+                | Exn.Exn exn =>
+                    if exec_id = 0 orelse Exn.is_interrupt exn then ()
+                    else
+                      (status task [Markup.failed];
+                       Output.report (Markup.markup_only Markup.bad);
+                       List.app (Future.error_msg pos) (ML_Compiler.exn_messages_ids exn)));
+            in Exn.release result end);
+      val _ = status (Future.task_of future) [Markup.forked];
+    in future end)) ();
 
 end;
 
+
+(* cleanup *)
+
+fun purge exec_ids =
+  (change_state o apsnd) (fn execs =>
+    let
+      val execs' = fold Inttab.delete_safe exec_ids execs;
+      val () =
+        (execs', ()) |-> Inttab.fold (fn (exec_id, groups) => fn () =>
+          if Inttab.defined execs' exec_id then ()
+          else groups |> List.app (fn group =>
+            if Task_Queue.is_canceled group then ()
+            else error ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
+    in execs' end);
+
+fun reset () =
+  Synchronized.change_result state (fn State {execs, ...} =>
+    let val groups = Inttab.fold (append o #2) execs []
+    in (groups, init_state) end);
+
+fun shutdown () =
+  (Future.shutdown ();
+    (case maps Task_Queue.group_status (reset ()) of
+      [] => ()
+    | exns => raise Par_Exn.make exns));
+
+end;
+
--- a/src/Pure/PIDE/protocol.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/PIDE/protocol.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -67,7 +67,7 @@
 
         val (removed, assign_update, state') = Document.update old_id new_id edits state;
         val _ = List.app Execution.terminate removed;
-        val _ = Goal.purge_futures removed;
+        val _ = Execution.purge removed;
         val _ = List.app Isabelle_Process.reset_tracing removed;
 
         val _ =
--- a/src/Pure/ROOT.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/ROOT.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -185,6 +185,9 @@
 if ML_System.is_polyml then use "ML/ml_compiler_polyml.ML" else ();
 if ML_System.name = "polyml-5.5.1" then use "ML/exn_trace_polyml-5.5.1.ML" else ();
 
+(*global execution*)
+use "PIDE/document_id.ML";
+use "PIDE/execution.ML";
 use "skip_proof.ML";
 use "goal.ML";
 
@@ -257,7 +260,6 @@
 
 (*toplevel transactions*)
 use "Isar/proof_node.ML";
-use "PIDE/document_id.ML";
 use "Isar/toplevel.ML";
 
 (*theory documents*)
@@ -267,7 +269,6 @@
 use "Isar/outer_syntax.ML";
 use "General/graph_display.ML";
 use "Thy/present.ML";
-use "PIDE/execution.ML";
 use "PIDE/command.ML";
 use "PIDE/query_operation.ML";
 use "Thy/thy_load.ML";
--- a/src/Pure/System/isabelle_process.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/System/isabelle_process.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -173,7 +173,7 @@
       | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
   in
     if continue then loop channel
-    else (Future.shutdown (); Goal.reset_futures (); Execution.reset ())
+    else (Future.shutdown (); Execution.reset (); ())
   end;
 
 end;
--- a/src/Pure/System/session.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/System/session.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -51,7 +51,7 @@
 (* finish *)
 
 fun finish () =
- (Goal.shutdown_futures ();
+ (Execution.shutdown ();
   Thy_Info.finish ();
   Present.finish ();
   Outer_Syntax.check_syntax ();
--- a/src/Pure/Thy/thy_info.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/Thy/thy_info.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -176,7 +176,7 @@
 
 fun join_theory (Result {theory, id, ...}) =
   Exn.capture Thm.join_theory_proofs theory ::
-  map Exn.Exn (maps Task_Queue.group_status (Goal.peek_futures id));
+  map Exn.Exn (maps Task_Queue.group_status (Execution.peek id));
 
 
 datatype task =
@@ -235,7 +235,7 @@
       |> map (fn future => Exn.capture (fn () => result_commit (Future.join future) ()) ());
 
     (* FIXME avoid global reset_futures (!??) *)
-    val results4 = map Exn.Exn (maps Task_Queue.group_status (Goal.reset_futures ()));
+    val results4 = map Exn.Exn (maps Task_Queue.group_status (Execution.reset ()));
 
     val _ = Par_Exn.release_all (results1 @ results2 @ results3 @ results4);
   in () end);
--- a/src/Pure/goal.ML	Sun Aug 25 17:17:48 2013 +0200
+++ b/src/Pure/goal.ML	Sun Aug 25 20:32:26 2013 +0200
@@ -24,12 +24,6 @@
   val check_finished: Proof.context -> thm -> thm
   val finish: Proof.context -> thm -> thm
   val norm_result: thm -> thm
-  val fork_params: {name: string, pos: Position.T, pri: int} -> (unit -> 'a) -> 'a future
-  val fork: int -> (unit -> 'a) -> 'a future
-  val peek_futures: int -> Future.group list
-  val purge_futures: int list -> unit
-  val reset_futures: unit -> Future.group list
-  val shutdown_futures: unit -> unit
   val skip_proofs_enabled: unit -> bool
   val future_enabled: int -> bool
   val future_enabled_timing: Time.time -> bool
@@ -111,86 +105,6 @@
   #> Drule.zero_var_indexes;
 
 
-(* forked proofs *)
-
-local
-
-val forked_proofs =
-  Synchronized.var "forked_proofs"
-    (Inttab.empty: Future.group list Inttab.table);
-
-fun register_forked id future =
-  Synchronized.change forked_proofs
-    (Inttab.cons_list (id, Task_Queue.group_of_task (Future.task_of future)));
-
-fun status task markups =
-  let val props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]
-  in Output.status (implode (map (Markup.markup_only o props) markups)) end;
-
-in
-
-fun fork_params {name, pos, pri} e =
-  uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
-    let
-      val id = the_default 0 (Position.parse_id pos);
-
-      val future =
-        (singleton o Future.forks)
-          {name = name, group = NONE, deps = [], pri = pri, interrupts = false}
-          (fn () =>
-            let
-              val task = the (Future.worker_task ());
-              val _ = status task [Markup.running];
-              val result =
-                Exn.capture (Future.interruptible_task e) ()
-                |> Future.identify_result pos;
-              val _ = status task [Markup.finished, Markup.joined];
-              val _ =
-                (case result of
-                  Exn.Res _ => ()
-                | Exn.Exn exn =>
-                    if id = 0 orelse Exn.is_interrupt exn then ()
-                    else
-                      (status task [Markup.failed];
-                       Output.report (Markup.markup_only Markup.bad);
-                       List.app (Future.error_msg pos) (ML_Compiler.exn_messages_ids exn)));
-            in Exn.release result end);
-      val _ = status (Future.task_of future) [Markup.forked];
-      val _ = register_forked id future;
-    in future end)) ();
-
-fun fork pri e =
-  fork_params {name = "Goal.fork", pos = Position.thread_data (), pri = pri} e;
-
-fun peek_futures id =
-  Inttab.lookup_list (Synchronized.value forked_proofs) id;
-
-fun purge_futures ids =
-  Synchronized.change forked_proofs (fn tab =>
-    let
-      val tab' = fold Inttab.delete_safe ids tab;
-      val () =
-        Inttab.fold (fn (id, groups) => fn () =>
-          if Inttab.defined tab' id then ()
-          else groups |> List.app (fn group =>
-            if Task_Queue.is_canceled group then ()
-            else raise Fail ("Attempt to purge valid execution " ^ string_of_int id))) tab ();
-    in tab' end);
-
-fun reset_futures () =
-  Synchronized.change_result forked_proofs (fn tab =>
-    let val groups = Inttab.fold (fold cons o #2) tab []
-    in (groups, Inttab.empty) end);
-
-fun shutdown_futures () =
-  (Future.shutdown ();
-    (case maps Task_Queue.group_status (reset_futures ()) of
-      [] => ()
-    | exns => raise Par_Exn.make exns));
-
-end;
-
-
 (* scheduling parameters *)
 
 fun skip_proofs_enabled () =
@@ -307,9 +221,11 @@
             else err ("Proved a different theorem: " ^ string_of_term (Thm.prop_of res))
           end);
     val res =
-      if immediate orelse schematic orelse not future orelse skip
-      then result ()
-      else future_result ctxt' (fork pri result) (Thm.term_of stmt);
+      if immediate orelse schematic orelse not future orelse skip then result ()
+      else
+        future_result ctxt'
+          (Execution.fork {name = "Goal.prove", pos = Position.thread_data (), pri = pri} result)
+          (Thm.term_of stmt);
   in
     Conjunction.elim_balanced (length props) res
     |> map (Assumption.export false ctxt' ctxt)