more explicit type Exec.context;
authorwenzelm
Thu, 11 Jul 2013 23:24:40 +0200
changeset 52604 ff2f0818aebc
parent 52603 a44e9a1d5d8b
child 52605 a2a805549c74
more explicit type Exec.context; eliminated obsolete execution group -- NB: cancelation happens individually for registered execs;
src/Pure/PIDE/command.ML
src/Pure/PIDE/document.ML
src/Pure/PIDE/exec.ML
src/Pure/PIDE/protocol.ML
--- a/src/Pure/PIDE/command.ML	Thu Jul 11 22:53:56 2013 +0200
+++ b/src/Pure/PIDE/command.ML	Thu Jul 11 23:24:40 2013 +0200
@@ -20,7 +20,7 @@
   type exec = eval * print list
   val no_exec: exec
   val exec_ids: exec option -> Document_ID.exec list
-  val exec: exec -> unit
+  val exec: Exec.context -> exec -> unit
 end;
 
 structure Command: COMMAND =
@@ -43,32 +43,31 @@
     Result res => Exn.release res
   | Expr (exec_id, _) => error ("Unfinished execution result: " ^ Document_ID.print exec_id));
 
-fun memo_exec (Memo v) =
+fun memo_exec context (Memo v) =
   (case Synchronized.value v of
     Result res => res
-  | Expr (exec_id, _) =>
+  | Expr _ =>
       Synchronized.timed_access v (fn _ => SOME Time.zeroTime)
         (fn Result res => SOME (res, Result res)
-          | Expr (exec_id, e) =>
+          | expr as Expr (exec_id, e) =>
               uninterruptible (fn restore_attributes => fn () =>
-                let
-                  val _ = Exec.running exec_id;
-                  val res = Exn.capture (restore_attributes e) ();
-                  val _ = Exec.finished exec_id (not (Exn.is_interrupt_exn res));
-                in SOME (res, Result res) end) ())
-      |> (fn NONE => error ("Concurrent execution attempt: " ^ Document_ID.print exec_id)
-           | SOME res => res))
-  |> Exn.release;
+                if Exec.running context exec_id then
+                  let
+                    val res = Exn.capture (restore_attributes e) ();
+                    val _ = Exec.finished exec_id (not (Exn.is_interrupt_exn res));
+                  in SOME (res, Result res) end
+                else SOME (Exn.interrupt_exn, expr)) ())
+      |> (fn NONE => error "Concurrent execution attempt" | SOME res => res))
+  |> Exn.release |> ignore;
 
-fun memo_fork params (Memo v) =
+fun memo_fork params context (Memo v) =
   (case Synchronized.value v of
     Result _ => ()
-  | _ => ignore ((singleton o Future.forks) params (fn () => memo_exec (Memo v))));
+  | _ => ignore ((singleton o Future.forks) params (fn () => memo_exec context (Memo v))));
 
 end;
 
 
-
 (** main phases of execution **)
 
 (* read *)
@@ -277,15 +276,15 @@
 
 local
 
-fun run_print (Print {name, pri, print_process, ...}) =
+fun run_print context (Print {name, pri, print_process, ...}) =
   (if Multithreading.enabled () then
     memo_fork {name = name, group = NONE, deps = [], pri = pri, interrupts = true}
-  else memo_exec) print_process;
+  else memo_exec) context print_process;
 
 in
 
-fun exec (Eval {eval_process, ...}, prints) =
-  (memo_exec eval_process; List.app run_print prints);
+fun exec context (Eval {eval_process, ...}, prints) =
+  (memo_exec context eval_process; List.app (run_print context) prints);
 
 end;
 
--- a/src/Pure/PIDE/document.ML	Thu Jul 11 22:53:56 2013 +0200
+++ b/src/Pure/PIDE/document.ML	Thu Jul 11 23:24:40 2013 +0200
@@ -202,27 +202,14 @@
 end;
 
 
