fork parallel prints early in execution: avoid degradation of priority due to main eval task;
authorwenzelm
Sun Jun 03 22:02:20 2018 +0200 (13 months ago)
changeset 68366cd387c55e085
parent 68365 f9379279f98c
child 68367 2549d7d4718a
fork parallel prints early in execution: avoid degradation of priority due to main eval task;
src/Pure/PIDE/command.ML
src/Pure/PIDE/document.ML
     1.1 --- a/src/Pure/PIDE/command.ML	Sun Jun 03 20:37:16 2018 +0200
     1.2 +++ b/src/Pure/PIDE/command.ML	Sun Jun 03 22:02:20 2018 +0200
     1.3 @@ -26,6 +26,7 @@
     1.4    val print0: {pri: int, print_fn: print_fn} -> eval -> print
     1.5    val print: bool -> (string * string list) list -> Keyword.keywords -> string ->
     1.6      eval -> print list -> print list option
     1.7 +  val parallel_print: print -> bool
     1.8    type print_function =
     1.9      {keywords: Keyword.keywords, command_name: string, args: string list, exec_id: Document_ID.exec} ->
    1.10        {delay: Time.time option, pri: int, persistent: bool, strict: bool, print_fn: print_fn} option
    1.11 @@ -36,6 +37,7 @@
    1.12    val no_exec: exec
    1.13    val exec_ids: exec option -> Document_ID.exec list
    1.14    val exec: Document_ID.execution -> exec -> unit
    1.15 +  val exec_parallel_prints: Document_ID.execution -> Future.task list -> exec -> exec option
    1.16  end;
    1.17  
    1.18  structure Command: COMMAND =
    1.19 @@ -386,6 +388,9 @@
    1.20      if eq_list print_eq (old_prints, new_prints) then NONE else SOME new_prints
    1.21    end;
    1.22  
    1.23 +fun parallel_print (Print {pri, ...}) =
    1.24 +  pri <= 0 orelse (Future.enabled () andalso Options.default_bool "parallel_print");
    1.25 +
    1.26  fun print_function name f =
    1.27    Synchronized.change print_functions (fn funs =>
    1.28     (if name = "" then error "Unnamed print function" else ();
    1.29 @@ -448,23 +453,24 @@
    1.30    if Lazy.is_finished eval_process then ()
    1.31    else run_process execution_id exec_id eval_process;
    1.32  
    1.33 -fun run_print execution_id (Print {name, delay, pri, exec_id, print_process, ...}) =
    1.34 +fun fork_print execution_id deps (Print {name, delay, pri, exec_id, print_process, ...}) =
    1.35 +  let
    1.36 +    val group = Future.worker_subgroup ();
    1.37 +    fun fork () =
    1.38 +      ignore ((singleton o Future.forks)
    1.39 +        {name = name, group = SOME group, deps = deps, pri = pri, interrupts = true}
    1.40 +        (fn () =>
    1.41 +          if ignore_process print_process then ()
    1.42 +          else run_process execution_id exec_id print_process));
    1.43 +  in
    1.44 +    (case delay of
    1.45 +      NONE => fork ()
    1.46 +    | SOME d => ignore (Event_Timer.request (Time.now () + d) fork))
    1.47 +  end;
    1.48 +
    1.49 +fun run_print execution_id (print as Print {exec_id, print_process, ...}) =
    1.50    if ignore_process print_process then ()
    1.51 -  else if pri <= 0 orelse (Future.enabled () andalso Options.default_bool "parallel_print")
    1.52 -  then
    1.53 -    let
    1.54 -      val group = Future.worker_subgroup ();
    1.55 -      fun fork () =
    1.56 -        ignore ((singleton o Future.forks)
    1.57 -          {name = name, group = SOME group, deps = [], pri = pri, interrupts = true}
    1.58 -          (fn () =>
    1.59 -            if ignore_process print_process then ()
    1.60 -            else run_process execution_id exec_id print_process));
    1.61 -    in
    1.62 -      (case delay of
    1.63 -        NONE => fork ()
    1.64 -      | SOME d => ignore (Event_Timer.request (Time.now () + d) fork))
    1.65 -    end
    1.66 +  else if parallel_print print then fork_print execution_id [] print
    1.67    else run_process execution_id exec_id print_process;
    1.68  
    1.69  in
    1.70 @@ -472,7 +478,11 @@
    1.71  fun exec execution_id (eval, prints) =
    1.72    (run_eval execution_id eval; List.app (run_print execution_id) prints);
    1.73  
    1.74 +fun exec_parallel_prints execution_id deps (exec as (Eval {eval_process, ...}, prints)) =
    1.75 +  if Lazy.is_finished eval_process
    1.76 +  then (List.app (fork_print execution_id deps) prints; NONE)
    1.77 +  else SOME exec;
    1.78 +
    1.79  end;
    1.80  
    1.81  end;
    1.82 -
     2.1 --- a/src/Pure/PIDE/document.ML	Sun Jun 03 20:37:16 2018 +0200
     2.2 +++ b/src/Pure/PIDE/document.ML	Sun Jun 03 22:02:20 2018 +0200
     2.3 @@ -314,17 +314,20 @@
     2.4  type execution =
     2.5   {version_id: Document_ID.version,  (*static version id*)
     2.6    execution_id: Document_ID.execution,  (*dynamic execution id*)
     2.7 -  delay_request: unit future};  (*pending event timer request*)
     2.8 +  delay_request: unit future,  (*pending event timer request*)
     2.9 +  parallel_prints: Command.exec list};  (*parallel prints for early execution*)
    2.10  
    2.11  val no_execution: execution =
    2.12    {version_id = Document_ID.none,
    2.13     execution_id = Document_ID.none,
    2.14 -   delay_request = Future.value ()};
    2.15 +   delay_request = Future.value (),
    2.16 +   parallel_prints = []};
    2.17  
    2.18 -fun new_execution version_id delay_request : execution =
    2.19 +fun new_execution version_id delay_request parallel_prints : execution =
    2.20    {version_id = version_id,
    2.21     execution_id = Execution.start (),
    2.22 -   delay_request = delay_request};
    2.23 +   delay_request = delay_request,
    2.24 +   parallel_prints = parallel_prints};
    2.25  
    2.26  abstype state = State of
    2.27   {versions: version Inttab.table,  (*version id -> document content*)
    2.28 @@ -347,12 +350,23 @@
    2.29  
    2.30  (* document versions *)
    2.31  
    2.32 -fun define_version version_id version =
    2.33 +fun parallel_prints_node node =
    2.34 +  iterate_entries (fn (_, opt_exec) => fn rev_result =>
    2.35 +    (case opt_exec of
    2.36 +      SOME (eval, prints) =>
    2.37 +        (case filter Command.parallel_print prints of
    2.38 +          [] => SOME rev_result
    2.39 +        | prints' => SOME ((eval, prints') :: rev_result))
    2.40 +    | NONE => NONE)) node [] |> rev;
    2.41 +
    2.42 +fun define_version version_id version assigned_nodes =
    2.43    map_state (fn (versions, blobs, commands, {delay_request, ...}) =>
    2.44      let
    2.45 -      val versions' = Inttab.update_new (version_id, version) versions
    2.46 +      val version' = fold put_node assigned_nodes version;
    2.47 +      val versions' = Inttab.update_new (version_id, version') versions
    2.48          handle Inttab.DUP dup => err_dup "document version" dup;
    2.49 -      val execution' = new_execution version_id delay_request;
    2.50 +      val parallel_prints = maps (parallel_prints_node o #2) assigned_nodes;
    2.51 +      val execution' = new_execution version_id delay_request parallel_prints;
    2.52      in (versions', blobs, commands, execution') end);
    2.53  
    2.54  fun the_version (State {versions, ...}) version_id =
    2.55 @@ -479,12 +493,16 @@
    2.56  fun start_execution state = state |> map_state (fn (versions, blobs, commands, execution) =>
    2.57    timeit "Document.start_execution" (fn () =>
    2.58      let
    2.59 -      val {version_id, execution_id, delay_request} = execution;
    2.60 +      val {version_id, execution_id, delay_request, parallel_prints} = execution;
    2.61  
    2.62        val delay = seconds (Options.default_real "editor_execution_delay");
    2.63  
    2.64        val _ = Future.cancel delay_request;
    2.65        val delay_request' = Event_Timer.future (Time.now () + delay);
    2.66 +      val delay = Future.task_of delay_request';
    2.67 +
    2.68 +      val parallel_prints' = parallel_prints
    2.69 +        |> map_filter (Command.exec_parallel_prints execution_id [delay]);
    2.70  
    2.71        fun finished_import (name, (node, _)) =
    2.72          finished_result node orelse is_some (Thy_Info.lookup_theory name);
    2.73 @@ -516,14 +534,13 @@
    2.74                    (singleton o Future.forks)
    2.75                     {name = "theory:" ^ name,
    2.76                      group = SOME (Future.new_group NONE),
    2.77 -                    deps =
    2.78 -                      Future.task_of delay_request' :: Execution.active_tasks name @
    2.79 -                        maps (the_list o #2 o #2) deps,
    2.80 +                    deps = delay :: Execution.active_tasks name @ maps (the_list o #2 o #2) deps,
    2.81                      pri = 0, interrupts = false} body;
    2.82                in (node, SOME (Future.task_of future)) end
    2.83              else (node, NONE));
    2.84        val execution' =
    2.85 -        {version_id = version_id, execution_id = execution_id, delay_request = delay_request'};
    2.86 +        {version_id = version_id, execution_id = execution_id,
    2.87 +          delay_request = delay_request', parallel_prints = parallel_prints'};
    2.88      in (versions, blobs, commands, execution') end));
    2.89  
    2.90  
    2.91 @@ -838,7 +855,7 @@
    2.92      val assigned_nodes = map_filter #3 updated;
    2.93  
    2.94      val state' = state
    2.95 -      |> define_version new_version_id (fold put_node assigned_nodes new_version);
    2.96 +      |> define_version new_version_id new_version assigned_nodes;
    2.97  
    2.98    in (removed, assign_result, state') end);
    2.99