src/Pure/Concurrent/task_queue.ML
author haftmann
Thu Oct 22 13:48:06 2009 +0200 (2009-10-22)
changeset 33063 4d462963a7db
parent 32814 81897d30b97f
child 34277 7325a5e3587f
permissions -rw-r--r--
map_range (and map_index) combinator
     1 (*  Title:      Pure/Concurrent/task_queue.ML
     2     Author:     Makarius
     3 
     4 Ordered queue of grouped tasks.
     5 *)
     6 
     7 signature TASK_QUEUE =
     8 sig
     9   type task
    10   val new_task: int -> task
    11   val pri_of_task: task -> int
    12   val str_of_task: task -> string
    13   type group
    14   val new_group: group option -> group
    15   val group_id: group -> int
    16   val eq_group: group * group -> bool
    17   val cancel_group: group -> exn -> unit
    18   val is_canceled: group -> bool
    19   val group_status: group -> exn list
    20   val str_of_group: group -> string
    21   type queue
    22   val empty: queue
    23   val is_empty: queue -> bool
    24   val status: queue -> {ready: int, pending: int, running: int}
    25   val cancel: queue -> group -> bool
    26   val cancel_all: queue -> group list
    27   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
    28   val extend: task -> (bool -> bool) -> queue -> queue option
    29   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
    30   val depend: task -> task list -> queue -> queue
    31   val dequeue_towards: Thread.thread -> task list -> queue ->
    32     (((task * group * (bool -> bool) list) option * task list) * queue)
    33   val finish: task -> queue -> bool * queue
    34 end;
    35 
    36 structure Task_Queue:> TASK_QUEUE =
    37 struct
    38 
    39 (* tasks *)
    40 
    41 datatype task = Task of int * serial;
    42 fun new_task pri = Task (pri, serial ());
    43 
    44 fun pri_of_task (Task (pri, _)) = pri;
    45 fun str_of_task (Task (_, i)) = string_of_int i;
    46 
    47 fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
    48 structure Task_Graph = Graph(type key = task val ord = task_ord);
    49 
    50 
    51 (* nested groups *)
    52 
    53 datatype group = Group of
    54  {parent: group option,
    55   id: serial,
    56   status: exn list Synchronized.var};
    57 
    58 fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
    59 
    60 fun new_group parent = make_group (parent, serial (), Synchronized.var "group" []);
    61 
    62 fun group_id (Group {id, ...}) = id;
    63 fun eq_group (group1, group2) = group_id group1 = group_id group2;
    64 
    65 fun group_ancestry (Group {parent, id, ...}) =
    66   id :: (case parent of NONE => [] | SOME group => group_ancestry group);
    67 
    68 
    69 (* group status *)
    70 
    71 fun cancel_group (Group {status, ...}) exn =
    72   Synchronized.change status
    73     (fn exns =>
    74       (case exn of
    75         Exn.Interrupt => if null exns then [exn] else exns
    76       | _ => exn :: exns));
    77 
    78 fun is_canceled (Group {parent, status, ...}) =
    79   not (null (Synchronized.value status)) orelse
    80     (case parent of NONE => false | SOME group => is_canceled group);
    81 
    82 fun group_status (Group {parent, status, ...}) =
    83   Synchronized.value status @
    84     (case parent of NONE => [] | SOME group => group_status group);
    85 
    86 fun str_of_group group =
    87   (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
    88 
    89 
    90 (* jobs *)
    91 
    92 datatype job =
    93   Job of (bool -> bool) list |
    94   Running of Thread.thread;
    95 
    96 type jobs = (group * job) Task_Graph.T;
    97 
    98 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
    99 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
   100 fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
   101 
   102 fun add_job task dep (jobs: jobs) =
   103   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
   104 
   105 fun add_dep task dep (jobs: jobs) =
   106   if Task_Graph.is_edge jobs (task, dep) then
   107     raise Fail "Cyclic dependency of future tasks"
   108   else add_job task dep jobs;
   109 
   110 fun get_deps (jobs: jobs) task =
   111   Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
   112 
   113 
   114 (* queue of grouped jobs *)
   115 
   116 datatype result = Unknown | Result of task | No_Result;
   117 
   118 datatype queue = Queue of
   119  {groups: task list Inttab.table,   (*groups with presently active members*)
   120   jobs: jobs,                       (*job dependency graph*)
   121   cache: result};                   (*last dequeue result*)
   122 
   123 fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
   124 
   125 val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
   126 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
   127 
   128 
   129 (* queue status *)
   130 
   131 fun status (Queue {jobs, ...}) =
   132   let
   133     val (x, y, z) =
   134       Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z) =>
   135           (case job of
   136             Job _ => if null deps then (x + 1, y, z) else (x, y + 1, z)
   137           | Running _ => (x, y, z + 1)))
   138         jobs (0, 0, 0);
   139   in {ready = x, pending = y, running = z} end;
   140 
   141 
   142 (* cancel -- peers and sub-groups *)
   143 
   144 fun cancel (Queue {groups, jobs, ...}) group =
   145   let
   146     val _ = cancel_group group Exn.Interrupt;
   147     val tasks = Inttab.lookup_list groups (group_id group);
   148     val running =
   149       fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
   150     val _ = List.app SimpleThread.interrupt running;
   151   in null running end;
   152 
   153 fun cancel_all (Queue {jobs, ...}) =
   154   let
   155     fun cancel_job (group, job) (groups, running) =
   156       (cancel_group group Exn.Interrupt;
   157         (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
   158         | _ => (groups, running)));
   159     val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
   160     val _ = List.app SimpleThread.interrupt running;
   161   in groups end;
   162 
   163 
   164 (* enqueue *)
   165 
   166 fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
   167   let
   168     val task = new_task pri;
   169     val groups' = groups
   170       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   171     val jobs' = jobs
   172       |> Task_Graph.new_node (task, (group, Job [job]))
   173       |> fold (add_job task) deps
   174       |> fold (fold (add_job task) o get_deps jobs) deps;
   175     val minimal = null (get_deps jobs' task);
   176     val cache' =
   177       (case cache of
   178         Result last =>
   179           if task_ord (last, task) = LESS
   180           then cache else Unknown
   181       | _ => Unknown);
   182   in ((task, minimal), make_queue groups' jobs' cache') end;
   183 
   184 fun extend task job (Queue {groups, jobs, cache}) =
   185   (case try (get_job jobs) task of
   186     SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
   187   | _ => NONE);
   188 
   189 
   190 (* dequeue *)
   191 
   192 fun dequeue thread (queue as Queue {groups, jobs, cache}) =
   193   let
   194     fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
   195       | ready _ = NONE;
   196     fun deq boundary =
   197       (case Task_Graph.get_first boundary ready jobs of
   198         NONE => (NONE, make_queue groups jobs No_Result)
   199       | SOME (result as (task, _, _)) =>
   200           let
   201             val jobs' = set_job task (Running thread) jobs;
   202             val cache' = Result task;
   203           in (SOME result, make_queue groups jobs' cache') end);
   204   in
   205     (case cache of
   206       Unknown => deq NONE
   207     | Result last => deq (SOME last)
   208     | No_Result => (NONE, queue))
   209   end;
   210 
   211 
   212 (* dequeue_towards -- adhoc dependencies *)
   213 
   214 fun depend task deps (Queue {groups, jobs, ...}) =
   215   make_queue groups (fold (add_dep task) deps jobs) Unknown;
   216 
   217 fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
   218   let
   219     fun ready task =
   220       (case Task_Graph.get_node jobs task of
   221         (group, Job list) =>
   222           if null (get_deps jobs task)
   223           then SOME (task, group, rev list)
   224           else NONE
   225       | _ => NONE);
   226     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
   227     fun result (res as (task, _, _)) =
   228       let
   229         val jobs' = set_job task (Running thread) jobs;
   230         val cache' = Unknown;
   231       in ((SOME res, tasks), make_queue groups jobs' cache') end;
   232   in
   233     (case get_first ready tasks of
   234       SOME res => result res
   235     | NONE =>
   236         (case get_first (get_first ready o get_deps jobs) tasks of
   237           SOME res => result res
   238         | NONE => ((NONE, tasks), queue)))
   239   end;
   240 
   241 
   242 (* finish *)
   243 
   244 fun finish task (Queue {groups, jobs, cache}) =
   245   let
   246     val group = get_group jobs task;
   247     val groups' = groups
   248       |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
   249     val jobs' = Task_Graph.del_node task jobs;
   250     val maximal = null (Task_Graph.imm_succs jobs task);
   251     val cache' = if maximal then cache else Unknown;
   252   in (maximal, make_queue groups' jobs' cache') end;
   253 
   254 end;