src/Pure/Concurrent/future.ML
changeset 41683 73dde8006820
parent 41681 b5d7b15166bf
child 41695 afdbec23b92b
--- a/src/Pure/Concurrent/future.ML	Wed Feb 02 13:44:40 2011 +0100
+++ b/src/Pure/Concurrent/future.ML	Wed Feb 02 15:04:09 2011 +0100
@@ -32,19 +32,16 @@
 
 signature FUTURE =
 sig
-  type task = Task_Queue.task
-  type group = Task_Queue.group
-  val is_worker: unit -> bool
   val worker_task: unit -> Task_Queue.task option
   val worker_group: unit -> Task_Queue.group option
   val worker_subgroup: unit -> Task_Queue.group
   type 'a future
-  val task_of: 'a future -> task
-  val group_of: 'a future -> group
+  val task_of: 'a future -> Task_Queue.task
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
-  val forks: {name: string, group: group option, deps: task list, pri: int} ->
-    (unit -> 'a) list -> 'a future list
+  val forks:
+    {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
+      (unit -> 'a) list -> 'a future list
   val fork_pri: int -> (unit -> 'a) -> 'a future
   val fork: (unit -> 'a) -> 'a future
   val join_results: 'a future list -> 'a Exn.result list
@@ -52,12 +49,12 @@
   val join: 'a future -> 'a
   val value: 'a -> 'a future
   val map: ('a -> 'b) -> 'a future -> 'b future
-  val promise_group: group -> 'a future
+  val promise_group: Task_Queue.group -> 'a future
   val promise: unit -> 'a future
   val fulfill_result: 'a future -> 'a Exn.result -> unit
   val fulfill: 'a future -> 'a -> unit
   val interruptible_task: ('a -> 'b) -> 'a -> 'b
-  val cancel_group: group -> unit
+  val cancel_group: Task_Queue.group -> unit
   val cancel: 'a future -> unit
   val shutdown: unit -> unit
   val status: (unit -> 'a) -> 'a
@@ -70,20 +67,15 @@
 
 (* identifiers *)
 
-type task = Task_Queue.task;
-type group = Task_Queue.group;
-
 local
-  val tag = Universal.tag () : (task * group) option Universal.tag;
+  val tag = Universal.tag () : Task_Queue.task option Universal.tag;
 in
-  fun thread_data () = the_default NONE (Thread.getLocal tag);
-  fun setmp_thread_data data f x =
-    Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
+  fun worker_task () = the_default NONE (Thread.getLocal tag);
+  fun setmp_worker_task data f x =
+    Library.setmp_thread_data tag (worker_task ()) (SOME data) f x;
 end;
 
-val is_worker = is_some o thread_data;
-val worker_task = Option.map #1 o thread_data;
-val worker_group = Option.map #2 o thread_data;
+val worker_group = Option.map Task_Queue.group_of_task o worker_task;
 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
 
 fun worker_joining e =
@@ -103,12 +95,10 @@
 
 datatype 'a future = Future of
  {promised: bool,
-  task: task,
-  group: group,
+  task: Task_Queue.task,
   result: 'a result};
 
 fun task_of (Future {task, ...}) = task;
-fun group_of (Future {group, ...}) = group;
 fun result_of (Future {result, ...}) = result;
 
 fun peek x = Single_Assignment.peek (result_of x);
@@ -204,12 +194,13 @@
  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   broadcast scheduler_event);
 
-fun execute (task, group, jobs) =
+fun execute (task, jobs) =
   let
+    val group = Task_Queue.group_of_task task;
     val valid = not (Task_Queue.is_canceled group);
     val ok =
       Task_Queue.running task (fn () =>
-        setmp_thread_data (task, group) (fn () =>
+        setmp_worker_task task (fn () =>
           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
     val _ = Multithreading.tracing 1 (fn () =>
       let
@@ -416,7 +407,7 @@
         let
           val (result, job) = future_job grp e;
           val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
-          val future = Future {promised = false, task = task, group = grp, result = result};
+          val future = Future {promised = false, task = task, result = result};
         in (future, (minimal orelse minimal', queue')) end;
     in
       SYNCHRONIZED "enqueue" (fn () =>
@@ -443,7 +434,7 @@
     NONE => Exn.Exn (Fail "Unfinished future")
   | SOME res =>
       if Exn.is_interrupt_exn res then
-        (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
+        (case Exn.flatten_list (Task_Queue.group_status (Task_Queue.group_of_task (task_of x))) of
           [] => res
         | exns => Exn.Exn (Exn.EXCEPTIONS exns))
       else res);
@@ -470,7 +461,7 @@
       if forall is_finished xs then ()
       else if Multithreading.self_critical () then
         error "Cannot join future values within critical section"
-      else if is_some (thread_data ()) then
+      else if is_some (worker_task ()) then
         join_work (Task_Queue.init_deps (map task_of xs))
       else List.app (ignore o Single_Assignment.await o result_of) xs;
   in map get_result xs end;
@@ -485,15 +476,16 @@
 
 fun value (x: 'a) =
   let
-    val group = Task_Queue.new_group NONE;
+    val task = Task_Queue.dummy_task ();
+    val group = Task_Queue.group_of_task task;
     val result = Single_Assignment.var "value" : 'a result;
     val _ = assign_result group result (Exn.Result x);
-  in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
+  in Future {promised = false, task = task, result = result} end;
 
 fun map_future f x =
   let
     val task = task_of x;
-    val group = Task_Queue.new_group (SOME (group_of x));
+    val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
     val (result, job) = future_job group (fn () => f (join x));
 
     val extended = SYNCHRONIZED "extend" (fn () =>
@@ -501,7 +493,7 @@
         SOME queue' => (queue := queue'; true)
       | NONE => false));
   in
-    if extended then Future {promised = false, task = task, group = group, result = result}
+    if extended then Future {promised = false, task = task, result = result}
     else
       singleton
         (forks {name = "Future.map", group = SOME group,
@@ -522,14 +514,15 @@
             else reraise exn;
     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
-  in Future {promised = true, task = task, group = group, result = result} end;
+  in Future {promised = true, task = task, result = result} end;
 
 fun promise () = promise_group (worker_subgroup ());
 
-fun fulfill_result (Future {promised, task, group, result}) res =
+fun fulfill_result (Future {promised, task, result}) res =
   if not promised then raise Fail "Not a promised future"
   else
     let
+      val group = Task_Queue.group_of_task task;
       fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
       val _ =
         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
@@ -538,7 +531,7 @@
               SYNCHRONIZED "fulfill_result" (fn () =>
                 Unsynchronized.change_result queue
                   (Task_Queue.dequeue_passive (Thread.self ()) task));
-          in if still_passive then execute (task, group, [job]) else () end);
+          in if still_passive then execute (task, [job]) else () end);
       val _ =
         worker_waiting (Task_Queue.init_deps [task])
           (fn () => Single_Assignment.await result);
@@ -552,7 +545,7 @@
 fun interruptible_task f x =
   if Multithreading.available then
     Multithreading.with_attributes
-      (if is_worker ()
+      (if is_some (worker_task ())
        then Multithreading.private_interrupts
        else Multithreading.public_interrupts)
       (fn _ => f x)
@@ -563,7 +556,7 @@
  (if cancel_now group then () else cancel_later group;
   signal work_available; scheduler_check ()));
 
-fun cancel x = cancel_group (group_of x);
+fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
 
 
 (* shutdown *)