more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
authorwenzelm
Thu, 28 Nov 2013 12:54:39 +0100
changeset 54649 99b9249b3e05
parent 54648 f38b113697a2
child 54650 d206c93c0267
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec); tuned signature;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
src/Pure/PIDE/command.ML
src/Pure/System/isabelle_process.ML
--- a/src/Pure/Concurrent/future.ML	Mon Nov 25 21:36:10 2013 +0100
+++ b/src/Pure/Concurrent/future.ML	Thu Nov 28 12:54:39 2013 +0100
@@ -48,7 +48,6 @@
   val worker_group: unit -> group option
   val the_worker_group: unit -> group
   val worker_subgroup: unit -> group
-  val worker_context: string -> group -> ('a -> 'b) -> 'a -> 'b
   type 'a future
   val task_of: 'a future -> task
   val peek: 'a future -> 'a Exn.result option
@@ -68,6 +67,7 @@
   val joins: 'a future list -> 'a list
   val join: 'a future -> 'a
   val join_tasks: task list -> unit
+  val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
   val value_result: 'a Exn.result -> 'a future
   val value: 'a -> 'a future
   val cond_forks: params -> (unit -> 'a) list -> 'a future list
@@ -109,9 +109,6 @@
 
 fun worker_subgroup () = new_group (worker_group ());
 
-fun worker_context name group f x =
-  setmp_worker_task (Task_Queue.new_task group name NONE) f x;
-
 fun worker_joining e =
   (case worker_task () of
     NONE => e ()
@@ -471,7 +468,7 @@
 
 (* future jobs *)
 
-fun future_job group interrupts (e: unit -> 'a) =
+fun future_job group atts (e: unit -> 'a) =
   let
     val result = Single_Assignment.var "future" : 'a result;
     val pos = Position.thread_data ();
@@ -480,10 +477,7 @@
         val res =
           if ok then
             Exn.capture (fn () =>
-              Multithreading.with_attributes
-                (if interrupts
-                 then Multithreading.private_interrupts else Multithreading.no_interrupts)
-                (fn _ => Position.setmp_thread_data pos e ())) ()
+              Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ())) ()
           else Exn.interrupt_exn;
       in assign_result group result (identify_result pos res) end;
   in (result, job) end;
@@ -504,7 +498,11 @@
         | SOME grp => grp);
       fun enqueue e queue =
         let
-          val (result, job) = future_job grp interrupts e;
+          val atts =
+            if interrupts
+            then Multithreading.private_interrupts
+            else Multithreading.no_interrupts;
+          val (result, job) = future_job grp atts e;
           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
           val future = Future {promised = false, task = task, result = result};
         in (future, queue') end;
@@ -580,6 +578,23 @@
     |> join;
 
 
+(* task context for running thread *)
+
+fun task_context name group f x =
+  Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts =>
+    let
+      val (result, job) = future_job group orig_atts (fn () => f x);
+      val task =
+        SYNCHRONIZED "enroll" (fn () =>
+          Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group));
+      val _ = worker_exec (task, [job]);
+    in
+      (case Single_Assignment.peek result of
+        NONE => raise Fail "Missing task context result"
+      | SOME res => Exn.release res)
+    end);
+
+
 (* fast-path operations -- bypass task queue if possible *)
 
 fun value_result (res: 'a Exn.result) =
@@ -602,7 +617,8 @@
     let
       val task = task_of x;
       val group = Task_Queue.group_of_task task;
-      val (result, job) = future_job group true (fn () => f (join x));
+      val (result, job) =
+        future_job group Multithreading.private_interrupts (fn () => f (join x));
 
       val extended = SYNCHRONIZED "extend" (fn () =>
         (case Task_Queue.extend task job (! queue) of
--- a/src/Pure/Concurrent/task_queue.ML	Mon Nov 25 21:36:10 2013 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Thu Nov 28 12:54:39 2013 +0100
@@ -17,7 +17,6 @@
   val str_of_groups: group -> string
   type task
   val dummy_task: task
-  val new_task: group -> string -> int option -> task
   val group_of_task: task -> group
   val name_of_task: task -> string
   val pri_of_task: task -> int
@@ -36,6 +35,7 @@
   val cancel: queue -> group -> Thread.thread list
   val cancel_all: queue -> group list * Thread.thread list
   val finish: task -> queue -> bool * queue
+  val enroll: Thread.thread -> string -> group -> queue -> task * queue
   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
   val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
@@ -295,6 +295,16 @@
   in (maximal, make_queue groups' jobs') end;
 
 
+(* enroll *)
+
+fun enroll thread name group (Queue {groups, jobs}) =
+  let
+    val task = new_task group name NONE;
+    val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
+    val jobs' = jobs |> Task_Graph.new_node (task, Running thread);
+  in (task, make_queue groups' jobs') end;
+
+
 (* enqueue *)
 
 fun enqueue_passive group abort (Queue {groups, jobs}) =
--- a/src/Pure/PIDE/command.ML	Mon Nov 25 21:36:10 2013 +0100
+++ b/src/Pure/PIDE/command.ML	Thu Nov 28 12:54:39 2013 +0100
@@ -63,7 +63,7 @@
                   val res =
                     (body
                       |> restore_attributes
-                      |> Future.worker_context "Command.memo_exec" group
+                      |> Future.task_context "Command.memo_exec" group
                       |> Exn.interruptible_capture) ();
                 in SOME ((), Result res) end
               else SOME ((), expr)
--- a/src/Pure/System/isabelle_process.ML	Mon Nov 25 21:36:10 2013 +0100
+++ b/src/Pure/System/isabelle_process.ML	Thu Nov 28 12:54:39 2013 +0100
@@ -159,8 +159,8 @@
     NONE => raise Runtime.TERMINATE
   | SOME line => map (read_chunk channel) (space_explode "," line));
 
-fun worker_context e =
-  Future.worker_context "Isabelle_Process.loop" (Future.new_group NONE) e ();
+fun task_context e =
+  Future.task_context "Isabelle_Process.loop" (Future.new_group NONE) e ();
 
 in
 
@@ -168,7 +168,7 @@
   let val continue =
     (case read_command channel of
       [] => (Output.error_msg "Isabelle process: no input"; true)
-    | name :: args => (worker_context (fn () => run_command name args); true))
+    | name :: args => (task_context (fn () => run_command name args); true))
     handle Runtime.TERMINATE => false
       | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
   in