src/Pure/Concurrent/task_queue.ML
author wenzelm
Thu Sep 09 17:20:27 2010 +0200 (2010-09-09 ago)
changeset 39232 69c6d3e87660
parent 37854 047c96f41455
child 39243 307e3d07d19f
permissions -rw-r--r--
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
     1 (*  Title:      Pure/Concurrent/task_queue.ML
     2     Author:     Makarius
     3 
     4 Ordered queue of grouped tasks.
     5 *)
     6 
     7 signature TASK_QUEUE =
     8 sig
     9   type task  (*task identifier: optional priority plus serial number*)
    10   val dummy_task: task  (*placeholder without priority, serial ~1*)
    11   val pri_of_task: task -> int  (*priority, 0 if the task has none*)
    12   val str_of_task: task -> string  (*serial number as string*)
    13   type group  (*optional parent group, serial id, synchronized list of exceptions*)
    14   val new_group: group option -> group  (*fresh group below the given parent, empty status*)
    15   val group_id: group -> int
    16   val eq_group: group * group -> bool  (*equality of group ids*)
    17   val cancel_group: group -> exn -> unit  (*record exn; an interrupt is only added to empty status*)
    18   val is_canceled: group -> bool  (*group or some ancestor has non-empty status*)
    19   val group_status: group -> exn list  (*own exceptions followed by those of ancestors*)
    20   val str_of_group: group -> string  (*group id, in parentheses if canceled*)
    21   type queue
    22   val empty: queue
    23   val all_passive: queue -> bool  (*no Job/Running entry, no passive task of a canceled group*)
    24   val status: queue -> {ready: int, pending: int, running: int, passive: int}
    25   val cancel: queue -> group -> bool  (*interrupt group; true iff none of its tasks was running*)
    26   val cancel_all: queue -> group list  (*interrupt everything; result: groups with running tasks*)
    27   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
    28   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
    29   val extend: task -> (bool -> bool) -> queue -> queue option  (*NONE unless task has a Job entry*)
    30   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
    31   val depend: task -> task list -> queue -> queue  (*add dependencies; Fail on immediate cycle*)
    32   val dequeue_towards: Thread.thread -> task list -> queue ->
    33     (((task * group * (bool -> bool) list) option * task list) * queue)
    34   val finish: task -> queue -> bool * queue  (*remove task; true iff it had no successors*)
    35 end;
    36 
    37 structure Task_Queue: TASK_QUEUE =
    38 struct
    39 
    40 (* tasks *)
    41 
    42 abstype task = Task of int option * serial  (*optional priority, serial number*)
    43 with
    44 
    45 val dummy_task = Task (NONE, ~1);  (*placeholder: no priority, serial ~1*)
    46 fun new_task priority = Task (priority, serial ());
    47 (*priority defaults to 0; the printed form is just the serial number*)
    48 fun pri_of_task (Task (opt_pri, _)) = (case opt_pri of SOME pri => pri | NONE => 0);
    49 fun str_of_task (Task (_, id)) = string_of_int id;
    50 (*lexicographic order: reversed order on the optional priority, then by serial number*)
    51 fun task_ord (Task a, Task b) = prod_ord (rev_order o option_ord int_ord) int_ord (a, b);
    52 fun eq_task tasks = is_equal (task_ord tasks);
    53 
    54 end;
    55 
    56 structure Task_Graph = Graph(type key = task val ord = task_ord);  (*graphs with tasks as node keys, ordered by task_ord*)
    57 
    58 
    59 (* nested groups *)
    60 
    61 abstype group = Group of
    62  {parent: group option,                (*enclosing group, if any*)
    63   id: serial,                          (*serial number, see new_group*)
    64   status: exn list Synchronized.var}   (*exceptions recorded by cancel_group*)
    65 with
    66 
    67 fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
    68 (*fresh group below the given optional parent, with empty status*)
    69 fun new_group parent = make_group (parent, serial (), Synchronized.var "group" []);
    70 
    71 fun group_id (Group {id, ...}) = id;
    72 fun eq_group (g1, g2) = (group_id g1 = group_id g2);
    73 (*ids of the group and of all its ancestors, innermost first*)
    74 fun group_ancestry (Group {parent = NONE, id, ...}) = [id]
    75   | group_ancestry (Group {parent = SOME outer, id, ...}) = id :: group_ancestry outer;
    76 
    77 
    78 (* group status *)
    79 (*record exn in the status of the group; an interrupt is only added while status is still empty*)
    80 fun cancel_group (Group {status, ...}) exn =
    81   let
    82     fun record exns =
    83       if Exn.is_interrupt exn andalso not (null exns) then exns else exn :: exns;
    84   in Synchronized.change status record end;
    85 (*canceled: the group itself or some ancestor has recorded an exception*)
    86 fun is_canceled (Group {parent, status, ...}) =
    87   if null (Synchronized.value status)
    88   then (case parent of SOME outer => is_canceled outer | NONE => false) else true;
    89 (*recorded exceptions of the group, followed by those of its ancestors*)
    90 fun group_status (Group {parent, status, ...}) =
    91   let val own = Synchronized.value status
    92   in own @ (case parent of SOME outer => group_status outer | NONE => []) end;
    93 (*group id as string, enclosed in parentheses if the group is canceled*)
    94 fun str_of_group group =
    95   let val name = string_of_int (group_id group)
    96   in if is_canceled group then enclose "(" ")" name else name end;
    97 end;
    98 
    99 
   100 (* jobs *)
   101 
   102 datatype job =
   103   Job of (bool -> bool) list |  (*not yet started; functions in reverse order of addition*)
   104   Running of Thread.thread |    (*handed to the given thread by dequeue / dequeue_towards*)
   105   Passive of unit -> bool;      (*passive task with its abort function*)
   106 
   107 type jobs = (group * job) Task_Graph.T;  (*graph nodes carry the group and job of a task*)
   108 
   109 fun get_group (jobs: jobs) task = let val (group, _) = Task_Graph.get_node jobs task in group end;
   110 fun get_job (jobs: jobs) task = let val (_, job) = Task_Graph.get_node jobs task in job end;
   111 fun set_job task job (jobs: jobs) = jobs |> Task_Graph.map_node task (fn (grp, _) => (grp, job));
   112 
   113 fun add_job task dep (jobs: jobs) =  (*edge dep -> task; dropped if Task_Graph.UNDEF is raised*)
   114   (jobs |> Task_Graph.add_edge (dep, task)) handle Task_Graph.UNDEF _ => jobs;
   115 
   116 fun add_dep task dep (jobs: jobs) =
   117   (*reject an immediate cycle: dep must not already depend directly on task*)
   118   if not (Task_Graph.is_edge jobs (task, dep)) then add_job task dep jobs
   119   else raise Fail "Cyclic dependency of future tasks";
   120 
   121 fun get_deps (graph: jobs) task =  (*immediate predecessors; [] if Task_Graph.UNDEF is raised*)
   122   (Task_Graph.imm_preds graph task) handle Task_Graph.UNDEF _ => [];
   123 
   124 
   125 (* queue of grouped jobs *)
   126 
   127 datatype queue = Queue of
   128  {groups: task list Inttab.table,   (*group id -> member tasks: groups with presently active members*)
   129   jobs: jobs};                      (*job dependency graph*)
   130 
   131 fun make_queue groups jobs = Queue {jobs = jobs, groups = groups};
   132 (*queue without any groups or jobs*)
   133 val empty = Queue {groups = Inttab.empty, jobs = Task_Graph.empty};
   134 
   135 
   136 (* job status *)
   137 
   138 fun ready_job task ((group, job), (preds, _)) =  (*SOME only for tasks without predecessors*)
   139   (case (job, preds) of
   140     (Job list, []) => SOME (task, group, rev list)  (*functions in order of addition*)
   141   | (Passive abort, []) => if is_canceled group then SOME (task, group, [fn _ => abort ()]) else NONE
   142   | _ => NONE);
   143 
   144 fun active_job (group, job) =  (*passive tasks only count once their group is canceled*)
   145   (case job of
   146     Passive _ => if is_canceled group then SOME () else NONE | _ => SOME ());
   147 
   148 fun all_passive (Queue {jobs, ...}) =  (*true iff no graph entry satisfies active_job*)
   149   jobs |> Task_Graph.get_first (fn (_, (node, _)) => active_job node) |> is_none;
   150 
   151 
   152 (* queue status *)
   153 
   154 fun status (Queue {jobs, ...}) =  (*Job entries count as ready without predecessors, else pending*)
   155   let
   156     fun count (_, ((_, Job _), ([], _))) (r, p, a, q) = (r + 1, p, a, q)
   157       | count (_, ((_, Job _), _)) (r, p, a, q) = (r, p + 1, a, q)
   158       | count (_, ((_, Running _), _)) (r, p, a, q) = (r, p, a + 1, q)
   159       | count (_, ((_, Passive _), _)) (r, p, a, q) = (r, p, a, q + 1);
   160     val (ready, pending, running, passive) = Task_Graph.fold count jobs (0, 0, 0, 0);
   161   in
   162     {ready = ready, pending = pending, running = running, passive = passive}
   163   end;
   164 
   165 
   166 (* cancel -- peers and sub-groups *)
   167 
   168 fun cancel (Queue {groups, jobs}) group =
   169   (*interrupt group and the threads running its registered tasks; true iff no thread was running*)
   170   let
   171     val _ = cancel_group group Exn.Interrupt;
   172     fun add_running task =
   173       (case get_job jobs task of Running thread => insert Thread.equal thread | _ => I);
   174     val running = fold add_running (Inttab.lookup_list groups (group_id group)) [];
   175   in List.app Simple_Thread.interrupt running; null running end;
   176 
   177 fun cancel_all (Queue {jobs, ...}) =  (*interrupt every group; returns groups with running tasks*)
   178   let
   179     fun cancel_entry (_, ((group, job), _)) (acc as (groups, threads)) =
   180       let val _ = cancel_group group Exn.Interrupt in
   181         (case job of
   182           Running thread => (insert eq_group group groups, insert Thread.equal thread threads)
   183         | _ => acc)
   184       end;
   185     val (running_groups, threads) = Task_Graph.fold cancel_entry jobs ([], []);
   186   in List.app Simple_Thread.interrupt threads; running_groups end;
   187 
   188 
   189 (* enqueue *)
   190 
   191 fun enqueue_passive group abort (Queue {groups, jobs}) =  (*new Passive task without priority*)
   192   let
   193     val task = new_task NONE;
   194     fun register gid = Inttab.cons_list (gid, task);  (*under the group and all its ancestors*)
   195     val groups' = fold register (group_ancestry group) groups;
   196     val jobs' = Task_Graph.new_node (task, (group, Passive abort)) jobs;
   197   in (task, make_queue groups' jobs') end;
   198 
   199 fun enqueue group deps pri job (Queue {groups, jobs}) =  (*flag: new task has no predecessors*)
   200   let
   201     val task = new_task (SOME pri);
   202     fun register gid = Inttab.cons_list (gid, task);
   203     val groups' = fold register (group_ancestry group) groups;
   204     fun add_preds dep = fold (add_job task) (get_deps jobs dep);  (*predecessors of dep in old graph*)
   205     val jobs' =
   206       fold add_preds deps
   207         (fold (add_job task) deps (Task_Graph.new_node (task, (group, Job [job])) jobs));
   208     val minimal = null (get_deps jobs' task);
   209   in ((task, minimal), make_queue groups' jobs') end;
   210 
   211 fun extend task job (Queue {groups, jobs}) =  (*NONE unless task still has a Job entry*)
   212   (case try (get_job jobs) task of
   213     SOME (Job list) => jobs |> set_job task (Job (job :: list)) |> make_queue groups |> SOME
   214   | _ => NONE);
   215 
   216 
   217 (* dequeue *)
   218 
   219 fun dequeue thread (queue as Queue {groups, jobs}) =  (*first ready task becomes Running thread*)
   220   let val found = Task_Graph.get_first (fn (task, entry) => ready_job task entry) jobs in
   221     (case found of
   222       SOME (task, _, _) => (found, make_queue groups (set_job task (Running thread) jobs))
   223     | NONE => (found, queue))
   224   end;
   225 
   226 
   227 (* dequeue_towards -- adhoc dependencies *)
   228 
   229 fun depend task deps (Queue {groups, jobs}) =  (*task additionally depends on deps, see add_dep*)
   230   jobs |> fold (add_dep task) deps |> make_queue groups;
   231 
   232 fun dequeue_towards thread deps (queue as Queue {groups, jobs}) =
   233   let
   234     val tasks = filter (can (Task_Graph.get_node jobs)) deps;  (*deps still present in the queue*)
   235     fun ready task = ready_job task (Task_Graph.get_entry jobs task);
   236     fun ready_pred task = get_first ready (get_deps jobs task);
   237     val found =  (*prefer a ready dep, otherwise a ready immediate predecessor of some dep*)
   238       (case get_first ready tasks of
   239         NONE => get_first ready_pred tasks
   240       | direct => direct);
   241   in
   242     (case found of
   243       SOME (task, _, _) =>
   244         ((found, tasks), make_queue groups (set_job task (Running thread) jobs))
   245     | NONE => ((found, tasks), queue))
   246   end;
   247 
   248 
   249 (* finish *)
   250 
   251 fun finish task (Queue {groups, jobs}) =
   252   (*remove task from group table and graph; the flag tells whether it had no successors*)
   253   let
   254     fun unregister gid = Inttab.remove_list eq_task (gid, task);
   255     val groups' = fold unregister (group_ancestry (get_group jobs task)) groups;
   256     val jobs' = jobs |> Task_Graph.del_node task;
   257     val maximal = null (Task_Graph.imm_succs jobs task);
   258   in (maximal, make_queue groups' jobs') end;
   259 
   260 end;