# HG changeset patch # User wenzelm # Date 1377455546 -7200 # Node ID 04df1d236e1c441ed601724c4687aa04c1dd1c2d # Parent 14ab2f821e1d367f09acaba5b6e81ef2ea925950 maintain goal forks as part of global execution; tuned; diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/Isar/proof.ML --- a/src/Pure/Isar/proof.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/Isar/proof.ML Sun Aug 25 20:32:26 2013 +0200 @@ -1170,7 +1170,7 @@ val pos = Position.thread_data (); val props = Markup.command_timing :: (Markup.nameN, "by") :: Position.properties_of pos; in - Goal.fork_params {name = "Proof.future_terminal_proof", pos = pos, pri = ~1} + Execution.fork {name = "Proof.future_terminal_proof", pos = pos, pri = ~1} (fn () => ((), Timing.protocol props proof2 state')) end) |> snd |> done else proof1 state; diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/Isar/toplevel.ML --- a/src/Pure/Isar/toplevel.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/Isar/toplevel.ML Sun Aug 25 20:32:26 2013 +0200 @@ -708,7 +708,7 @@ let val st' = if Goal.future_enabled 1 andalso Keyword.is_diag (name_of tr) then - (Goal.fork_params + (Execution.fork {name = "Toplevel.diag", pos = pos_of tr, pri = priority (timing_estimate true (Thy_Syntax.atom tr))} (fn () => command tr st); st) @@ -736,7 +736,7 @@ val future_proof = Proof.future_proof (fn state => - Goal.fork_params + Execution.fork {name = "Toplevel.future_proof", pos = pos_of head_tr, pri = priority estimate} (fn () => let diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/PIDE/command.ML --- a/src/Pure/PIDE/command.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/PIDE/command.ML Sun Aug 25 20:32:26 2013 +0200 @@ -131,7 +131,7 @@ fun run int tr st = if Goal.future_enabled 1 andalso Keyword.is_diag (Toplevel.name_of tr) then - (Goal.fork_params {name = "Toplevel.diag", pos = Toplevel.pos_of tr, pri = ~1} + (Execution.fork {name = "Toplevel.diag", pos = Toplevel.pos_of tr, pri = ~1} (fn () => Toplevel.command_exception int tr st); ([], SOME st)) else Toplevel.command_errors int tr st; diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/PIDE/execution.ML --- a/src/Pure/PIDE/execution.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/PIDE/execution.ML Sun Aug 25 20:32:26 2013 +0200 @@ -2,19 +2,23 @@ Author: Makarius Global management of execution. Unique running execution serves as -barrier for further exploration of exec fragments. +barrier for further exploration of forked command execs. *) signature EXECUTION = sig - val reset: unit -> unit val start: unit -> Document_ID.execution val discontinue: unit -> unit val is_running: Document_ID.execution -> bool val is_running_exec: Document_ID.exec -> bool val running: Document_ID.execution -> Document_ID.exec -> bool + val peek: Document_ID.exec -> Future.group list val cancel: Document_ID.exec -> unit val terminate: Document_ID.exec -> unit + val fork: {name: string, pos: Position.T, pri: int} -> (unit -> 'a) -> 'a future + val purge: Document_ID.exec list -> unit + val reset: unit -> Future.group list + val shutdown: unit -> unit end; structure Execution: EXECUTION = @@ -22,50 +26,129 @@ (* global state *) -type state = Document_ID.execution * Future.group Inttab.table; +datatype state = State of + {execution: Document_ID.execution, (*overall document execution*) + execs: Future.group list Inttab.table}; (*command execs with registered forks*) -val init_state: state = (Document_ID.none, Inttab.empty); +fun make_state (execution, execs) = State {execution = execution, execs = execs}; +fun map_state f (State {execution, execs}) = make_state (f (execution, execs)); + +val init_state = make_state (Document_ID.none, Inttab.empty); val state = Synchronized.var "Execution.state" init_state; +fun get_state () = let val State args = Synchronized.value state in args end; +fun change_state f = Synchronized.change state (map_state f); + (* unique running execution *) -fun reset () = Synchronized.change state (K init_state); - fun start () = let val execution_id = Document_ID.make (); - val _ = Synchronized.change state (apfst (K execution_id)); + val _ = change_state (apfst (K execution_id)); in execution_id end; -fun discontinue () = - Synchronized.change state (apfst (K Document_ID.none)); +fun discontinue () = change_state (apfst (K Document_ID.none)); -fun is_running execution_id = execution_id = fst (Synchronized.value state); +fun is_running execution_id = execution_id = #execution (get_state ()); -(* registered execs *) +(* execs *) fun is_running_exec exec_id = - Inttab.defined (snd (Synchronized.value state)) exec_id; + Inttab.defined (#execs (get_state ())) exec_id; fun running execution_id exec_id = Synchronized.guarded_access state - (fn (execution_id', execs) => + (fn State {execution = execution_id', execs} => let val continue = execution_id = execution_id'; val execs' = if continue then - Inttab.update_new (exec_id, Future.the_worker_group ()) execs - handle Inttab.DUP dup => error ("Duplicate execution " ^ Document_ID.print dup) + Inttab.update_new (exec_id, [Future.the_worker_group ()]) execs + handle Inttab.DUP dup => + error ("Execution already registered: " ^ Document_ID.print dup) else execs; - in SOME (continue, (execution_id', execs')) end); + in SOME (continue, make_state (execution_id', execs')) end); + +fun peek exec_id = + Inttab.lookup_list (#execs (get_state ())) exec_id; + +fun cancel exec_id = List.app Future.cancel_group (peek exec_id); +fun terminate exec_id = List.app Future.terminate (peek exec_id); + + +(* fork *) + +local + +fun status task markups = + let val props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)] + in Output.status (implode (map (Markup.markup_only o props) markups)) end; + +fun register exec = + change_state (fn (execution, execs) => (execution, Inttab.cons_list exec execs)); + +in -fun peek_list exec_id = - the_list (Inttab.lookup (snd (Synchronized.value state)) exec_id); +fun fork {name, pos, pri} e = + uninterruptible (fn _ => Position.setmp_thread_data pos (fn () => + let + val exec_id = the_default 0 (Position.parse_id pos); + val group = Future.worker_subgroup (); + val _ = register (exec_id, group); -fun cancel exec_id = List.app Future.cancel_group (peek_list exec_id); -fun terminate exec_id = List.app Future.terminate (peek_list exec_id); + val future = + (singleton o Future.forks) + {name = name, group = SOME group, deps = [], pri = pri, interrupts = false} + (fn () => + let + val task = the (Future.worker_task ()); + val _ = status task [Markup.running]; + val result = + Exn.capture (Future.interruptible_task e) () + |> Future.identify_result pos; + val _ = status task [Markup.finished, Markup.joined]; + val _ = + (case result of + Exn.Res _ => () + | Exn.Exn exn => + if exec_id = 0 orelse Exn.is_interrupt exn then () + else + (status task [Markup.failed]; + Output.report (Markup.markup_only Markup.bad); + List.app (Future.error_msg pos) (ML_Compiler.exn_messages_ids exn))); + in Exn.release result end); + val _ = status (Future.task_of future) [Markup.forked]; + in future end)) (); end; + +(* cleanup *) + +fun purge exec_ids = + (change_state o apsnd) (fn execs => + let + val execs' = fold Inttab.delete_safe exec_ids execs; + val () = + (execs', ()) |-> Inttab.fold (fn (exec_id, groups) => fn () => + if Inttab.defined execs' exec_id then () + else groups |> List.app (fn group => + if Task_Queue.is_canceled group then () + else error ("Attempt to purge valid execution: " ^ Document_ID.print exec_id))); + in execs' end); + +fun reset () = + Synchronized.change_result state (fn State {execs, ...} => + let val groups = Inttab.fold (append o #2) execs [] + in (groups, init_state) end); + +fun shutdown () = + (Future.shutdown (); + (case maps Task_Queue.group_status (reset ()) of + [] => () + | exns => raise Par_Exn.make exns)); + +end; + diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/PIDE/protocol.ML --- a/src/Pure/PIDE/protocol.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/PIDE/protocol.ML Sun Aug 25 20:32:26 2013 +0200 @@ -67,7 +67,7 @@ val (removed, assign_update, state') = Document.update old_id new_id edits state; val _ = List.app Execution.terminate removed; - val _ = Goal.purge_futures removed; + val _ = Execution.purge removed; val _ = List.app Isabelle_Process.reset_tracing removed; val _ = diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/ROOT.ML --- a/src/Pure/ROOT.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/ROOT.ML Sun Aug 25 20:32:26 2013 +0200 @@ -185,6 +185,9 @@ if ML_System.is_polyml then use "ML/ml_compiler_polyml.ML" else (); if ML_System.name = "polyml-5.5.1" then use "ML/exn_trace_polyml-5.5.1.ML" else (); +(*global execution*) +use "PIDE/document_id.ML"; +use "PIDE/execution.ML"; use "skip_proof.ML"; use "goal.ML"; @@ -257,7 +260,6 @@ (*toplevel transactions*) use "Isar/proof_node.ML"; -use "PIDE/document_id.ML"; use "Isar/toplevel.ML"; (*theory documents*) @@ -267,7 +269,6 @@ use "Isar/outer_syntax.ML"; use "General/graph_display.ML"; use "Thy/present.ML"; -use "PIDE/execution.ML"; use "PIDE/command.ML"; use "PIDE/query_operation.ML"; use "Thy/thy_load.ML"; diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/System/isabelle_process.ML Sun Aug 25 20:32:26 2013 +0200 @@ -173,7 +173,7 @@ | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true); in if continue then loop channel - else (Future.shutdown (); Goal.reset_futures (); Execution.reset ()) + else (Future.shutdown (); Execution.reset (); ()) end; end; diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/System/session.ML --- a/src/Pure/System/session.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/System/session.ML Sun Aug 25 20:32:26 2013 +0200 @@ -51,7 +51,7 @@ (* finish *) fun finish () = - (Goal.shutdown_futures (); + (Execution.shutdown (); Thy_Info.finish (); Present.finish (); Outer_Syntax.check_syntax (); diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/Thy/thy_info.ML --- a/src/Pure/Thy/thy_info.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/Thy/thy_info.ML Sun Aug 25 20:32:26 2013 +0200 @@ -176,7 +176,7 @@ fun join_theory (Result {theory, id, ...}) = Exn.capture Thm.join_theory_proofs theory :: - map Exn.Exn (maps Task_Queue.group_status (Goal.peek_futures id)); + map Exn.Exn (maps Task_Queue.group_status (Execution.peek id)); datatype task = @@ -235,7 +235,7 @@ |> map (fn future => Exn.capture (fn () => result_commit (Future.join future) ()) ()); (* FIXME avoid global reset_futures (!??) *) - val results4 = map Exn.Exn (maps Task_Queue.group_status (Goal.reset_futures ())); + val results4 = map Exn.Exn (maps Task_Queue.group_status (Execution.reset ())); val _ = Par_Exn.release_all (results1 @ results2 @ results3 @ results4); in () end); diff -r 14ab2f821e1d -r 04df1d236e1c src/Pure/goal.ML --- a/src/Pure/goal.ML Sun Aug 25 17:17:48 2013 +0200 +++ b/src/Pure/goal.ML Sun Aug 25 20:32:26 2013 +0200 @@ -24,12 +24,6 @@ val check_finished: Proof.context -> thm -> thm val finish: Proof.context -> thm -> thm val norm_result: thm -> thm - val fork_params: {name: string, pos: Position.T, pri: int} -> (unit -> 'a) -> 'a future - val fork: int -> (unit -> 'a) -> 'a future - val peek_futures: int -> Future.group list - val purge_futures: int list -> unit - val reset_futures: unit -> Future.group list - val shutdown_futures: unit -> unit val skip_proofs_enabled: unit -> bool val future_enabled: int -> bool val future_enabled_timing: Time.time -> bool @@ -111,86 +105,6 @@ #> Drule.zero_var_indexes; -(* forked proofs *) - -local - -val forked_proofs = - Synchronized.var "forked_proofs" - (Inttab.empty: Future.group list Inttab.table); - -fun register_forked id future = - Synchronized.change forked_proofs - (Inttab.cons_list (id, Task_Queue.group_of_task (Future.task_of future))); - -fun status task markups = - let val props = Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)] - in Output.status (implode (map (Markup.markup_only o props) markups)) end; - -in - -fun fork_params {name, pos, pri} e = - uninterruptible (fn _ => Position.setmp_thread_data pos (fn () => - let - val id = the_default 0 (Position.parse_id pos); - - val future = - (singleton o Future.forks) - {name = name, group = NONE, deps = [], pri = pri, interrupts = false} - (fn () => - let - val task = the (Future.worker_task ()); - val _ = status task [Markup.running]; - val result = - Exn.capture (Future.interruptible_task e) () - |> Future.identify_result pos; - val _ = status task [Markup.finished, Markup.joined]; - val _ = - (case result of - Exn.Res _ => () - | Exn.Exn exn => - if id = 0 orelse Exn.is_interrupt exn then () - else - (status task [Markup.failed]; - Output.report (Markup.markup_only Markup.bad); - List.app (Future.error_msg pos) (ML_Compiler.exn_messages_ids exn))); - in Exn.release result end); - val _ = status (Future.task_of future) [Markup.forked]; - val _ = register_forked id future; - in future end)) (); - -fun fork pri e = - fork_params {name = "Goal.fork", pos = Position.thread_data (), pri = pri} e; - -fun peek_futures id = - Inttab.lookup_list (Synchronized.value forked_proofs) id; - -fun purge_futures ids = - Synchronized.change forked_proofs (fn tab => - let - val tab' = fold Inttab.delete_safe ids tab; - val () = - Inttab.fold (fn (id, groups) => fn () => - if Inttab.defined tab' id then () - else groups |> List.app (fn group => - if Task_Queue.is_canceled group then () - else raise Fail ("Attempt to purge valid execution " ^ string_of_int id))) tab (); - in tab' end); - -fun reset_futures () = - Synchronized.change_result forked_proofs (fn tab => - let val groups = Inttab.fold (fold cons o #2) tab [] - in (groups, Inttab.empty) end); - -fun shutdown_futures () = - (Future.shutdown (); - (case maps Task_Queue.group_status (reset_futures ()) of - [] => () - | exns => raise Par_Exn.make exns)); - -end; - - (* scheduling parameters *) fun skip_proofs_enabled () = @@ -307,9 +221,11 @@ else err ("Proved a different theorem: " ^ string_of_term (Thm.prop_of res)) end); val res = - if immediate orelse schematic orelse not future orelse skip - then result () - else future_result ctxt' (fork pri result) (Thm.term_of stmt); + if immediate orelse schematic orelse not future orelse skip then result () + else + future_result ctxt' + (Execution.fork {name = "Goal.prove", pos = Position.thread_data (), pri = pri} result) + (Thm.term_of stmt); in Conjunction.elim_balanced (length props) res |> map (Assumption.export false ctxt' ctxt)