central management of forked goals wrt. transaction id;
authorwenzelm
Sat, 01 Sep 2012 19:27:28 +0200
changeset 49061 7449b804073b
parent 49060 fa094e173cb9
child 49062 7e31dfd99ce7
central management of forked goals wrt. transaction id; clarified stable_exec_id: consider ragular failure as stable; more robust counting of forked proofs, with global reset after cancellation due to document editing;
src/Pure/Concurrent/par_exn.ML
src/Pure/PIDE/document.ML
src/Pure/PIDE/protocol.ML
src/Pure/goal.ML
--- a/src/Pure/Concurrent/par_exn.ML	Fri Aug 31 22:34:37 2012 +0200
+++ b/src/Pure/Concurrent/par_exn.ML	Sat Sep 01 19:27:28 2012 +0200
@@ -10,6 +10,7 @@
   val serial: exn -> serial * exn
   val make: exn list -> exn
   val dest: exn -> exn list option
+  val is_interrupted: 'a Exn.result list -> bool
   val release_all: 'a Exn.result list -> 'a list
   val release_first: 'a Exn.result list -> 'a list
 end;
@@ -53,6 +54,10 @@
 
 (* parallel results *)
 
+fun is_interrupted results =
+  exists (fn Exn.Exn _ => true | _ => false) results andalso
+    Exn.is_interrupt (make (map_filter Exn.get_exn results));
+
 fun release_all results =
   if forall (fn Exn.Res _ => true | _ => false) results
   then map Exn.release results
--- a/src/Pure/PIDE/document.ML	Fri Aug 31 22:34:37 2012 +0200
+++ b/src/Pure/PIDE/document.ML	Sat Sep 01 19:27:28 2012 +0200
@@ -359,10 +359,13 @@
     |> Toplevel.end_theory Position.none
     |> Thm.join_theory_proofs) ()));
 
-fun stable_command exec =
+fun stable_exec_id id =
+  not (Par_Exn.is_interrupted (Future.join_results (Goal.peek_futures id)));
+
+fun stable_exec exec =
   (case Exn.capture Command.memo_result exec of
     Exn.Exn exn => not (Exn.is_interrupt exn)
-  | Exn.Res ((st, _), _) => is_some (Exn.get_res (Exn.capture Toplevel.join_recent_proofs st)));
+  | Exn.Res _ => true);
 
 fun make_required nodes =
   let
@@ -423,7 +426,8 @@
             | SOME (exec_id, exec) =>
                 (case lookup_entry node0 id of
                   NONE => false
-                | SOME (exec_id0, _) => exec_id = exec_id0 andalso stable_command exec));
+                | SOME (exec_id0, _) =>
+                    exec_id = exec_id0 andalso stable_exec_id exec_id andalso stable_exec exec));
         in SOME (same', (prev, flags')) end
       else NONE;
     val (same, (common, flags)) =
--- a/src/Pure/PIDE/protocol.ML	Fri Aug 31 22:34:37 2012 +0200
+++ b/src/Pure/PIDE/protocol.ML	Sat Sep 01 19:27:28 2012 +0200
@@ -59,6 +59,7 @@
               in pair int (list (pair int (option int))) end
               |> YXML.string_of_body);
 
+        val _ = Goal.cancel_futures ();
         val _ = Document.start_execution state';
       in state' end));
 
--- a/src/Pure/goal.ML	Fri Aug 31 22:34:37 2012 +0200
+++ b/src/Pure/goal.ML	Sat Sep 01 19:27:28 2012 +0200
@@ -1,7 +1,7 @@
 (*  Title:      Pure/goal.ML
     Author:     Makarius
 
-Goals in tactical theorem proving.
+Goals in tactical theorem proving, with support for forked proofs.
 *)
 
 signature BASIC_GOAL =
@@ -26,6 +26,8 @@
   val norm_result: thm -> thm
   val fork_name: string -> (unit -> 'a) -> 'a future
   val fork: (unit -> 'a) -> 'a future
+  val peek_futures: serial -> unit future list
+  val cancel_futures: unit -> unit
   val future_enabled_level: int -> bool
   val future_enabled: unit -> bool
   val future_enabled_nested: int -> bool
@@ -109,25 +111,42 @@
 
 (* forked proofs *)
 
-val forked_proofs = Synchronized.var "forked_proofs" 0;
+local
 
-fun forked i =
-  Synchronized.change forked_proofs (fn m =>
+val forked_proofs =
+  Synchronized.var "forked_proofs"
+    (0, []: Future.group list, Inttab.empty: unit future list Inttab.table);
+
+fun count_forked i =
+  Synchronized.change forked_proofs (fn (m, groups, tab) =>
     let
       val n = m + i;
       val _ =
         Multithreading.tracing 2 (fn () =>
           ("PROOFS " ^ Time.toString (Time.now ()) ^ ": " ^ string_of_int n));
-    in n end);
+    in (n, groups, tab) end);
+
+fun register_forked id future =
+  Synchronized.change forked_proofs (fn (m, groups, tab) =>
+    let
+      val groups' = Task_Queue.group_of_task (Future.task_of future) :: groups;
+      val tab' = Inttab.cons_list (id, Future.map (K ()) future) tab;
+    in (m, groups', tab') end);
 
 fun status task markups =
   let val props = Markup.properties [(Isabelle_Markup.taskN, Task_Queue.str_of_task task)]
   in Output.status (implode (map (Markup.markup_only o props) markups)) end;
 
+in
+
 fun fork_name name e =
   uninterruptible (fn _ => fn () =>
     let
-      val _ = forked 1;
+      val id =
+        (case Position.get_id (Position.thread_data ()) of
+          NONE => 0
+        | SOME id => Markup.parse_int id);
+      val _ = count_forked 1;
       val future =
         (singleton o Future.forks)
           {name = name, group = NONE, deps = [], pri = ~1, interrupts = false}
@@ -144,12 +163,25 @@
                     (status task [Isabelle_Markup.failed];
                      Output.report (Markup.markup_only Isabelle_Markup.bad);
                      Output.error_msg (ML_Compiler.exn_message exn)));
+              val _ = count_forked ~1;
             in Exn.release result end);
       val _ = status (Future.task_of future) [Isabelle_Markup.forked];
+      val _ = register_forked id future;
     in future end) ();
 
 fun fork e = fork_name "Goal.fork" e;
 
+fun forked_count () = #1 (Synchronized.value forked_proofs);
+
+fun peek_futures id =
+  Inttab.lookup_list (#3 (Synchronized.value forked_proofs)) id;
+
+fun cancel_futures () =
+  Synchronized.change forked_proofs (fn (m, groups, tab) =>
+    (List.app Future.cancel_group groups; (0, [], Inttab.empty)));
+
+end;
+
 
 (* scheduling parameters *)
 
@@ -164,8 +196,7 @@
 
 fun future_enabled_nested n =
   future_enabled_level n andalso
-  Synchronized.value forked_proofs <
-    ! parallel_proofs_threshold * Multithreading.max_threads_value ();
+  forked_count () < ! parallel_proofs_threshold * Multithreading.max_threads_value ();
 
 
 (* future_result *)