eliminated Document.execution frontier (again, see 627fb639a2d9): just run into older execution, potentially stalling worker thread, but without global delay due to long-running tasks (notably sledgehammer);
authorwenzelm
Sun, 28 Dec 2014 21:34:45 +0100
changeset 59193 59f1591a11cb
parent 59192 a1d6d6db781b
child 59194 b51489b75bb9
eliminated Document.execution frontier (again, see 627fb639a2d9): just run into older execution, potentially stalling worker thread, but without global delay due to long-running tasks (notably sledgehammer); clarified Command.run_process etc.: join running eval, bypass running print; eliminated Command.memo in favour of regular Lazy.lazy; more Lazy.lazy status information;
src/Pure/Concurrent/lazy.ML
src/Pure/PIDE/command.ML
src/Pure/PIDE/document.ML
--- a/src/Pure/Concurrent/lazy.ML	Sun Dec 28 12:37:03 2014 +0100
+++ b/src/Pure/Concurrent/lazy.ML	Sun Dec 28 21:34:45 2014 +0100
@@ -9,10 +9,12 @@
 signature LAZY =
 sig
   type 'a lazy
-  val peek: 'a lazy -> 'a Exn.result option
-  val is_finished: 'a lazy -> bool
   val lazy: (unit -> 'a) -> 'a lazy
   val value: 'a -> 'a lazy
+  val peek: 'a lazy -> 'a Exn.result option
+  val is_running: 'a lazy -> bool
+  val is_finished: 'a lazy -> bool
+  val finished_result: 'a lazy -> 'a Exn.result option
   val force_result: 'a lazy -> 'a Exn.result
   val force: 'a lazy -> 'a
   val map: ('a -> 'b) -> 'a lazy -> 'b lazy
@@ -31,19 +33,36 @@
 abstype 'a lazy = Lazy of 'a expr Synchronized.var
 with
 
+fun lazy e = Lazy (Synchronized.var "lazy" (Expr e));
+fun value a = Lazy (Synchronized.var "lazy" (Result (Future.value a)));
+
 fun peek (Lazy var) =
   (case Synchronized.peek var of
     Expr _ => NONE
   | Result res => Future.peek res);
 
-fun is_finished (Lazy var) =
+
+(* status *)
+
+fun is_future pred (Lazy var) =
   (case Synchronized.value var of
     Expr _ => false
-  | Result res =>
-      Future.is_finished res andalso not (Exn.is_interrupt_exn (Future.join_result res)));
+  | Result res => pred res);
+
+fun is_running x = is_future (not o Future.is_finished) x;
+
+fun is_finished x =
+  is_future (fn res =>
+    Future.is_finished res andalso not (Exn.is_interrupt_exn (Future.join_result res))) x;
 
-fun lazy e = Lazy (Synchronized.var "lazy" (Expr e));
-fun value a = Lazy (Synchronized.var "lazy" (Result (Future.value a)));
+fun finished_result (Lazy var) =
+  (case Synchronized.value var of
+    Expr _ => NONE
+  | Result res =>
+      if Future.is_finished res then
+        let val result = Future.join_result res
+        in if Exn.is_interrupt_exn result then NONE else SOME result end
+      else NONE);
 
 
 (* force result *)
--- a/src/Pure/PIDE/command.ML	Sun Dec 28 12:37:03 2014 +0100
+++ b/src/Pure/PIDE/command.ML	Sun Dec 28 21:34:45 2014 +0100
@@ -36,55 +36,6 @@
 structure Command: COMMAND =
 struct
 
