more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
tuned signature;
--- a/src/Pure/Concurrent/future.ML Mon Nov 25 21:36:10 2013 +0100
+++ b/src/Pure/Concurrent/future.ML Thu Nov 28 12:54:39 2013 +0100
@@ -48,7 +48,6 @@
val worker_group: unit -> group option
val the_worker_group: unit -> group
val worker_subgroup: unit -> group
- val worker_context: string -> group -> ('a -> 'b) -> 'a -> 'b
type 'a future
val task_of: 'a future -> task
val peek: 'a future -> 'a Exn.result option
@@ -68,6 +67,7 @@
val joins: 'a future list -> 'a list
val join: 'a future -> 'a
val join_tasks: task list -> unit
+ val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
val value_result: 'a Exn.result -> 'a future
val value: 'a -> 'a future
val cond_forks: params -> (unit -> 'a) list -> 'a future list
@@ -109,9 +109,6 @@
fun worker_subgroup () = new_group (worker_group ());
-fun worker_context name group f x =
- setmp_worker_task (Task_Queue.new_task group name NONE) f x;
-
fun worker_joining e =
(case worker_task () of
NONE => e ()
@@ -471,7 +468,7 @@
(* future jobs *)
-fun future_job group interrupts (e: unit -> 'a) =
+fun future_job group atts (e: unit -> 'a) =
let
val result = Single_Assignment.var "future" : 'a result;
val pos = Position.thread_data ();
@@ -480,10 +477,7 @@
val res =
if ok then
Exn.capture (fn () =>
- Multithreading.with_attributes
- (if interrupts
- then Multithreading.private_interrupts else Multithreading.no_interrupts)
- (fn _ => Position.setmp_thread_data pos e ())) ()
+ Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ())) ()
else Exn.interrupt_exn;
in assign_result group result (identify_result pos res) end;
in (result, job) end;
@@ -504,7 +498,11 @@
| SOME grp => grp);
fun enqueue e queue =
let
- val (result, job) = future_job grp interrupts e;
+ val atts =
+ if interrupts
+ then Multithreading.private_interrupts
+ else Multithreading.no_interrupts;
+ val (result, job) = future_job grp atts e;
val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
val future = Future {promised = false, task = task, result = result};
in (future, queue') end;
@@ -580,6 +578,23 @@
|> join;
+(* task context for running thread *)
+
+fun task_context name group f x =
+ Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts =>
+ let
+ val (result, job) = future_job group orig_atts (fn () => f x);
+ val task =
+ SYNCHRONIZED "enroll" (fn () =>
+ Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group));
+ val _ = worker_exec (task, [job]);
+ in
+ (case Single_Assignment.peek result of
+ NONE => raise Fail "Missing task context result"
+ | SOME res => Exn.release res)
+ end);
+
+
(* fast-path operations -- bypass task queue if possible *)
fun value_result (res: 'a Exn.result) =
@@ -602,7 +617,8 @@
let
val task = task_of x;
val group = Task_Queue.group_of_task task;
- val (result, job) = future_job group true (fn () => f (join x));
+ val (result, job) =
+ future_job group Multithreading.private_interrupts (fn () => f (join x));
val extended = SYNCHRONIZED "extend" (fn () =>
(case Task_Queue.extend task job (! queue) of
--- a/src/Pure/Concurrent/task_queue.ML Mon Nov 25 21:36:10 2013 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Thu Nov 28 12:54:39 2013 +0100
@@ -17,7 +17,6 @@
val str_of_groups: group -> string
type task
val dummy_task: task
- val new_task: group -> string -> int option -> task
val group_of_task: task -> group
val name_of_task: task -> string
val pri_of_task: task -> int
@@ -36,6 +35,7 @@
val cancel: queue -> group -> Thread.thread list
val cancel_all: queue -> group list * Thread.thread list
val finish: task -> queue -> bool * queue
+ val enroll: Thread.thread -> string -> group -> queue -> task * queue
val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
val extend: task -> (bool -> bool) -> queue -> queue option
@@ -295,6 +295,16 @@
in (maximal, make_queue groups' jobs') end;
+(* enroll *)
+
+fun enroll thread name group (Queue {groups, jobs}) =
+ let
+ val task = new_task group name NONE;
+ val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
+ val jobs' = jobs |> Task_Graph.new_node (task, Running thread);
+ in (task, make_queue groups' jobs') end;
+
+
(* enqueue *)
fun enqueue_passive group abort (Queue {groups, jobs}) =
--- a/src/Pure/PIDE/command.ML Mon Nov 25 21:36:10 2013 +0100
+++ b/src/Pure/PIDE/command.ML Thu Nov 28 12:54:39 2013 +0100
@@ -63,7 +63,7 @@
val res =
(body
|> restore_attributes
- |> Future.worker_context "Command.memo_exec" group
+ |> Future.task_context "Command.memo_exec" group
|> Exn.interruptible_capture) ();
in SOME ((), Result res) end
else SOME ((), expr)
--- a/src/Pure/System/isabelle_process.ML Mon Nov 25 21:36:10 2013 +0100
+++ b/src/Pure/System/isabelle_process.ML Thu Nov 28 12:54:39 2013 +0100
@@ -159,8 +159,8 @@
NONE => raise Runtime.TERMINATE
| SOME line => map (read_chunk channel) (space_explode "," line));
-fun worker_context e =
- Future.worker_context "Isabelle_Process.loop" (Future.new_group NONE) e ();
+fun task_context e =
+ Future.task_context "Isabelle_Process.loop" (Future.new_group NONE) e ();
in
@@ -168,7 +168,7 @@
let val continue =
(case read_command channel of
[] => (Output.error_msg "Isabelle process: no input"; true)
- | name :: args => (worker_context (fn () => run_command name args); true))
+ | name :: args => (task_context (fn () => run_command name args); true))
handle Runtime.TERMINATE => false
| exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
in