# HG changeset patch # User wenzelm # Date 1385639679 -3600 # Node ID 99b9249b3e05c52f62ee6b479e723b44dfd0be77 # Parent f38b113697a20a182f28904c2ab25bde9a90b61e more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec); tuned signature; diff -r f38b113697a2 -r 99b9249b3e05 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Mon Nov 25 21:36:10 2013 +0100 +++ b/src/Pure/Concurrent/future.ML Thu Nov 28 12:54:39 2013 +0100 @@ -48,7 +48,6 @@ val worker_group: unit -> group option val the_worker_group: unit -> group val worker_subgroup: unit -> group - val worker_context: string -> group -> ('a -> 'b) -> 'a -> 'b type 'a future val task_of: 'a future -> task val peek: 'a future -> 'a Exn.result option @@ -68,6 +67,7 @@ val joins: 'a future list -> 'a list val join: 'a future -> 'a val join_tasks: task list -> unit + val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b val value_result: 'a Exn.result -> 'a future val value: 'a -> 'a future val cond_forks: params -> (unit -> 'a) list -> 'a future list @@ -109,9 +109,6 @@ fun worker_subgroup () = new_group (worker_group ()); -fun worker_context name group f x = - setmp_worker_task (Task_Queue.new_task group name NONE) f x; - fun worker_joining e = (case worker_task () of NONE => e () @@ -471,7 +468,7 @@ (* future jobs *) -fun future_job group interrupts (e: unit -> 'a) = +fun future_job group atts (e: unit -> 'a) = let val result = Single_Assignment.var "future" : 'a result; val pos = Position.thread_data (); @@ -480,10 +477,7 @@ val res = if ok then Exn.capture (fn () => - Multithreading.with_attributes - (if interrupts - then Multithreading.private_interrupts else Multithreading.no_interrupts) - (fn _ => Position.setmp_thread_data pos e ())) () + Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ())) () else Exn.interrupt_exn; in assign_result group result (identify_result pos res) end; in (result, job) end; @@ -504,7 +498,11 @@ | SOME grp => grp); fun enqueue e queue = let - val (result, job) = future_job grp interrupts e; + val atts = + if interrupts + then Multithreading.private_interrupts + else Multithreading.no_interrupts; + val (result, job) = future_job grp atts e; val (task, queue') = Task_Queue.enqueue name grp deps pri job queue; val future = Future {promised = false, task = task, result = result}; in (future, queue') end; @@ -580,6 +578,23 @@ |> join; +(* task context for running thread *) + +fun task_context name group f x = + Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts => + let + val (result, job) = future_job group orig_atts (fn () => f x); + val task = + SYNCHRONIZED "enroll" (fn () => + Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group)); + val _ = worker_exec (task, [job]); + in + (case Single_Assignment.peek result of + NONE => raise Fail "Missing task context result" + | SOME res => Exn.release res) + end); + + (* fast-path operations -- bypass task queue if possible *) fun value_result (res: 'a Exn.result) = @@ -602,7 +617,8 @@ let val task = task_of x; val group = Task_Queue.group_of_task task; - val (result, job) = future_job group true (fn () => f (join x)); + val (result, job) = + future_job group Multithreading.private_interrupts (fn () => f (join x)); val extended = SYNCHRONIZED "extend" (fn () => (case Task_Queue.extend task job (! queue) of diff -r f38b113697a2 -r 99b9249b3e05 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Mon Nov 25 21:36:10 2013 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Thu Nov 28 12:54:39 2013 +0100 @@ -17,7 +17,6 @@ val str_of_groups: group -> string type task val dummy_task: task - val new_task: group -> string -> int option -> task val group_of_task: task -> group val name_of_task: task -> string val pri_of_task: task -> int @@ -36,6 +35,7 @@ val cancel: queue -> group -> Thread.thread list val cancel_all: queue -> group list * Thread.thread list val finish: task -> queue -> bool * queue + val enroll: Thread.thread -> string -> group -> queue -> task * queue val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue val extend: task -> (bool -> bool) -> queue -> queue option @@ -295,6 +295,16 @@ in (maximal, make_queue groups' jobs') end; +(* enroll *) + +fun enroll thread name group (Queue {groups, jobs}) = + let + val task = new_task group name NONE; + val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups; + val jobs' = jobs |> Task_Graph.new_node (task, Running thread); + in (task, make_queue groups' jobs') end; + + (* enqueue *) fun enqueue_passive group abort (Queue {groups, jobs}) = diff -r f38b113697a2 -r 99b9249b3e05 src/Pure/PIDE/command.ML --- a/src/Pure/PIDE/command.ML Mon Nov 25 21:36:10 2013 +0100 +++ b/src/Pure/PIDE/command.ML Thu Nov 28 12:54:39 2013 +0100 @@ -63,7 +63,7 @@ val res = (body |> restore_attributes - |> Future.worker_context "Command.memo_exec" group + |> Future.task_context "Command.memo_exec" group |> Exn.interruptible_capture) (); in SOME ((), Result res) end else SOME ((), expr) diff -r f38b113697a2 -r 99b9249b3e05 src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Mon Nov 25 21:36:10 2013 +0100 +++ b/src/Pure/System/isabelle_process.ML Thu Nov 28 12:54:39 2013 +0100 @@ -159,8 +159,8 @@ NONE => raise Runtime.TERMINATE | SOME line => map (read_chunk channel) (space_explode "," line)); -fun worker_context e = - Future.worker_context "Isabelle_Process.loop" (Future.new_group NONE) e (); +fun task_context e = + Future.task_context "Isabelle_Process.loop" (Future.new_group NONE) e (); in @@ -168,7 +168,7 @@ let val continue = (case read_command channel of [] => (Output.error_msg "Isabelle process: no input"; true) - | name :: args => (worker_context (fn () => run_command name args); true)) + | name :: args => (task_context (fn () => run_command name args); true)) handle Runtime.TERMINATE => false | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true); in