-(** memo results **)
-
-datatype 'a expr =
-  Expr of Document_ID.exec * (unit -> 'a) |
-  Result of 'a Exn.result;
-
-abstype 'a memo = Memo of 'a expr Synchronized.var
-with
-
-fun memo exec_id e = Memo (Synchronized.var "Command.memo" (Expr (exec_id, e)));
-fun memo_value a = Memo (Synchronized.var "Command.memo" (Result (Exn.Res a)));
-
-fun memo_result (Memo v) =
-  (case Synchronized.value v of
-    Expr (exec_id, _) => error ("Unfinished execution result: " ^ Document_ID.print exec_id)
-  | Result res => Exn.release res);
-
-fun memo_finished (Memo v) =
-  (case Synchronized.value v of Expr _ => false | Result _ => true);
-
-fun memo_exec execution_id (Memo v) =
-  Synchronized.timed_access v (K (SOME Time.zeroTime))
-    (fn expr =>
-      (case expr of
-        Expr (exec_id, body) =>
-          uninterruptible (fn restore_attributes => fn () =>
-            let val group = Future.worker_subgroup () in
-              if Execution.running execution_id exec_id [group] then
-                let
-                  val res =
-                    (body
-                      |> restore_attributes
-                      |> Future.task_context "Command.memo_exec" group
-                      |> Exn.interruptible_capture) ();
-                in SOME ((), Result res) end
-              else SOME ((), expr)
-            end) ()
-      | Result _ => SOME ((), expr)))
-  |> (fn NONE => error "Conflicting command execution" | _ => ());
-
-fun memo_fork params execution_id (Memo v) =
-  (case Synchronized.peek v of
-    Result _ => ()
-  | _ => ignore ((singleton o Future.forks) params (fn () => memo_exec execution_id (Memo v))));
-
-end;
-
-
-
 (** main phases of execution **)
 
 (* read *)
@@ -184,15 +135,19 @@
 val init_eval_state =
   {failed = false, malformed = false, command = Toplevel.empty, state = Toplevel.toplevel};
 
-datatype eval = Eval of {exec_id: Document_ID.exec, eval_process: eval_state memo};
+datatype eval = Eval of {exec_id: Document_ID.exec, eval_process: eval_state lazy};
 
 fun eval_exec_id (Eval {exec_id, ...}) = exec_id;
 val eval_eq = op = o apply2 eval_exec_id;
 
 val eval_running = Execution.is_running_exec o eval_exec_id;
-fun eval_finished (Eval {eval_process, ...}) = memo_finished eval_process;
+fun eval_finished (Eval {eval_process, ...}) = Lazy.is_finished eval_process;
 
-fun eval_result (Eval {eval_process, ...}) = memo_result eval_process;
+fun eval_result (Eval {exec_id, eval_process}) =
+  (case Lazy.finished_result eval_process of
+    SOME result => Exn.release result
+  | NONE => error ("Unfinished execution result: " ^ Document_ID.print exec_id));
+
 val eval_result_state = #state o eval_result;
 
 local
@@ -279,7 +234,7 @@
           Position.setmp_thread_data (Position.id_only (Document_ID.print exec_id))
             (fn () => read keywords thy master_dir init blobs span |> Toplevel.exec_id exec_id) ();
       in eval_state keywords span tr eval_state0 end;
-  in Eval {exec_id = exec_id, eval_process = memo exec_id process} end;
+  in Eval {exec_id = exec_id, eval_process = Lazy.lazy process} end;
 
 end;
 
@@ -288,7 +243,7 @@
 
 datatype print = Print of
  {name: string, args: string list, delay: Time.time option, pri: int, persistent: bool,
-  exec_id: Document_ID.exec, print_process: unit memo};
+  exec_id: Document_ID.exec, print_process: unit lazy};
 
 fun print_exec_id (Print {exec_id, ...}) = exec_id;
 val print_eq = op = o apply2 print_exec_id;
@@ -310,7 +265,7 @@
       if Exn.is_interrupt exn then reraise exn
       else List.app (Future.error_message (Toplevel.pos_of tr)) (Runtime.exn_messages_ids exn);
 
-fun print_finished (Print {print_process, ...}) = memo_finished print_process;
+fun print_finished (Print {print_process, ...}) = Lazy.is_finished print_process;
 
 fun print_persistent (Print {persistent, ...}) = persistent;
 
@@ -337,7 +292,7 @@
       in
         Print {
           name = name, args = args, delay = delay, pri = pri, persistent = persistent,
-          exec_id = exec_id, print_process = memo exec_id process}
+          exec_id = exec_id, print_process = Lazy.lazy process}
       end;
 
     fun bad_print name args exn =
@@ -408,32 +363,45 @@
 
 type exec = eval * print list;
 val no_exec: exec =
-  (Eval {exec_id = Document_ID.none, eval_process = memo_value init_eval_state}, []);
+  (Eval {exec_id = Document_ID.none, eval_process = Lazy.value init_eval_state}, []);
 
 fun exec_ids NONE = []
   | exec_ids (SOME (eval, prints)) = eval_exec_id eval :: map print_exec_id prints;
 
 local
 
-fun run_print execution_id (Print {name, delay, pri, print_process, ...}) =
-  if pri <= 0 orelse (Multithreading.enabled () andalso Options.default_bool "parallel_print")
+fun run_process execution_id exec_id process =
+  let val group = Future.worker_subgroup () in
+    if Execution.running execution_id exec_id [group] then
+      ignore (Future.task_context "Command.run_process" group Lazy.force_result process)
+    else ()
+  end;
+
+fun run_eval execution_id (Eval {exec_id, eval_process}) =
+  if Lazy.is_finished eval_process then ()
+  else run_process execution_id exec_id eval_process;
+
+fun run_print execution_id (Print {name, delay, pri, exec_id, print_process, ...}) =
+  if Lazy.is_running print_process orelse Lazy.is_finished print_process then ()
+  else if pri <= 0 orelse (Multithreading.enabled () andalso Options.default_bool "parallel_print")
   then
     let
       val group = Future.worker_subgroup ();
       fun fork () =
