tuned source structure;
authorwenzelm
Wed, 10 Aug 2011 14:04:45 +0200
changeset 44110 058520fa03a8
parent 44109 7a44005dc2ec
child 44111 2d16c693d536
tuned source structure;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Wed Aug 10 10:59:37 2011 +0200
+++ b/src/Pure/Concurrent/future.ML	Wed Aug 10 14:04:45 2011 +0200
@@ -39,6 +39,8 @@
   val task_of: 'a future -> Task_Queue.task
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
+  val cancel_group: Task_Queue.group -> unit
+  val cancel: 'a future -> unit
   val forks:
     {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
       (unit -> 'a) list -> 'a future list
@@ -57,8 +59,6 @@
   val fulfill_result: 'a future -> 'a Exn.result -> unit
   val fulfill: 'a future -> 'a -> unit
   val interruptible_task: ('a -> 'b) -> 'a -> 'b
-  val cancel_group: Task_Queue.group -> unit
-  val cancel: 'a future -> unit
   val shutdown: unit -> unit
   val status: (unit -> 'a) -> 'a
 end;
@@ -74,8 +74,7 @@
   val tag = Universal.tag () : Task_Queue.task option Universal.tag;
 in
   fun worker_task () = the_default NONE (Thread.getLocal tag);
-  fun setmp_worker_task data f x =
-    Library.setmp_thread_data tag (worker_task ()) (SOME data) f x;
+  fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
 end;
 
 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
@@ -107,19 +106,6 @@
 fun peek x = Single_Assignment.peek (result_of x);
 fun is_finished x = is_some (peek x);
 
-fun assign_result group result res =
-  let
-    val _ = Single_Assignment.assign result res
-      handle exn as Fail _ =>
-        (case Single_Assignment.peek result of
-          SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
-        | _ => reraise exn);
-    val ok =
-      (case the (Single_Assignment.peek result) of
-        Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
-      | Exn.Res _ => true);
-  in ok end;
-
 
 
 (** scheduling **)
@@ -173,23 +159,16 @@
   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
 
 
-(* execute future jobs *)
+(* cancellation primitives *)
 
-fun future_job group (e: unit -> 'a) =
-  let
-    val result = Single_Assignment.var "future" : 'a result;
-    val pos = Position.thread_data ();
-    fun job ok =
-      let
-        val res =
-          if ok then
-            Exn.capture (fn () =>
-              Multithreading.with_attributes Multithreading.private_interrupts
-                (fn _ => Position.setmp_thread_data pos e ()) before
-              Multithreading.interrupted ()) ()
-          else Exn.interrupt_exn;
-      in assign_result group result res end;
-  in (result, job) end;
+fun interruptible_task f x =
+  if Multithreading.available then
+    Multithreading.with_attributes
+      (if is_some (worker_task ())
+       then Multithreading.private_interrupts
+       else Multithreading.public_interrupts)
+      (fn _ => f x)
+  else interruptible f x;
 
 fun cancel_now group = (*requires SYNCHRONIZED*)
   Task_Queue.cancel (! queue) group;
@@ -198,7 +177,10 @@
  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   broadcast scheduler_event);
 
-fun execute (task, jobs) =
+
+(* worker threads *)
+
+fun worker_exec (task, jobs) =
   let
     val group = Task_Queue.group_of_task task;
     val valid = not (Task_Queue.is_canceled group);
@@ -224,9 +206,6 @@
       in () end);
   in () end;
 
-
-(* worker threads *)
-
 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   let
     val state =
@@ -253,7 +232,7 @@
 fun worker_loop name =
   (case SYNCHRONIZED name (fn () => worker_next ()) of
     NONE => ()
-  | SOME work => (Exn.capture Multithreading.interrupted (); execute work; worker_loop name));
+  | SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name));
 
 fun worker_start name = (*requires SYNCHRONIZED*)
   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
@@ -397,6 +376,48 @@
 
 (** futures **)
 
+(* cancellation *)
+
+(*cancel: present and future group members will be interrupted eventually*)
+fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
+ (if cancel_now group then () else cancel_later group;
+  signal work_available; scheduler_check ()));
+
+fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
+
+
+(* future jobs *)
+
+fun assign_result group result res =
+  let
+    val _ = Single_Assignment.assign result res
+      handle exn as Fail _ =>
+        (case Single_Assignment.peek result of
+          SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
+        | _ => reraise exn);
+    val ok =
+      (case the (Single_Assignment.peek result) of
+        Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
+      | Exn.Res _ => true);
+  in ok end;
+
+fun future_job group (e: unit -> 'a) =
+  let
+    val result = Single_Assignment.var "future" : 'a result;
+    val pos = Position.thread_data ();
+    fun job ok =
+      let
+        val res =
+          if ok then
+            Exn.capture (fn () =>
+              Multithreading.with_attributes Multithreading.private_interrupts
+                (fn _ => Position.setmp_thread_data pos e ()) before
+              Multithreading.interrupted ()) ()
+          else Exn.interrupt_exn;
+      in assign_result group result res end;
+  in (result, job) end;
+
+
 (* fork *)
 
 fun forks {name, group, deps, pri} es =
@@ -452,7 +473,8 @@
     | (SOME work, deps') => SOME (work, deps'));
 
 fun execute_work NONE = ()
-  | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps')
+  | execute_work (SOME (work, deps')) =
+      (worker_joining (fn () => worker_exec work); join_work deps')
 and join_work deps =
   Multithreading.with_attributes Multithreading.no_interrupts
     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
@@ -475,7 +497,7 @@
 fun join x = Exn.release (join_result x);
 
 
-(* fast-path versions -- bypassing full task management *)
+(* fast-path versions -- bypassing task queue *)
 
 fun value (x: 'a) =
   let
@@ -538,7 +560,7 @@
               SYNCHRONIZED "fulfill_result" (fn () =>
                 Unsynchronized.change_result queue
                   (Task_Queue.dequeue_passive (Thread.self ()) task));
-          in if still_passive then execute (task, [job]) else () end);
+          in if still_passive then worker_exec (task, [job]) else () end);
       val _ =
         if is_some (Single_Assignment.peek result) then ()
         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
@@ -547,25 +569,6 @@
 fun fulfill x res = fulfill_result x (Exn.Res res);
 
 
-(* cancellation *)
-
-fun interruptible_task f x =
-  if Multithreading.available then
-    Multithreading.with_attributes
-      (if is_some (worker_task ())
-       then Multithreading.private_interrupts
-       else Multithreading.public_interrupts)
-      (fn _ => f x)
-  else interruptible f x;
-
-(*cancel: present and future group members will be interrupted eventually*)
-fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
- (if cancel_now group then () else cancel_later group;
-  signal work_available; scheduler_check ()));
-
-fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
-
-
 (* shutdown *)
 
 fun shutdown () =