fork parallel prints early in execution: avoid degradation of priority due to main eval task;
authorwenzelm
Sun, 03 Jun 2018 22:02:20 +0200
changeset 68366 cd387c55e085
parent 68365 f9379279f98c
child 68367 2549d7d4718a
fork parallel prints early in execution: avoid degradation of priority due to main eval task;
src/Pure/PIDE/command.ML
src/Pure/PIDE/document.ML
--- a/src/Pure/PIDE/command.ML	Sun Jun 03 20:37:16 2018 +0200
+++ b/src/Pure/PIDE/command.ML	Sun Jun 03 22:02:20 2018 +0200
@@ -26,6 +26,7 @@
   val print0: {pri: int, print_fn: print_fn} -> eval -> print
   val print: bool -> (string * string list) list -> Keyword.keywords -> string ->
     eval -> print list -> print list option
+  val parallel_print: print -> bool
   type print_function =
     {keywords: Keyword.keywords, command_name: string, args: string list, exec_id: Document_ID.exec} ->
       {delay: Time.time option, pri: int, persistent: bool, strict: bool, print_fn: print_fn} option
@@ -36,6 +37,7 @@
   val no_exec: exec
   val exec_ids: exec option -> Document_ID.exec list
   val exec: Document_ID.execution -> exec -> unit
+  val exec_parallel_prints: Document_ID.execution -> Future.task list -> exec -> exec option
 end;
 
 structure Command: COMMAND =
@@ -386,6 +388,9 @@
     if eq_list print_eq (old_prints, new_prints) then NONE else SOME new_prints
   end;
 
+fun parallel_print (Print {pri, ...}) =
+  pri <= 0 orelse (Future.enabled () andalso Options.default_bool "parallel_print");
+
 fun print_function name f =
   Synchronized.change print_functions (fn funs =>
    (if name = "" then error "Unnamed print function" else ();
@@ -448,23 +453,24 @@
   if Lazy.is_finished eval_process then ()
   else run_process execution_id exec_id eval_process;
 
-fun run_print execution_id (Print {name, delay, pri, exec_id, print_process, ...}) =
+fun fork_print execution_id deps (Print {name, delay, pri, exec_id, print_process, ...}) =
+  let
+    val group = Future.worker_subgroup ();
+    fun fork () =
+      ignore ((singleton o Future.forks)
+        {name = name, group = SOME group, deps = deps, pri = pri, interrupts = true}
+        (fn () =>
+          if ignore_process print_process then ()
+          else run_process execution_id exec_id print_process));
+  in
+    (case delay of
+      NONE => fork ()
+    | SOME d => ignore (Event_Timer.request (Time.now () + d) fork))
+  end;
+
+fun run_print execution_id (print as Print {exec_id, print_process, ...}) =
   if ignore_process print_process then ()
-  else if pri <= 0 orelse (Future.enabled () andalso Options.default_bool "parallel_print")
-  then
-    let
-      val group = Future.worker_subgroup ();
-      fun fork () =
-        ignore ((singleton o Future.forks)
-          {name = name, group = SOME group, deps = [], pri = pri, interrupts = true}
-          (fn () =>
-            if ignore_process print_process then ()
-            else run_process execution_id exec_id print_process));
-    in
-      (case delay of
-        NONE => fork ()
-      | SOME d => ignore (Event_Timer.request (Time.now () + d) fork))
-    end
+  else if parallel_print print then fork_print execution_id [] print
   else run_process execution_id exec_id print_process;
 
 in
@@ -472,7 +478,11 @@
 fun exec execution_id (eval, prints) =
   (run_eval execution_id eval; List.app (run_print execution_id) prints);
 
+fun exec_parallel_prints execution_id deps (exec as (Eval {eval_process, ...}, prints)) =
+  if Lazy.is_finished eval_process
+  then (List.app (fork_print execution_id deps) prints; NONE)
+  else SOME exec;
+
 end;
 
 end;
-
--- a/src/Pure/PIDE/document.ML	Sun Jun 03 20:37:16 2018 +0200
+++ b/src/Pure/PIDE/document.ML	Sun Jun 03 22:02:20 2018 +0200
@@ -314,17 +314,20 @@
 type execution =
  {version_id: Document_ID.version,  (*static version id*)
   execution_id: Document_ID.execution,  (*dynamic execution id*)
-  delay_request: unit future};  (*pending event timer request*)
+  delay_request: unit future,  (*pending event timer request*)
+  parallel_prints: Command.exec list};  (*parallel prints for early execution*)
 
 val no_execution: execution =
   {version_id = Document_ID.none,
    execution_id = Document_ID.none,
-   delay_request = Future.value ()};
+   delay_request = Future.value (),
+   parallel_prints = []};
 
