Ordered queue of grouped tasks.
authorwenzelm
Mon, 08 Sep 2008 16:08:18 +0200
changeset 28165 26bb048f463c
parent 28164 a6bdc9b31477
child 28166 43087721a66e
Ordered queue of grouped tasks. formerly in future.ML; added thread data; added group; more robust dequeue: change into running here; misc tuning;
src/Pure/Concurrent/task_queue.ML
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Concurrent/task_queue.ML	Mon Sep 08 16:08:18 2008 +0200
@@ -0,0 +1,112 @@
+(*  Title:      Pure/Concurrent/task_queue.ML
+    ID:         $Id$
+    Author:     Makarius
+
+Ordered queue of grouped tasks.
+*)
+
+signature TASK_QUEUE =
+sig
+  eqtype task
+  val get_thread_data: unit -> task option
+  val set_thread_data: task option -> unit
+  eqtype group
+  val new_group: unit -> group
+  type queue
+  val empty: queue
+  val dequeue: Thread.thread -> queue -> (task * (unit -> unit)) option * queue
+  val enqueue: group option -> task list -> (unit -> unit) -> queue -> task * queue
+  val finished: task -> queue -> queue
+  val interrupt: queue -> task -> unit
+  val interrupt_group: queue -> group -> unit
+end;
+
+structure TaskQueue: TASK_QUEUE =
+struct
+
+(* identified tasks *)
+
+datatype task = Task of serial;
+
+local val tag = Universal.tag () : task option Universal.tag in
+  fun get_thread_data () = the_default NONE (Thread.getLocal tag);
+  fun set_thread_data x = Thread.setLocal (tag, x);
+end;
+
+datatype group = Group of serial;
+fun new_group () = Group (serial ());
+
+
+(* queue of dependent jobs *)
+
+datatype job =
+  Job of unit -> unit |
+  Running of Thread.thread;
+
+datatype queue = Queue of
+  {jobs: (job * group option) IntGraph.T,             (*job dependency graph*)
+   cache: (task * (unit -> unit)) Queue.T option,     (*cache of ready tasks*)
+   groups: task list Inttab.table};                   (*active group members*)
+
+fun make_queue jobs cache groups =
+  Queue {jobs = jobs, cache = cache, groups = groups};
+
+
+(* queue operations *)
+
+val empty = make_queue IntGraph.empty NONE Inttab.empty;
+
+fun enqueue group deps job (Queue {jobs, groups, ...}) =
+  let
+    val id = serial ();
+    val task = Task id;
+
+    fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G
+      handle IntGraph.UNDEF _ => G;
+    val jobs' = jobs |> IntGraph.new_node (id, (Job job, group)) |> fold add_dep deps;
+
+    val groups' =
+      (case group of
+        NONE => groups
+      | SOME (Group gid) => Inttab.cons_list (gid, task) groups);
+  in (task, make_queue jobs' NONE groups') end;
+
+fun dequeue thread (queue as Queue {jobs, cache, groups}) =
+  let
+    val ready =
+      (case cache of
+        SOME ready => ready
+      | NONE =>
+          let val add = fn (id, ((Job job, _), ([], _))) => Queue.enqueue (Task id, job) | _ => I
+          in IntGraph.fold add jobs Queue.empty end);
+  in
+    if Queue.is_empty ready then (NONE, make_queue jobs (SOME ready) groups)
+    else
+      let
+        val (task as (Task id, _), ready') = Queue.dequeue ready;
+        val jobs' = IntGraph.map_node id (fn (_, group) => (Running thread, group)) jobs;
+      in (SOME task, make_queue jobs' (SOME ready') groups) end
+  end;
+
+fun finished (task as Task id) (Queue {jobs, groups, ...}) =
+  let
+    val groups' =
+      (case #2 (IntGraph.get_node jobs id) of
+        NONE => groups
+      | SOME (Group gid) => Inttab.remove_list (op =) (gid, task) groups);
+    val jobs' = IntGraph.del_nodes [id] jobs;
+  in make_queue jobs' NONE groups' end;
+
+
+(* interrupts *)
+
+fun interrupt (Queue {jobs, ...}) (Task id) =
+  (case IntGraph.get_node jobs id of
+    (Running thread, _) => (Thread.interrupt thread handle Thread _ => ())
+  | _ => ())
+  handle IntGraph.UNDEF _ => ();
+
+fun interrupt_group (queue as Queue {groups, ...}) (Group gid) =
+  List.app (interrupt queue) (Inttab.lookup_list groups gid);
+
+end;