moved thread data to future.ML (again);
authorwenzelm
Mon, 08 Sep 2008 20:33:27 +0200
changeset 28168 ba410235ff04
parent 28167 27e2ca41b58c
child 28169 356fc8734741
moved thread data to future.ML (again); dequeue: include group; more interrupt operations; misc tuning;
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/task_queue.ML	Mon Sep 08 20:33:24 2008 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Mon Sep 08 20:33:27 2008 +0200
@@ -8,31 +8,25 @@
 signature TASK_QUEUE =
 sig
   eqtype task
-  val get_thread_data: unit -> task option
-  val set_thread_data: task option -> unit
   eqtype group
   val new_group: unit -> group
   type queue
   val empty: queue
-  val dequeue: Thread.thread -> queue -> (task * (unit -> unit)) option * queue
-  val enqueue: group option -> task list -> (unit -> unit) -> queue -> task * queue
+  val enqueue: group option -> task list -> (unit -> bool) -> queue -> task * queue
+  val dequeue: Thread.thread -> queue -> (task * group option * (unit -> bool)) option * queue
   val finished: task -> queue -> queue
-  val interrupt: queue -> task -> unit
+  val interrupt_task: queue -> task -> unit
   val interrupt_group: queue -> group -> unit
+  val interrupt_task_group: queue -> task -> unit
 end;
 
-structure TaskQueue: TASK_QUEUE =
+structure TaskQueue (* : TASK_QUEUE *) =
 struct
 
-(* identified tasks *)
+(* identifiers *)
 
 datatype task = Task of serial;
 
-local val tag = Universal.tag () : task option Universal.tag in
-  fun get_thread_data () = the_default NONE (Thread.getLocal tag);
-  fun set_thread_data x = Thread.setLocal (tag, x);
-end;
-
 datatype group = Group of serial;
 fun new_group () = Group (serial ());
 
@@ -40,13 +34,13 @@
 (* queue of dependent jobs *)
 
 datatype job =
-  Job of unit -> unit |
+  Job of unit -> bool |
   Running of Thread.thread;
 
 datatype queue = Queue of
-  {jobs: (job * group option) IntGraph.T,             (*job dependency graph*)
-   cache: (task * (unit -> unit)) Queue.T option,     (*cache of ready tasks*)
-   groups: task list Inttab.table};                   (*active group members*)
+  {jobs: (group option * job) IntGraph.T,                         (*job dependency graph*)
+   cache: (task * group option * (unit -> bool)) Queue.T option,  (*cache of ready tasks*)
+   groups: task list Inttab.table};                               (*active group members*)
 
 fun make_queue jobs cache groups =
   Queue {jobs = jobs, cache = cache, groups = groups};
@@ -63,7 +57,7 @@
 
     fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G
       handle IntGraph.UNDEF _ => G;
-    val jobs' = jobs |> IntGraph.new_node (id, (Job job, group)) |> fold add_dep deps;
+    val jobs' = jobs |> IntGraph.new_node (id, (group, Job job)) |> fold add_dep deps;
 
     val groups' =
       (case group of
@@ -77,36 +71,43 @@
       (case cache of
         SOME ready => ready
       | NONE =>
-          let val add = fn (id, ((Job job, _), ([], _))) => Queue.enqueue (Task id, job) | _ => I
+          let
+            fun add (id, ((group, Job job), ([], _))) = Queue.enqueue (Task id, group, job)
+              | add _ = I;
           in IntGraph.fold add jobs Queue.empty end);
   in
     if Queue.is_empty ready then (NONE, make_queue jobs (SOME ready) groups)
     else
       let
-        val (task as (Task id, _), ready') = Queue.dequeue ready;
-        val jobs' = IntGraph.map_node id (fn (_, group) => (Running thread, group)) jobs;
-      in (SOME task, make_queue jobs' (SOME ready') groups) end
+        val (result as (Task id, _, _), ready') = Queue.dequeue ready;
+        val jobs' = IntGraph.map_node id (fn (group, _) => (group, Running thread)) jobs;
+      in (SOME result, make_queue jobs' (SOME ready') groups) end
   end;
 
 fun finished (task as Task id) (Queue {jobs, groups, ...}) =
   let
     val groups' =
-      (case #2 (IntGraph.get_node jobs id) of
+      (case #1 (IntGraph.get_node jobs id) of
         NONE => groups
       | SOME (Group gid) => Inttab.remove_list (op =) (gid, task) groups);
     val jobs' = IntGraph.del_nodes [id] jobs;
   in make_queue jobs' NONE groups' end;
 
 
-(* interrupts *)
+(* interrupt *)
 
-fun interrupt (Queue {jobs, ...}) (Task id) =
+fun interrupt_task (Queue {jobs, ...}) (Task id) =
   (case IntGraph.get_node jobs id of
-    (Running thread, _) => (Thread.interrupt thread handle Thread _ => ())
+    (_, Running thread) => (Thread.interrupt thread handle Thread _ => ())
   | _ => ())
   handle IntGraph.UNDEF _ => ();
 
 fun interrupt_group (queue as Queue {groups, ...}) (Group gid) =
-  List.app (interrupt queue) (Inttab.lookup_list groups gid);
+  List.app (interrupt_task queue) (Inttab.lookup_list groups gid);
+
+fun interrupt_task_group (queue as Queue {jobs, ...}) (task as Task id) =
+  (case IntGraph.get_node jobs id of
+    (NONE, _) => interrupt_task queue task
+  | (SOME group, _) => interrupt_group queue group);
 
 end;