# HG changeset patch # User wenzelm # Date 1419798885 -3600 # Node ID 59f1591a11cb82accf585f8685430167bc5cd805 # Parent a1d6d6db781be34901b77cc4d486321a0154e7ad eliminated Document.execution frontier (again, see 627fb639a2d9): just run into older execution, potentially stalling worker thread, but without global delay due to long-running tasks (notably sledgehammer); clarified Command.run_process etc.: join running eval, bypass running print; eliminated Command.memo in favour of regular Lazy.lazy; more Lazy.lazy status information; diff -r a1d6d6db781b -r 59f1591a11cb src/Pure/Concurrent/lazy.ML --- a/src/Pure/Concurrent/lazy.ML Sun Dec 28 12:37:03 2014 +0100 +++ b/src/Pure/Concurrent/lazy.ML Sun Dec 28 21:34:45 2014 +0100 @@ -9,10 +9,12 @@ signature LAZY = sig type 'a lazy - val peek: 'a lazy -> 'a Exn.result option - val is_finished: 'a lazy -> bool val lazy: (unit -> 'a) -> 'a lazy val value: 'a -> 'a lazy + val peek: 'a lazy -> 'a Exn.result option + val is_running: 'a lazy -> bool + val is_finished: 'a lazy -> bool + val finished_result: 'a lazy -> 'a Exn.result option val force_result: 'a lazy -> 'a Exn.result val force: 'a lazy -> 'a val map: ('a -> 'b) -> 'a lazy -> 'b lazy @@ -31,19 +33,36 @@ abstype 'a lazy = Lazy of 'a expr Synchronized.var with +fun lazy e = Lazy (Synchronized.var "lazy" (Expr e)); +fun value a = Lazy (Synchronized.var "lazy" (Result (Future.value a))); + fun peek (Lazy var) = (case Synchronized.peek var of Expr _ => NONE | Result res => Future.peek res); -fun is_finished (Lazy var) = + +(* status *) + +fun is_future pred (Lazy var) = (case Synchronized.value var of Expr _ => false - | Result res => - Future.is_finished res andalso not (Exn.is_interrupt_exn (Future.join_result res))); + | Result res => pred res); + +fun is_running x = is_future (not o Future.is_finished) x; + +fun is_finished x = + is_future (fn res => + Future.is_finished res andalso not (Exn.is_interrupt_exn (Future.join_result res))) x; -fun lazy e = Lazy (Synchronized.var "lazy" (Expr e)); -fun value a = Lazy (Synchronized.var "lazy" (Result (Future.value a))); +fun finished_result (Lazy var) = + (case Synchronized.value var of + Expr _ => NONE + | Result res => + if Future.is_finished res then + let val result = Future.join_result res + in if Exn.is_interrupt_exn result then NONE else SOME result end + else NONE); (* force result *) diff -r a1d6d6db781b -r 59f1591a11cb src/Pure/PIDE/command.ML --- a/src/Pure/PIDE/command.ML Sun Dec 28 12:37:03 2014 +0100 +++ b/src/Pure/PIDE/command.ML Sun Dec 28 21:34:45 2014 +0100 @@ -36,55 +36,6 @@ structure Command: COMMAND = struct -(** memo results **) - -datatype 'a expr = - Expr of Document_ID.exec * (unit -> 'a) | - Result of 'a Exn.result; - -abstype 'a memo = Memo of 'a expr Synchronized.var -with - -fun memo exec_id e = Memo (Synchronized.var "Command.memo" (Expr (exec_id, e))); -fun memo_value a = Memo (Synchronized.var "Command.memo" (Result (Exn.Res a))); - -fun memo_result (Memo v) = - (case Synchronized.value v of - Expr (exec_id, _) => error ("Unfinished execution result: " ^ Document_ID.print exec_id) - | Result res => Exn.release res); - -fun memo_finished (Memo v) = - (case Synchronized.value v of Expr _ => false | Result _ => true); - -fun memo_exec execution_id (Memo v) = - Synchronized.timed_access v (K (SOME Time.zeroTime)) - (fn expr => - (case expr of - Expr (exec_id, body) => - uninterruptible (fn restore_attributes => fn () => - let val group = Future.worker_subgroup () in - if Execution.running execution_id exec_id [group] then - let - val res = - (body - |> restore_attributes - |> Future.task_context "Command.memo_exec" group - |> Exn.interruptible_capture) (); - in SOME ((), Result res) end - else SOME ((), expr) - end) () - | Result _ => SOME ((), expr))) - |> (fn NONE => error "Conflicting command execution" | _ => ()); - -fun memo_fork params execution_id (Memo v) = - (case Synchronized.peek v of - Result _ => () - | _ => ignore ((singleton o Future.forks) params (fn () => memo_exec execution_id (Memo v)))); - -end; - - - (** main phases of execution **) (* read *) @@ -184,15 +135,19 @@ val init_eval_state = {failed = false, malformed = false, command = Toplevel.empty, state = Toplevel.toplevel}; -datatype eval = Eval of {exec_id: Document_ID.exec, eval_process: eval_state memo}; +datatype eval = Eval of {exec_id: Document_ID.exec, eval_process: eval_state lazy}; fun eval_exec_id (Eval {exec_id, ...}) = exec_id; val eval_eq = op = o apply2 eval_exec_id; val eval_running = Execution.is_running_exec o eval_exec_id; -fun eval_finished (Eval {eval_process, ...}) = memo_finished eval_process; +fun eval_finished (Eval {eval_process, ...}) = Lazy.is_finished eval_process; -fun eval_result (Eval {eval_process, ...}) = memo_result eval_process; +fun eval_result (Eval {exec_id, eval_process}) = + (case Lazy.finished_result eval_process of + SOME result => Exn.release result + | NONE => error ("Unfinished execution result: " ^ Document_ID.print exec_id)); + val eval_result_state = #state o eval_result; local @@ -279,7 +234,7 @@ Position.setmp_thread_data (Position.id_only (Document_ID.print exec_id)) (fn () => read keywords thy master_dir init blobs span |> Toplevel.exec_id exec_id) (); in eval_state keywords span tr eval_state0 end; - in Eval {exec_id = exec_id, eval_process = memo exec_id process} end; + in Eval {exec_id = exec_id, eval_process = Lazy.lazy process} end; end; @@ -288,7 +243,7 @@ datatype print = Print of {name: string, args: string list, delay: Time.time option, pri: int, persistent: bool, - exec_id: Document_ID.exec, print_process: unit memo}; + exec_id: Document_ID.exec, print_process: unit lazy}; fun print_exec_id (Print {exec_id, ...}) = exec_id; val print_eq = op = o apply2 print_exec_id; @@ -310,7 +265,7 @@ if Exn.is_interrupt exn then reraise exn else List.app (Future.error_message (Toplevel.pos_of tr)) (Runtime.exn_messages_ids exn); -fun print_finished (Print {print_process, ...}) = memo_finished print_process; +fun print_finished (Print {print_process, ...}) = Lazy.is_finished print_process; fun print_persistent (Print {persistent, ...}) = persistent; @@ -337,7 +292,7 @@ in Print { name = name, args = args, delay = delay, pri = pri, persistent = persistent, - exec_id = exec_id, print_process = memo exec_id process} + exec_id = exec_id, print_process = Lazy.lazy process} end; fun bad_print name args exn = @@ -408,32 +363,45 @@ type exec = eval * print list; val no_exec: exec = - (Eval {exec_id = Document_ID.none, eval_process = memo_value init_eval_state}, []); + (Eval {exec_id = Document_ID.none, eval_process = Lazy.value init_eval_state}, []); fun exec_ids NONE = [] | exec_ids (SOME (eval, prints)) = eval_exec_id eval :: map print_exec_id prints; local -fun run_print execution_id (Print {name, delay, pri, print_process, ...}) = - if pri <= 0 orelse (Multithreading.enabled () andalso Options.default_bool "parallel_print") +fun run_process execution_id exec_id process = + let val group = Future.worker_subgroup () in + if Execution.running execution_id exec_id [group] then + ignore (Future.task_context "Command.run_process" group Lazy.force_result process) + else () + end; + +fun run_eval execution_id (Eval {exec_id, eval_process}) = + if Lazy.is_finished eval_process then () + else run_process execution_id exec_id eval_process; + +fun run_print execution_id (Print {name, delay, pri, exec_id, print_process, ...}) = + if Lazy.is_running print_process orelse Lazy.is_finished print_process then () + else if pri <= 0 orelse (Multithreading.enabled () andalso Options.default_bool "parallel_print") then let val group = Future.worker_subgroup (); fun fork () = - memo_fork {name = name, group = SOME group, deps = [], pri = pri, interrupts = true} - execution_id print_process; + ignore ((singleton o Future.forks) + {name = name, group = SOME group, deps = [], pri = pri, interrupts = true} + (fn () => run_process execution_id exec_id print_process)); in (case delay of NONE => fork () | SOME d => ignore (Event_Timer.request (Time.+ (Time.now (), d)) fork)) end - else memo_exec execution_id print_process; + else run_process execution_id exec_id print_process; in -fun exec execution_id (Eval {eval_process, ...}, prints) = - (memo_exec execution_id eval_process; List.app (run_print execution_id) prints); +fun exec execution_id (eval, prints) = + (run_eval execution_id eval; List.app (run_print execution_id) prints); end; diff -r a1d6d6db781b -r 59f1591a11cb src/Pure/PIDE/document.ML --- a/src/Pure/PIDE/document.ML Sun Dec 28 12:37:03 2014 +0100 +++ b/src/Pure/PIDE/document.ML Sun Dec 28 21:34:45 2014 +0100 @@ -37,9 +37,6 @@ - - - (** document structure **) fun err_dup kind id = error ("Duplicate " ^ kind ^ ": " ^ Document_ID.print id); @@ -285,16 +282,17 @@ type execution = {version_id: Document_ID.version, (*static version id*) execution_id: Document_ID.execution, (*dynamic execution id*) - delay_request: unit future, (*pending event timer request*) - frontier: Future.task Symtab.table}; (*node name -> running execution task*) + delay_request: unit future}; (*pending event timer request*) val no_execution: execution = - {version_id = Document_ID.none, execution_id = Document_ID.none, - delay_request = Future.value (), frontier = Symtab.empty}; + {version_id = Document_ID.none, + execution_id = Document_ID.none, + delay_request = Future.value ()}; -fun new_execution version_id delay_request frontier : execution = - {version_id = version_id, execution_id = Execution.start (), - delay_request = delay_request, frontier = frontier}; +fun new_execution version_id delay_request : execution = + {version_id = version_id, + execution_id = Execution.start (), + delay_request = delay_request}; abstype state = State of {versions: version Inttab.table, (*version id -> document content*) @@ -318,11 +316,11 @@ (* document versions *) fun define_version version_id version = - map_state (fn (versions, blobs, commands, {delay_request, frontier, ...}) => + map_state (fn (versions, blobs, commands, {delay_request, ...}) => let val versions' = Inttab.update_new (version_id, version) versions handle Inttab.DUP dup => err_dup "document version" dup; - val execution' = new_execution version_id delay_request frontier; + val execution' = new_execution version_id delay_request; in (versions', blobs, commands, execution') end); fun the_version (State {versions, ...}) version_id = @@ -408,20 +406,18 @@ fun start_execution state = state |> map_state (fn (versions, blobs, commands, execution) => timeit "Document.start_execution" (fn () => let - val {version_id, execution_id, delay_request, frontier} = execution; + val {version_id, execution_id, delay_request} = execution; val delay = seconds (Options.default_real "editor_execution_delay"); val _ = Future.cancel delay_request; val delay_request' = Event_Timer.future (Time.+ (Time.now (), delay)); - val new_tasks = + val _ = nodes_of (the_version state version_id) |> String_Graph.schedule (fn deps => fn (name, node) => if visible_node node orelse pending_result node then let - val more_deps = - Future.task_of delay_request' :: the_list (Symtab.lookup frontier name); fun body () = iterate_entries (fn (_, opt_exec) => fn () => (case opt_exec of @@ -433,15 +429,14 @@ handle exn => if Exn.is_interrupt exn then () (*sic!*) else reraise exn; val future = (singleton o Future.forks) - {name = "theory:" ^ name, group = SOME (Future.new_group NONE), - deps = more_deps @ map #2 (maps #2 deps), - pri = 0, interrupts = false} body; - in [(name, Future.task_of future)] end + {name = "theory:" ^ name, + group = SOME (Future.new_group NONE), + deps = Future.task_of delay_request' :: maps #2 deps, + pri = 0, interrupts = false} body; + in [Future.task_of future] end else []); - val frontier' = (fold o fold) Symtab.update new_tasks frontier; val execution' = - {version_id = version_id, execution_id = execution_id, - delay_request = delay_request', frontier = frontier'}; + {version_id = version_id, execution_id = execution_id, delay_request = delay_request'}; in (versions, blobs, commands, execution') end));