strictly monotonic Document.update: avoid disruptive cancel_execution, merely discontinue_execution and cancel/terminate old execs individually;
authorwenzelm
Thu, 11 Jul 2013 18:41:05 +0200 (2013-07-11)
changeset 52602 00170ef1dc39
parent 52601 55e62a25a7ce
child 52603 a44e9a1d5d8b
strictly monotonic Document.update: avoid disruptive cancel_execution, merely discontinue_execution and cancel/terminate old execs individually;
src/Pure/PIDE/command.ML
src/Pure/PIDE/document.ML
src/Pure/PIDE/exec.ML
src/Pure/PIDE/protocol.ML
--- a/src/Pure/PIDE/command.ML	Thu Jul 11 16:35:37 2013 +0200
+++ b/src/Pure/PIDE/command.ML	Thu Jul 11 18:41:05 2013 +0200
@@ -54,10 +54,7 @@
                 let
                   val _ = Exec.running exec_id;
                   val res = Exn.capture (restore_attributes e) ();
-                  val _ =
-                    if Exn.is_interrupt_exn res
-                    then Exec.canceled exec_id
-                    else Exec.finished exec_id;
+                  val _ = Exec.finished exec_id (not (Exn.is_interrupt_exn res));
                 in SOME (res, Result res) end) ())
       |> (fn NONE => error ("Concurrent execution attempt: " ^ Document_ID.print exec_id)
            | SOME res => res))
--- a/src/Pure/PIDE/document.ML	Thu Jul 11 16:35:37 2013 +0200
+++ b/src/Pure/PIDE/document.ML	Thu Jul 11 18:41:05 2013 +0200
@@ -314,7 +314,6 @@
   execution_of #> (fn {continue, ...} => Synchronized.change continue (K false));
 
 val cancel_execution = execution_of #> (fn {group, ...} => Future.cancel_group group);
-val terminate_execution = execution_of #> (fn {group, ...} => Future.terminate group);
 
 fun start_execution state =
   let
@@ -462,6 +461,11 @@
       val assign_update' = assign_update_new (command_id', SOME exec') assign_update;
       val init' = if Keyword.is_theory_begin command_name then NONE else init;
     in SOME (assign_update', (command_id', (eval', prints')), init') end;