-fun new_execution version_id delay_request : execution =
+fun new_execution version_id delay_request parallel_prints : execution =
   {version_id = version_id,
    execution_id = Execution.start (),
-   delay_request = delay_request};
+   delay_request = delay_request,
+   parallel_prints = parallel_prints};
 
 abstype state = State of
  {versions: version Inttab.table,  (*version id -> document content*)
@@ -347,12 +350,23 @@
 
 (* document versions *)
 
-fun define_version version_id version =
+fun parallel_prints_node node =
+  iterate_entries (fn (_, opt_exec) => fn rev_result =>
+    (case opt_exec of
+      SOME (eval, prints) =>
+        (case filter Command.parallel_print prints of
+          [] => SOME rev_result
+        | prints' => SOME ((eval, prints') :: rev_result))
+    | NONE => NONE)) node [] |> rev;
+
+fun define_version version_id version assigned_nodes =
   map_state (fn (versions, blobs, commands, {delay_request, ...}) =>
     let
-      val versions' = Inttab.update_new (version_id, version) versions
+      val version' = fold put_node assigned_nodes version;
+      val versions' = Inttab.update_new (version_id, version') versions
         handle Inttab.DUP dup => err_dup "document version" dup;
-      val execution' = new_execution version_id delay_request;
+      val parallel_prints = maps (parallel_prints_node o #2) assigned_nodes;
+      val execution' = new_execution version_id delay_request parallel_prints;
     in (versions', blobs, commands, execution') end);
 
 fun the_version (State {versions, ...}) version_id =
@@ -479,12 +493,16 @@
 fun start_execution state = state |> map_state (fn (versions, blobs, commands, execution) =>
   timeit "Document.start_execution" (fn () =>
     let
-      val {version_id, execution_id, delay_request} = execution;
+      val {version_id, execution_id, delay_request, parallel_prints} = execution;
 
       val delay = seconds (Options.default_real "editor_execution_delay");
 
       val _ = Future.cancel delay_request;
       val delay_request' = Event_Timer.future (Time.now () + delay);
+      val delay = Future.task_of delay_request';
+
+      val parallel_prints' = parallel_prints
+        |> map_filter (Command.exec_parallel_prints execution_id [delay]);
 
       fun finished_import (name, (node, _)) =
         finished_result node orelse is_some (Thy_Info.lookup_theory name);
@@ -516,14 +534,13 @@
                   (singleton o Future.forks)
                    {name = "theory:" ^ name,
                     group = SOME (Future.new_group NONE),
-                    deps =
-                      Future.task_of delay_request' :: Execution.active_tasks name @
-                        maps (the_list o #2 o #2) deps,
+                    deps = delay :: Execution.active_tasks name @ maps (the_list o #2 o #2) deps,
                     pri = 0, interrupts = false} body;
               in (node, SOME (Future.task_of future)) end
             else (node, NONE));
       val execution' =
-        {version_id = version_id, execution_id = execution_id, delay_request = delay_request'};
+        {version_id = version_id, execution_id = execution_id,
+          delay_request = delay_request', parallel_prints = parallel_prints'};
     in (versions, blobs, commands, execution') end));
 
 
@@ -838,7 +855,7 @@
     val assigned_nodes = map_filter #3 updated;
 
     val state' = state
-      |> define_version new_version_id (fold put_node assigned_nodes new_version);
+      |> define_version new_version_id new_version assigned_nodes;
 
   in (removed, assign_result, state') end);