-(* associated execution *)
-
-type execution =
- {version_id: Document_ID.version,
-  group: Future.group,
-  continue: bool Synchronized.var};
-
-val no_execution =
- {version_id = Document_ID.none,
-  group = Future.new_group NONE,
-  continue = Synchronized.var "continue" false};
-
-fun make_execution version_id =
- {version_id = version_id,
-  group = Future.new_group NONE,
-  continue = Synchronized.var "continue" true};
-
-
 
 (** main state -- document structure and execution process **)
 
+type execution = {version_id: Document_ID.version, context: Exec.context};
+
+val no_execution = {version_id = Document_ID.none, context = Exec.no_context};
+fun make_execution version_id = {version_id = version_id, context = Exec.fresh_context ()};
+
 abstype state = State of
  {versions: version Inttab.table,  (*version id -> document content*)
   commands: (string * Token.T list lazy) Inttab.table,  (*command id -> named command span*)
@@ -307,34 +294,35 @@
   in (versions', commands', execution) end);
 
 
-
-(** document execution **)
+(* document execution *)
 
-val discontinue_execution =
-  execution_of #> (fn {continue, ...} => Synchronized.change continue (K false));
+fun discontinue_execution state =
+  Exec.drop_context (#context (execution_of state));
 
-val cancel_execution = execution_of #> (fn {group, ...} => Future.cancel_group group);
+fun cancel_execution state =
+  (Exec.drop_context (#context (execution_of state)); Exec.terminate_all ());
 
 fun start_execution state =
   let
-    val {version_id, group, continue} = execution_of state;
-    fun running () = Synchronized.guarded_access continue (fn x => SOME (x, x));
+    val {version_id, context} = execution_of state;
     val _ =
-      if not (running ()) orelse Task_Queue.is_canceled group then []
-      else
+      if Exec.is_running context then
         nodes_of (the_version state version_id) |> String_Graph.schedule
           (fn deps => fn (name, node) =>
             if not (visible_node node) andalso finished_theory node then
               Future.value ()
             else
               (singleton o Future.forks)
-                {name = "theory:" ^ name, group = SOME (Future.new_group (SOME group)),
+                {name = "theory:" ^ name, group = SOME (Future.new_group NONE),
                   deps = map (Future.task_of o #2) deps, pri = ~2, interrupts = false}
                 (fn () =>
                   iterate_entries (fn (_, opt_exec) => fn () =>
                     (case opt_exec of
-                      SOME exec => if running () then SOME (Command.exec exec) else NONE
-                    | NONE => NONE)) node ()));
+                      SOME exec =>
+                        if Exec.is_running context then SOME (Command.exec context exec)
+                        else NONE
+                    | NONE => NONE)) node ()))
+      else [];
   in () end;
 
 
--- a/src/Pure/PIDE/exec.ML	Thu Jul 11 22:53:56 2013 +0200
+++ b/src/Pure/PIDE/exec.ML	Thu Jul 11 23:24:40 2013 +0200
@@ -6,49 +6,95 @@
 
 signature EXEC =
 sig
-  val running: Document_ID.exec -> unit
+  type context
+  val no_context: context
+  val drop_context: context -> unit
+  val fresh_context: unit -> context
+  val is_running: context -> bool
+  val running: context -> Document_ID.exec -> bool
   val finished: Document_ID.exec -> bool -> unit
   val is_stable: Document_ID.exec -> bool
   val peek_running: Document_ID.exec -> Future.group option
-  val purge_unstable: unit -> unit
+  val purge_canceled: unit -> unit
+  val terminate_all: unit -> unit
 end;
 
 structure Exec: EXEC =
 struct
 
-type status = Future.group option;  (*SOME group: running, NONE: canceled/unstable*)
-val execs = Synchronized.var "Exec.execs" (Inttab.empty: status Inttab.table);
+(* global state *)
+
+datatype context = Context of Document_ID.generic;
+val no_context = Context Document_ID.none;
+
+type status = Future.group option;  (*SOME group: running, NONE: canceled*)
+val state = Synchronized.var "Exec.state" (no_context, Inttab.empty: status Inttab.table);
 
-fun running exec_id =
+
+(* unique execution context *)
+
+fun drop_context context =
+  Synchronized.change state
+    (apfst (fn context' => if context = context' then no_context else context'));
+
+fun fresh_context () =
   let
-    val group =
-      (case Future.worker_group () of
-        SOME group => group
-      | NONE => error ("Missing worker thread context for execution " ^ Document_ID.print exec_id));
-  in Synchronized.change execs (Inttab.update_new (exec_id, SOME group)) end;
+    val context = Context (Document_ID.make ());
+    val _ = Synchronized.change state (apfst (K context));
+  in context end;
+
+fun is_running context = context = fst (Synchronized.value state);
+
+
+(* registered execs *)
+
+fun running context exec_id =
+  Synchronized.guarded_access state
+    (fn (current_context, execs) =>
+      let
+        val cont = context = current_context;
+        val execs' =
+          if cont then
+            Inttab.update_new (exec_id, SOME (Future.the_worker_group ())) execs
+              handle Inttab.DUP dup => error ("Duplicate execution " ^ Document_ID.print dup)
+          else execs;
+      in SOME (cont, (current_context, execs')) end);
 
 fun finished exec_id stable =
-  Synchronized.change execs
-    (if stable then Inttab.delete exec_id else Inttab.update (exec_id, NONE));
+  Synchronized.change state
+    (apsnd (fn execs =>
+      if not (Inttab.defined execs exec_id) then
+        error ("Attempt to finish unknown execution: " ^ Document_ID.print exec_id)
+      else if stable then Inttab.delete exec_id execs
+      else Inttab.update (exec_id, NONE) execs));
 
 fun is_stable exec_id =
   not (Par_Exn.is_interrupted (Future.join_results (Goal.peek_futures exec_id))) andalso
-  (case Inttab.lookup (Synchronized.value execs) exec_id of
+  (case Inttab.lookup (snd (Synchronized.value state)) exec_id of
     NONE => true
   | SOME status => is_some status);
 
 fun peek_running exec_id =
-  (case Inttab.lookup (Synchronized.value execs) exec_id of
+  (case Inttab.lookup (snd (Synchronized.value state)) exec_id of
     SOME (SOME group) => SOME group
   | _ => NONE);
 
-fun purge_unstable () =
-  Synchronized.guarded_access execs
-    (fn tab =>
+fun purge_canceled () =
+  Synchronized.guarded_access state
+    (fn (context, execs) =>
       let
-        val unstable = Inttab.fold (fn (exec_id, NONE) => cons exec_id | _ => I) tab [];
-        val tab' = fold Inttab.delete unstable tab;
-      in SOME ((), tab') end);
+        val canceled = Inttab.fold (fn (exec_id, NONE) => cons exec_id | _ => I) execs [];
+        val execs' = fold Inttab.delete canceled execs;
+      in SOME ((), (context, execs')) end);
+
+fun terminate_all () =
+  let
+    val groups =
+      Inttab.fold (fn (_, SOME group) => cons group | _ => I)
+        (snd (Synchronized.value state)) [];
+    val _ = List.app Future.cancel_group groups;
+    val _ = List.app Future.terminate groups;
+  in () end;
 
 end;
 
--- a/src/Pure/PIDE/protocol.ML	Thu Jul 11 22:53:56 2013 +0200
+++ b/src/Pure/PIDE/protocol.ML	Thu Jul 11 23:24:40 2013 +0200
@@ -54,7 +54,7 @@
         val (assign_update, state') = Document.update old_id new_id edits state;
 
         val _ = List.app Future.cancel_group (Goal.reset_futures ());
-        val _ = Exec.purge_unstable ();
+        val _ = Exec.purge_canceled ();
         val _ = Isabelle_Process.reset_tracing ();
 
         val _ =