+
+fun cancel_old_execs node0 (command_id, exec_ids) =
+  subtract (op =) exec_ids (Command.exec_ids (lookup_entry node0 command_id))
+  |> map_filter (Exec.peek_running #> Option.map (tap Future.cancel_group));
+
 in
 
 fun update old_version_id new_version_id edits state =
@@ -472,7 +476,6 @@
     val nodes = nodes_of new_version;
     val is_required = make_required nodes;
 
-    val _ = timeit "Document.terminate_execution" (fn () => terminate_execution state);
     val updated = timeit "Document.update" (fn () =>
       nodes |> String_Graph.schedule
         (fn deps => fn (name, node) =>
@@ -482,7 +485,7 @@
             (fn () =>
               let
                 val imports = map (apsnd Future.join) deps;
-                val imports_changed = exists (#3 o #1 o #2) imports;
+                val imports_changed = exists (#4 o #1 o #2) imports;
                 val node_required = is_required name;
               in
                 if imports_changed orelse AList.defined (op =) edits name orelse
@@ -521,22 +524,29 @@
                       if is_none last_exec orelse is_some (after_entry node last_exec) then NONE
                       else SOME eval';
 
+                    val assign_update = assign_update_result assigned_execs;
+                    val old_groups = maps (cancel_old_execs node0) assign_update;
+
                     val node' = node
                       |> assign_update_apply assigned_execs
                       |> entries_stable (assign_update_null updated_execs)
                       |> set_result result;
                     val assigned_node = SOME (name, node');
                     val result_changed = changed_result node0 node';
-                  in
-                    ((assign_update_result assigned_execs, assigned_node, result_changed), node')
-                  end
-                else (([], NONE, false), node)
+                  in ((assign_update, old_groups, assigned_node, result_changed), node') end
+                else (([], [], NONE, false), node)
               end))
       |> Future.joins |> map #1);
 
+    val assign_update = maps #1 updated;
+    val old_groups = maps #2 updated;
+    val assigned_nodes = map_filter #3 updated;
+
     val state' = state
-      |> define_version new_version_id (fold put_node (map_filter #2 updated) new_version);
-  in (maps #1 updated, state') end;
+      |> define_version new_version_id (fold put_node assigned_nodes new_version);
+
+    val _ = timeit "Document.terminate" (fn () => List.app Future.terminate old_groups);
+  in (assign_update, state') end;
 
 end;
 
--- a/src/Pure/PIDE/exec.ML	Thu Jul 11 16:35:37 2013 +0200
+++ b/src/Pure/PIDE/exec.ML	Thu Jul 11 18:41:05 2013 +0200
@@ -6,39 +6,47 @@
 
 signature EXEC =
 sig
+  val running: Document_ID.exec -> unit
+  val finished: Document_ID.exec -> bool -> unit
   val is_stable: Document_ID.exec -> bool
-  val running: Document_ID.exec -> unit
-  val finished: Document_ID.exec -> unit
-  val canceled: Document_ID.exec -> unit
+  val peek_running: Document_ID.exec -> Future.group option
   val purge_unstable: unit -> unit
 end;
 
 structure Exec: EXEC =
 struct
 
-val running_execs =
-  Synchronized.var "Exec.running_execs" (Inttab.empty: {stable: bool} Inttab.table);
+type status = Future.group option;  (*SOME group: running, NONE: canceled/unstable*)
+val execs = Synchronized.var "Exec.execs" (Inttab.empty: status Inttab.table);
+
+fun running exec_id =
+  let
+    val group =
+      (case Future.worker_group () of
+        SOME group => group
+      | NONE => error ("Missing worker thread context for execution " ^ Document_ID.print exec_id));
+  in Synchronized.change execs (Inttab.update_new (exec_id, SOME group)) end;
+
+fun finished exec_id stable =
+  Synchronized.change execs
+    (if stable then Inttab.delete exec_id else Inttab.update (exec_id, NONE));
 
 fun is_stable exec_id =
   not (Par_Exn.is_interrupted (Future.join_results (Goal.peek_futures exec_id))) andalso
-  (case Inttab.lookup (Synchronized.value running_execs) exec_id of
+  (case Inttab.lookup (Synchronized.value execs) exec_id of
     NONE => true
-  | SOME {stable} => stable);
+  | SOME status => is_some status);
 
-fun running exec_id =
-  Synchronized.change running_execs (Inttab.update_new (exec_id, {stable = true}));
-
-fun finished exec_id =
-  Synchronized.change running_execs (Inttab.delete exec_id);
-
-fun canceled exec_id =
-  Synchronized.change running_execs (Inttab.update (exec_id, {stable = false}));
+fun peek_running exec_id =
+  (case Inttab.lookup (Synchronized.value execs) exec_id of
+    SOME (SOME group) => SOME group
+  | _ => NONE);
 
 fun purge_unstable () =
-  Synchronized.guarded_access running_execs
+  Synchronized.guarded_access execs
     (fn tab =>
       let
-        val unstable = Inttab.fold (fn (exec_id, {stable = false}) => cons exec_id | _ => I) tab [];
+        val unstable = Inttab.fold (fn (exec_id, NONE) => cons exec_id | _ => I) tab [];
         val tab' = fold Inttab.delete unstable tab;
       in SOME ((), tab') end);
 
--- a/src/Pure/PIDE/protocol.ML	Thu Jul 11 16:35:37 2013 +0200
+++ b/src/Pure/PIDE/protocol.ML	Thu Jul 11 18:41:05 2013 +0200
@@ -27,7 +27,7 @@
   Isabelle_Process.protocol_command "Document.update"
     (fn [old_id_string, new_id_string, edits_yxml] => Document.change_state (fn state =>
       let
-        val _ = Document.cancel_execution state;
+        val _ = Document.discontinue_execution state;
 
         val old_id = Document_ID.parse old_id_string;
         val new_id = Document_ID.parse new_id_string;