-        memo_fork {name = name, group = SOME group, deps = [], pri = pri, interrupts = true}
-          execution_id print_process;
+        ignore ((singleton o Future.forks)
+          {name = name, group = SOME group, deps = [], pri = pri, interrupts = true}
+          (fn () => run_process execution_id exec_id print_process));
     in
       (case delay of
         NONE => fork ()
       | SOME d => ignore (Event_Timer.request (Time.+ (Time.now (), d)) fork))
     end
-  else memo_exec execution_id print_process;
+  else run_process execution_id exec_id print_process;
 
 in
 
-fun exec execution_id (Eval {eval_process, ...}, prints) =
-  (memo_exec execution_id eval_process; List.app (run_print execution_id) prints);
+fun exec execution_id (eval, prints) =
+  (run_eval execution_id eval; List.app (run_print execution_id) prints);
 
 end;
 
--- a/src/Pure/PIDE/document.ML	Sun Dec 28 12:37:03 2014 +0100
+++ b/src/Pure/PIDE/document.ML	Sun Dec 28 21:34:45 2014 +0100
@@ -37,9 +37,6 @@
 
 
 
-
-
-
 (** document structure **)
 
 fun err_dup kind id = error ("Duplicate " ^ kind ^ ": " ^ Document_ID.print id);
@@ -285,16 +282,17 @@
 type execution =
  {version_id: Document_ID.version,  (*static version id*)
   execution_id: Document_ID.execution,  (*dynamic execution id*)
-  delay_request: unit future,  (*pending event timer request*)
-  frontier: Future.task Symtab.table};  (*node name -> running execution task*)
+  delay_request: unit future};  (*pending event timer request*)
 
 val no_execution: execution =
-  {version_id = Document_ID.none, execution_id = Document_ID.none,
-   delay_request = Future.value (), frontier = Symtab.empty};
+  {version_id = Document_ID.none,
+   execution_id = Document_ID.none,
+   delay_request = Future.value ()};
 
-fun new_execution version_id delay_request frontier : execution =
-  {version_id = version_id, execution_id = Execution.start (),
-   delay_request = delay_request, frontier = frontier};
+fun new_execution version_id delay_request : execution =
+  {version_id = version_id,
+   execution_id = Execution.start (),
+   delay_request = delay_request};
 
 abstype state = State of
  {versions: version Inttab.table,  (*version id -> document content*)
@@ -318,11 +316,11 @@
 (* document versions *)
 
 fun define_version version_id version =
-  map_state (fn (versions, blobs, commands, {delay_request, frontier, ...}) =>
+  map_state (fn (versions, blobs, commands, {delay_request, ...}) =>
     let
       val versions' = Inttab.update_new (version_id, version) versions
         handle Inttab.DUP dup => err_dup "document version" dup;
-      val execution' = new_execution version_id delay_request frontier;
+      val execution' = new_execution version_id delay_request;
     in (versions', blobs, commands, execution') end);
 
 fun the_version (State {versions, ...}) version_id =
@@ -408,20 +406,18 @@
 fun start_execution state = state |> map_state (fn (versions, blobs, commands, execution) =>
   timeit "Document.start_execution" (fn () =>
     let
-      val {version_id, execution_id, delay_request, frontier} = execution;
+      val {version_id, execution_id, delay_request} = execution;
 
       val delay = seconds (Options.default_real "editor_execution_delay");
 
       val _ = Future.cancel delay_request;
       val delay_request' = Event_Timer.future (Time.+ (Time.now (), delay));
 
-      val new_tasks =
+      val _ =
         nodes_of (the_version state version_id) |> String_Graph.schedule
           (fn deps => fn (name, node) =>
             if visible_node node orelse pending_result node then
               let
-                val more_deps =
-                  Future.task_of delay_request' :: the_list (Symtab.lookup frontier name);
                 fun body () =
                   iterate_entries (fn (_, opt_exec) => fn () =>
                     (case opt_exec of
@@ -433,15 +429,14 @@
                   handle exn => if Exn.is_interrupt exn then () (*sic!*) else reraise exn;
                 val future =
                   (singleton o Future.forks)
-                    {name = "theory:" ^ name, group = SOME (Future.new_group NONE),
-                      deps = more_deps @ map #2 (maps #2 deps),
-                      pri = 0, interrupts = false} body;
-              in [(name, Future.task_of future)] end
+                   {name = "theory:" ^ name,
+                    group = SOME (Future.new_group NONE),
+                    deps = Future.task_of delay_request' :: maps #2 deps,
+                    pri = 0, interrupts = false} body;
+              in [Future.task_of future] end
             else []);
-      val frontier' = (fold o fold) Symtab.update new_tasks frontier;
       val execution' =
-        {version_id = version_id, execution_id = execution_id,
-         delay_request = delay_request', frontier = frontier'};
+        {version_id = version_id, execution_id = execution_id, delay_request = delay_request'};
     in (versions, blobs, commands, execution') end));