# HG changeset patch # User wenzelm # Date 1373577880 -7200 # Node ID ff2f0818aebcbfa7eb7f20a15953aa44491b3542 # Parent a44e9a1d5d8bf51fb566a2c0015d3677eead1593 more explicit type Exec.context; eliminated obsolete execution group -- NB: cancelation happens individually for registered execs; diff -r a44e9a1d5d8b -r ff2f0818aebc src/Pure/PIDE/command.ML --- a/src/Pure/PIDE/command.ML Thu Jul 11 22:53:56 2013 +0200 +++ b/src/Pure/PIDE/command.ML Thu Jul 11 23:24:40 2013 +0200 @@ -20,7 +20,7 @@ type exec = eval * print list val no_exec: exec val exec_ids: exec option -> Document_ID.exec list - val exec: exec -> unit + val exec: Exec.context -> exec -> unit end; structure Command: COMMAND = @@ -43,32 +43,31 @@ Result res => Exn.release res | Expr (exec_id, _) => error ("Unfinished execution result: " ^ Document_ID.print exec_id)); -fun memo_exec (Memo v) = +fun memo_exec context (Memo v) = (case Synchronized.value v of Result res => res - | Expr (exec_id, _) => + | Expr _ => Synchronized.timed_access v (fn _ => SOME Time.zeroTime) (fn Result res => SOME (res, Result res) - | Expr (exec_id, e) => + | expr as Expr (exec_id, e) => uninterruptible (fn restore_attributes => fn () => - let - val _ = Exec.running exec_id; - val res = Exn.capture (restore_attributes e) (); - val _ = Exec.finished exec_id (not (Exn.is_interrupt_exn res)); - in SOME (res, Result res) end) ()) - |> (fn NONE => error ("Concurrent execution attempt: " ^ Document_ID.print exec_id) - | SOME res => res)) - |> Exn.release; + if Exec.running context exec_id then + let + val res = Exn.capture (restore_attributes e) (); + val _ = Exec.finished exec_id (not (Exn.is_interrupt_exn res)); + in SOME (res, Result res) end + else SOME (Exn.interrupt_exn, expr)) ()) + |> (fn NONE => error "Concurrent execution attempt" | SOME res => res)) + |> Exn.release |> ignore; -fun memo_fork params (Memo v) = +fun memo_fork params context (Memo v) = (case Synchronized.value v of Result _ => () - | _ => ignore ((singleton o Future.forks) params (fn () => memo_exec (Memo v)))); + | _ => ignore ((singleton o Future.forks) params (fn () => memo_exec context (Memo v)))); end; - (** main phases of execution **) (* read *) @@ -277,15 +276,15 @@ local -fun run_print (Print {name, pri, print_process, ...}) = +fun run_print context (Print {name, pri, print_process, ...}) = (if Multithreading.enabled () then memo_fork {name = name, group = NONE, deps = [], pri = pri, interrupts = true} - else memo_exec) print_process; + else memo_exec) context print_process; in -fun exec (Eval {eval_process, ...}, prints) = - (memo_exec eval_process; List.app run_print prints); +fun exec context (Eval {eval_process, ...}, prints) = + (memo_exec context eval_process; List.app (run_print context) prints); end; diff -r a44e9a1d5d8b -r ff2f0818aebc src/Pure/PIDE/document.ML --- a/src/Pure/PIDE/document.ML Thu Jul 11 22:53:56 2013 +0200 +++ b/src/Pure/PIDE/document.ML Thu Jul 11 23:24:40 2013 +0200 @@ -202,27 +202,14 @@ end; -(* associated execution *) - -type execution = - {version_id: Document_ID.version, - group: Future.group, - continue: bool Synchronized.var}; - -val no_execution = - {version_id = Document_ID.none, - group = Future.new_group NONE, - continue = Synchronized.var "continue" false}; - -fun make_execution version_id = - {version_id = version_id, - group = Future.new_group NONE, - continue = Synchronized.var "continue" true}; - - (** main state -- document structure and execution process **) +type execution = {version_id: Document_ID.version, context: Exec.context}; + +val no_execution = {version_id = Document_ID.none, context = Exec.no_context}; +fun make_execution version_id = {version_id = version_id, context = Exec.fresh_context ()}; + abstype state = State of {versions: version Inttab.table, (*version id -> document content*) commands: (string * Token.T list lazy) Inttab.table, (*command id -> named command span*) @@ -307,34 +294,35 @@ in (versions', commands', execution) end); - -(** document execution **) +(* document execution *) -val discontinue_execution = - execution_of #> (fn {continue, ...} => Synchronized.change continue (K false)); +fun discontinue_execution state = + Exec.drop_context (#context (execution_of state)); -val cancel_execution = execution_of #> (fn {group, ...} => Future.cancel_group group); +fun cancel_execution state = + (Exec.drop_context (#context (execution_of state)); Exec.terminate_all ()); fun start_execution state = let - val {version_id, group, continue} = execution_of state; - fun running () = Synchronized.guarded_access continue (fn x => SOME (x, x)); + val {version_id, context} = execution_of state; val _ = - if not (running ()) orelse Task_Queue.is_canceled group then [] - else + if Exec.is_running context then nodes_of (the_version state version_id) |> String_Graph.schedule (fn deps => fn (name, node) => if not (visible_node node) andalso finished_theory node then Future.value () else (singleton o Future.forks) - {name = "theory:" ^ name, group = SOME (Future.new_group (SOME group)), + {name = "theory:" ^ name, group = SOME (Future.new_group NONE), deps = map (Future.task_of o #2) deps, pri = ~2, interrupts = false} (fn () => iterate_entries (fn (_, opt_exec) => fn () => (case opt_exec of - SOME exec => if running () then SOME (Command.exec exec) else NONE - | NONE => NONE)) node ())); + SOME exec => + if Exec.is_running context then SOME (Command.exec context exec) + else NONE + | NONE => NONE)) node ())) + else []; in () end; diff -r a44e9a1d5d8b -r ff2f0818aebc src/Pure/PIDE/exec.ML --- a/src/Pure/PIDE/exec.ML Thu Jul 11 22:53:56 2013 +0200 +++ b/src/Pure/PIDE/exec.ML Thu Jul 11 23:24:40 2013 +0200 @@ -6,49 +6,95 @@ signature EXEC = sig - val running: Document_ID.exec -> unit + type context + val no_context: context + val drop_context: context -> unit + val fresh_context: unit -> context + val is_running: context -> bool + val running: context -> Document_ID.exec -> bool val finished: Document_ID.exec -> bool -> unit val is_stable: Document_ID.exec -> bool val peek_running: Document_ID.exec -> Future.group option - val purge_unstable: unit -> unit + val purge_canceled: unit -> unit + val terminate_all: unit -> unit end; structure Exec: EXEC = struct -type status = Future.group option; (*SOME group: running, NONE: canceled/unstable*) -val execs = Synchronized.var "Exec.execs" (Inttab.empty: status Inttab.table); +(* global state *) + +datatype context = Context of Document_ID.generic; +val no_context = Context Document_ID.none; + +type status = Future.group option; (*SOME group: running, NONE: canceled*) +val state = Synchronized.var "Exec.state" (no_context, Inttab.empty: status Inttab.table); -fun running exec_id = + +(* unique execution context *) + +fun drop_context context = + Synchronized.change state + (apfst (fn context' => if context = context' then no_context else context')); + +fun fresh_context () = let - val group = - (case Future.worker_group () of - SOME group => group - | NONE => error ("Missing worker thread context for execution " ^ Document_ID.print exec_id)); - in Synchronized.change execs (Inttab.update_new (exec_id, SOME group)) end; + val context = Context (Document_ID.make ()); + val _ = Synchronized.change state (apfst (K context)); + in context end; + +fun is_running context = context = fst (Synchronized.value state); + + +(* registered execs *) + +fun running context exec_id = + Synchronized.guarded_access state + (fn (current_context, execs) => + let + val cont = context = current_context; + val execs' = + if cont then + Inttab.update_new (exec_id, SOME (Future.the_worker_group ())) execs + handle Inttab.DUP dup => error ("Duplicate execution " ^ Document_ID.print dup) + else execs; + in SOME (cont, (current_context, execs')) end); fun finished exec_id stable = - Synchronized.change execs - (if stable then Inttab.delete exec_id else Inttab.update (exec_id, NONE)); + Synchronized.change state + (apsnd (fn execs => + if not (Inttab.defined execs exec_id) then + error ("Attempt to finish unknown execution: " ^ Document_ID.print exec_id) + else if stable then Inttab.delete exec_id execs + else Inttab.update (exec_id, NONE) execs)); fun is_stable exec_id = not (Par_Exn.is_interrupted (Future.join_results (Goal.peek_futures exec_id))) andalso - (case Inttab.lookup (Synchronized.value execs) exec_id of + (case Inttab.lookup (snd (Synchronized.value state)) exec_id of NONE => true | SOME status => is_some status); fun peek_running exec_id = - (case Inttab.lookup (Synchronized.value execs) exec_id of + (case Inttab.lookup (snd (Synchronized.value state)) exec_id of SOME (SOME group) => SOME group | _ => NONE); -fun purge_unstable () = - Synchronized.guarded_access execs - (fn tab => +fun purge_canceled () = + Synchronized.guarded_access state + (fn (context, execs) => let - val unstable = Inttab.fold (fn (exec_id, NONE) => cons exec_id | _ => I) tab []; - val tab' = fold Inttab.delete unstable tab; - in SOME ((), tab') end); + val canceled = Inttab.fold (fn (exec_id, NONE) => cons exec_id | _ => I) execs []; + val execs' = fold Inttab.delete canceled execs; + in SOME ((), (context, execs')) end); + +fun terminate_all () = + let + val groups = + Inttab.fold (fn (_, SOME group) => cons group | _ => I) + (snd (Synchronized.value state)) []; + val _ = List.app Future.cancel_group groups; + val _ = List.app Future.terminate groups; + in () end; end; diff -r a44e9a1d5d8b -r ff2f0818aebc src/Pure/PIDE/protocol.ML --- a/src/Pure/PIDE/protocol.ML Thu Jul 11 22:53:56 2013 +0200 +++ b/src/Pure/PIDE/protocol.ML Thu Jul 11 23:24:40 2013 +0200 @@ -54,7 +54,7 @@ val (assign_update, state') = Document.update old_id new_id edits state; val _ = List.app Future.cancel_group (Goal.reset_futures ()); - val _ = Exec.purge_unstable (); + val _ = Exec.purge_canceled (); val _ = Isabelle_Process.reset_tracing (); val _ =