# HG changeset patch # User wenzelm # Date 1313762186 -7200 # Node ID 061599cb6eb0fb95757da9b03771ca7de14d56e8 # Parent b8f8488704e203dfc100dfde1b6bbc2d738f5cc0 refined Future.cancel: explicit future allows to join actual cancellation; Document.cancel_execution: join nested future groups as well; diff -r b8f8488704e2 -r 061599cb6eb0 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Fri Aug 19 14:01:20 2011 +0200 +++ b/src/Pure/Concurrent/future.ML Fri Aug 19 15:56:26 2011 +0200 @@ -31,6 +31,9 @@ that lack regular result information, will pick up parallel exceptions from the cumulative group context (as Par_Exn). + * Future task groups may be canceled: present and future group + members will be interrupted eventually. + * Promised "passive" futures are fulfilled by external means. There is no associated evaluation task, but other futures can depend on them via regular join operations. @@ -46,9 +49,6 @@ val peek: 'a future -> 'a Exn.result option val is_finished: 'a future -> bool val get_finished: 'a future -> 'a - val interruptible_task: ('a -> 'b) -> 'a -> 'b - val cancel_group: Task_Queue.group -> unit - val cancel: 'a future -> unit type fork_params = {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int, interrupts: bool} @@ -61,6 +61,9 @@ val value_result: 'a Exn.result -> 'a future val value: 'a -> 'a future val map: ('a -> 'b) -> 'a future -> 'b future + val cancel_group: Task_Queue.group -> unit future + val cancel: 'a future -> unit future + val interruptible_task: ('a -> 'b) -> 'a -> 'b val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list val promise_group: Task_Queue.group -> (unit -> unit) -> 'a future val promise: (unit -> unit) -> 'a future @@ -173,16 +176,6 @@ (* cancellation primitives *) -fun interruptible_task f x = - (if Multithreading.available then - Multithreading.with_attributes - (if is_some (worker_task ()) - then Multithreading.private_interrupts - else Multithreading.public_interrupts) - (fn _ => f x) - else interruptible f x) - before Multithreading.interrupted (); - fun cancel_now group = (*requires SYNCHRONIZED*) Task_Queue.cancel (! queue) group; @@ -213,7 +206,7 @@ val test = Exn.capture Multithreading.interrupted (); val _ = if ok andalso not (Exn.is_interrupt_exn test) then () - else if cancel_now group then () + else if null (cancel_now group) then () else cancel_later group; val _ = broadcast work_finished; val _ = if maximal then () else signal work_available; @@ -347,7 +340,7 @@ else (Multithreading.tracing 1 (fn () => string_of_int (length (! canceled)) ^ " canceled groups"); - Unsynchronized.change canceled (filter_out cancel_now); + Unsynchronized.change canceled (filter_out (null o cancel_now)); broadcast_work ()); @@ -386,20 +379,18 @@ if scheduler_active () then () else scheduler := SOME (Simple_Thread.fork false scheduler_loop)); +fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () => + let + val running = cancel_now group; + val _ = + if null running then () + else (cancel_later group; signal work_available; scheduler_check ()); + in running end); + (** futures **) -(* cancellation *) - -(*cancel: present and future group members will be interrupted eventually*) -fun cancel_group group = SYNCHRONIZED "cancel" (fn () => - (if cancel_now group then () else cancel_later group; - signal work_available; scheduler_check ())); - -fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); - - (* future jobs *) fun assign_result group result raw_res = @@ -559,6 +550,29 @@ else map (fn e => value_result (Exn.interruptible_capture e ())) es; +(* cancel *) + +fun cancel_group group = + (case scheduler_cancel group of + [] => value () + | running => + singleton + (forks {name = "cancel_group", group = SOME (Task_Queue.new_group NONE), + deps = running, pri = 0, interrupts = false}) I); + +fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); + +fun interruptible_task f x = + (if Multithreading.available then + Multithreading.with_attributes + (if is_some (worker_task ()) + then Multithreading.private_interrupts + else Multithreading.public_interrupts) + (fn _ => f x) + else interruptible f x) + before Multithreading.interrupted (); + + (* promised futures -- fulfilled by external means *) fun promise_group group abort : 'a future = diff -r b8f8488704e2 -r 061599cb6eb0 src/Pure/Concurrent/par_list.ML --- a/src/Pure/Concurrent/par_list.ML Fri Aug 19 14:01:20 2011 +0200 +++ b/src/Pure/Concurrent/par_list.ML Fri Aug 19 15:56:26 2011 +0200 @@ -39,7 +39,8 @@ Future.forks {name = name, group = SOME group, deps = [], pri = 0, interrupts = true} (map (fn x => fn () => f x) xs); val results = Future.join_results futures - handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn); + handle exn => + (if Exn.is_interrupt exn then ignore (Future.cancel_group group) else (); reraise exn); in results end; fun map_name name f xs = Par_Exn.release_first (managed_results name f xs); diff -r b8f8488704e2 -r 061599cb6eb0 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Fri Aug 19 14:01:20 2011 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Fri Aug 19 15:56:26 2011 +0200 @@ -31,7 +31,7 @@ val known_task: queue -> task -> bool val all_passive: queue -> bool val status: queue -> {ready: int, pending: int, running: int, passive: int} - val cancel: queue -> group -> bool + val cancel: queue -> group -> task list val cancel_all: queue -> group list val finish: task -> queue -> bool * queue val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue @@ -248,10 +248,12 @@ let val _ = cancel_group group Exn.Interrupt; val running = - Tasks.fold (#1 #> get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) + Tasks.fold (fn (task, _) => + (case get_job jobs task of Running thread => cons (task, thread) | _ => I)) (get_tasks groups (group_id group)) []; - val _ = List.app Simple_Thread.interrupt_unsynchronized running; - in null running end; + val threads = fold (insert Thread.equal o #2) running []; + val _ = List.app Simple_Thread.interrupt_unsynchronized threads; + in map #1 running end; fun cancel_all (Queue {jobs, ...}) = let diff -r b8f8488704e2 -r 061599cb6eb0 src/Pure/PIDE/document.ML --- a/src/Pure/PIDE/document.ML Fri Aug 19 14:01:20 2011 +0200 +++ b/src/Pure/PIDE/document.ML Fri Aug 19 15:56:26 2011 +0200 @@ -24,7 +24,7 @@ type state val init_state: state val join_commands: state -> unit - val cancel_execution: state -> unit -> unit + val cancel_execution: state -> unit future val define_command: command_id -> string -> state -> state val edit: version_id -> version_id -> edit list -> state -> (command_id * exec_id) list * state val execute: version_id -> state -> state @@ -164,7 +164,7 @@ {versions: version Inttab.table, (*version_id -> document content*) commands: Toplevel.transition future Inttab.table, (*command_id -> transition (future parsing)*) execs: Toplevel.state lazy Inttab.table, (*exec_id -> execution process*) - execution: unit future list} (*global execution process*) + execution: Task_Queue.group} (*global execution process*) with fun make_state (versions, commands, execs, execution) = @@ -177,7 +177,7 @@ make_state (Inttab.make [(no_id, empty_version)], Inttab.make [(no_id, Future.value Toplevel.empty)], Inttab.make [(no_id, empty_exec)], - []); + Task_Queue.new_group NONE); (* document versions *) @@ -233,9 +233,7 @@ (* document execution *) -fun cancel_execution (State {execution, ...}) = - (List.app Future.cancel execution; - fn () => ignore (Future.join_results execution)); +fun cancel_execution (State {execution, ...}) = Future.cancel_group execution; end; @@ -393,17 +391,18 @@ fun force_exec NONE = () | force_exec (SOME exec_id) = ignore (Lazy.force (the_exec state exec_id)); - val execution' = + val execution = Task_Queue.new_group NONE; + val _ = nodes_of version |> Graph.schedule (fn deps => fn (name, node) => singleton (Future.forks - {name = "theory:" ^ name, group = NONE, + {name = "theory:" ^ name, group = SOME (Task_Queue.new_group (SOME execution)), deps = map (Future.task_of o #2) deps, pri = 1, interrupts = true}) (fold_entries NONE (fn (_, exec) => fn () => force_exec exec) node)); - in (versions, commands, execs, execution') end); + in (versions, commands, execs, execution) end); diff -r b8f8488704e2 -r 061599cb6eb0 src/Pure/PIDE/isar_document.ML --- a/src/Pure/PIDE/isar_document.ML Fri Aug 19 14:01:20 2011 +0200 +++ b/src/Pure/PIDE/isar_document.ML Fri Aug 19 15:56:26 2011 +0200 @@ -30,9 +30,9 @@ fn ([a], []) => Document.Header (Exn.Exn (ERROR a))])) end; - val await_cancellation = Document.cancel_execution state; + val cancellation = Document.cancel_execution state; val (updates, state') = Document.edit old_id new_id edits state; - val _ = await_cancellation (); + val _ = Future.join cancellation; val _ = Document.join_commands state'; val _ = Output.status (Markup.markup (Markup.assign new_id)