# HG changeset patch # User wenzelm # Date 1313768377 -7200 # Node ID 65f60d9ac4bff4f026bed40d5f6762ab301524e7 # Parent 349cc426d9292de36f7fc233bc1d650d12b9ad02 tuned signature (again); tuned; diff -r 349cc426d929 -r 65f60d9ac4bf src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Fri Aug 19 16:13:26 2011 +0200 +++ b/src/Pure/Concurrent/future.ML Fri Aug 19 17:39:37 2011 +0200 @@ -52,6 +52,9 @@ val peek: 'a future -> 'a Exn.result option val is_finished: 'a future -> bool val get_finished: 'a future -> 'a + val interruptible_task: ('a -> 'b) -> 'a -> 'b + val cancel_group: group -> task list + val cancel: 'a future -> task list type fork_params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool} val forks: fork_params -> (unit -> 'a) list -> 'a future list @@ -60,12 +63,10 @@ val join_results: 'a future list -> 'a Exn.result list val join_result: 'a future -> 'a Exn.result val join: 'a future -> 'a + val join_tasks: task list -> unit val value_result: 'a Exn.result -> 'a future val value: 'a -> 'a future val map: ('a -> 'b) -> 'a future -> 'b future - val cancel_group: group -> unit future - val cancel: 'a future -> unit future - val interruptible_task: ('a -> 'b) -> 'a -> 'b val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list val promise_group: group -> (unit -> unit) -> 'a future val promise: (unit -> unit) -> 'a future @@ -191,6 +192,17 @@ broadcast scheduler_event); +fun interruptible_task f x = + (if Multithreading.available then + Multithreading.with_attributes + (if is_some (worker_task ()) + then Multithreading.private_interrupts + else Multithreading.public_interrupts) + (fn _ => f x) + else interruptible f x) + before Multithreading.interrupted (); + + (* worker threads *) fun worker_exec (task, jobs) = @@ -386,7 +398,13 @@ if scheduler_active () then () else scheduler := SOME (Simple_Thread.fork false scheduler_loop)); -fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () => + + +(** futures **) + +(* cancel *) + +fun cancel_group group = SYNCHRONIZED "cancel_group" (fn () => let val running = cancel_now group; val _ = @@ -394,9 +412,8 @@ else (cancel_later group; signal work_available; scheduler_check ()); in running end); - +fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); -(** futures **) (* future jobs *) @@ -467,7 +484,7 @@ end; fun fork_pri pri e = - singleton (forks {name = "", group = NONE, deps = [], pri = pri, interrupts = true}) e; + (singleton o forks) {name = "fork", group = NONE, deps = [], pri = pri, interrupts = true} e; fun fork e = fork_pri 0 e; @@ -519,6 +536,13 @@ fun join_result x = singleton join_results x; fun join x = Exn.release (join_result x); +fun join_tasks [] = () + | join_tasks tasks = + (singleton o forks) + {name = "join_tasks", group = SOME (new_group NONE), + deps = tasks, pri = 0, interrupts = false} I + |> join; + (* fast-path versions -- bypassing task queue *) @@ -545,9 +569,9 @@ in if extended then Future {promised = false, task = task, result = result} else - singleton - (forks {name = "Future.map", group = SOME group, deps = [task], - pri = Task_Queue.pri_of_task task, interrupts = true}) + (singleton o forks) + {name = "map_future", group = SOME group, deps = [task], + pri = Task_Queue.pri_of_task task, interrupts = true} (fn () => f (join x)) end; @@ -556,29 +580,6 @@ else map (fn e => value_result (Exn.interruptible_capture e ())) es; -(* cancel *) - -fun cancel_group group = - (case scheduler_cancel group of - [] => value () - | running => - singleton - (forks {name = "cancel_group", group = SOME (new_group NONE), - deps = running, pri = 0, interrupts = false}) I); - -fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); - -fun interruptible_task f x = - (if Multithreading.available then - Multithreading.with_attributes - (if is_some (worker_task ()) - then Multithreading.private_interrupts - else Multithreading.public_interrupts) - (fn _ => f x) - else interruptible f x) - before Multithreading.interrupted (); - - (* promised futures -- fulfilled by external means *) fun promise_group group abort : 'a future = diff -r 349cc426d929 -r 65f60d9ac4bf src/Pure/PIDE/document.ML --- a/src/Pure/PIDE/document.ML Fri Aug 19 16:13:26 2011 +0200 +++ b/src/Pure/PIDE/document.ML Fri Aug 19 17:39:37 2011 +0200 @@ -24,7 +24,7 @@ type state val init_state: state val join_commands: state -> unit - val cancel_execution: state -> unit future + val cancel_execution: state -> Future.task list val define_command: command_id -> string -> state -> state val edit: version_id -> version_id -> edit list -> state -> (command_id * exec_id) list * state val execute: version_id -> state -> state diff -r 349cc426d929 -r 65f60d9ac4bf src/Pure/PIDE/isar_document.ML --- a/src/Pure/PIDE/isar_document.ML Fri Aug 19 16:13:26 2011 +0200 +++ b/src/Pure/PIDE/isar_document.ML Fri Aug 19 17:39:37 2011 +0200 @@ -30,9 +30,9 @@ fn ([a], []) => Document.Header (Exn.Exn (ERROR a))])) end; - val cancellation = Document.cancel_execution state; + val running = Document.cancel_execution state; val (updates, state') = Document.edit old_id new_id edits state; - val _ = Future.join cancellation; + val _ = Future.join_tasks running; val _ = Document.join_commands state'; val _ = Output.status (Markup.markup (Markup.assign new_id)