src/Pure/Concurrent/task_queue.ML
changeset 32101 e25107ff4f56
parent 32099 5382c93108db
child 32190 4fc7a882b41e
equal deleted inserted replaced
32100:8ac6b1102f16 32101:e25107ff4f56
    11   val pri_of_task: task -> int
    11   val pri_of_task: task -> int
    12   val str_of_task: task -> string
    12   val str_of_task: task -> string
    13   type group
    13   type group
    14   val group_id: group -> int
    14   val group_id: group -> int
    15   val eq_group: group * group -> bool
    15   val eq_group: group * group -> bool
    16   val new_group: unit -> group
    16   val new_group: group option -> group
    17   val group_exns: group -> exn list
    17   val group_status: group -> exn list
    18   val str_of_group: group -> string
    18   val str_of_group: group -> string
    19   type queue
    19   type queue
    20   val empty: queue
    20   val empty: queue
    21   val is_empty: queue -> bool
    21   val is_empty: queue -> bool
    22   val status: queue -> {ready: int, pending: int, running: int}
    22   val status: queue -> {ready: int, pending: int, running: int}
    25   val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
    25   val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
    26   val dequeue_towards: task list -> queue ->
    26   val dequeue_towards: task list -> queue ->
    27     (((task * group * (bool -> bool) list) * task list) option * queue)
    27     (((task * group * (bool -> bool) list) * task list) option * queue)
    28   val interrupt: queue -> task -> unit
    28   val interrupt: queue -> task -> unit
    29   val interrupt_external: queue -> string -> unit
    29   val interrupt_external: queue -> string -> unit
       
    30   val is_canceled: group -> bool
    30   val cancel_group: group -> exn -> unit
    31   val cancel_group: group -> exn -> unit
    31   val cancel: queue -> group -> bool
    32   val cancel: queue -> group -> bool
    32   val cancel_all: queue -> group list
    33   val cancel_all: queue -> group list
    33   val finish: task -> queue -> queue
    34   val finish: task -> queue -> queue
    34 end;
    35 end;
    46 
    47 
    47 fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
    48 fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
    48 structure Task_Graph = Graph(type key = task val ord = task_ord);
    49 structure Task_Graph = Graph(type key = task val ord = task_ord);
    49 
    50 
    50 
    51 
    51 (* groups *)
    52 (* nested groups *)
    52 
    53 
    53 datatype group = Group of serial * exn list ref;
    54 datatype group = Group of
    54 
    55  {parent: group option,
    55 fun group_id (Group (gid, _)) = gid;
    56   id: serial,
    56 fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2;
    57   status: exn list ref};
    57 
    58 
    58 fun new_group () = Group (serial (), ref []);
    59 fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
    59 
    60 
    60 fun group_exns (Group (_, ref exns)) = exns;
    61 fun new_group parent = make_group (parent, serial (), ref []);
    61 
    62 
    62 fun str_of_group (Group (i, ref exns)) =
    63 fun group_id (Group {id, ...}) = id;
    63   if null exns then string_of_int i else enclose "(" ")" (string_of_int i);
    64 fun eq_group (group1, group2) = group_id group1 = group_id group2;
       
    65 
       
    66 fun group_ancestry (Group {parent, id, ...}) =
       
    67   id :: (case parent of NONE => [] | SOME group => group_ancestry group);
       
    68 
       
    69 
       
    70 fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () =>
       
    71   (case exn of
       
    72     Exn.Interrupt => if null (! status) then status := [exn] else ()
       
    73   | _ => change status (cons exn)));
       
    74 
       
    75 fun group_status (Group {parent, status, ...}) = (*non-critical*)
       
    76   ! status @ (case parent of NONE => [] | SOME group => group_status group);
       
    77 
       
    78 fun is_canceled (Group {parent, status, ...}) = (*non-critical*)
       
    79   not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group);
       
    80 
       
    81 fun str_of_group group =
       
    82   (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
    64 
    83 
    65 
    84 
    66 (* jobs *)
    85 (* jobs *)
    67 
    86 
    68 datatype job =
    87 datatype job =
    92 
   111 
    93 val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
   112 val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
    94 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
   113 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
    95 
   114 
    96 
   115 
    97 (* status *)
   116 (* queue status *)
    98 
   117 
    99 fun status (Queue {jobs, ...}) =
   118 fun status (Queue {jobs, ...}) =
   100   let
   119   let
   101     val (x, y, z) =
   120     val (x, y, z) =
   102       Task_Graph.fold (fn (task, ((_, job), (deps, _))) => fn (x, y, z) =>
   121       Task_Graph.fold (fn (task, ((_, job), (deps, _))) => fn (x, y, z) =>
   105           | Running _ => (x, y, z + 1)))
   124           | Running _ => (x, y, z + 1)))
   106         jobs (0, 0, 0);
   125         jobs (0, 0, 0);
   107   in {ready = x, pending = y, running = z} end;
   126   in {ready = x, pending = y, running = z} end;
   108 
   127 
   109 
   128 
       
   129 (* cancel -- peers and sub-groups *)
       
   130 
       
   131 fun cancel (Queue {groups, jobs, ...}) group =
       
   132   let
       
   133     val _ = cancel_group group Exn.Interrupt;
       
   134     val tasks = Inttab.lookup_list groups (group_id group);
       
   135     val running =
       
   136       fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
       
   137     val _ = List.app SimpleThread.interrupt running;
       
   138   in null running end;
       
   139 
       
   140 fun cancel_all (Queue {jobs, ...}) =
       
   141   let
       
   142     fun cancel_job (group, job) (groups, running) =
       
   143       (cancel_group group Exn.Interrupt;
       
   144         (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
       
   145         | _ => (groups, running)));
       
   146     val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
       
   147     val _ = List.app SimpleThread.interrupt running;
       
   148   in groups end;
       
   149 
       
   150 
   110 (* enqueue *)
   151 (* enqueue *)
   111 
   152 
   112 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, cache}) =
   153 fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
   113   let
   154   let
   114     val task = new_task pri;
   155     val task = new_task pri;
   115     val groups' = Inttab.cons_list (gid, task) groups;
   156     val groups' = groups
       
   157       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   116     val jobs' = jobs
   158     val jobs' = jobs
   117       |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
   159       |> Task_Graph.new_node (task, (group, Job [job]))
       
   160       |> fold (add_job task) deps;
   118     val cache' =
   161     val cache' =
   119       (case cache of
   162       (case cache of
   120         Result last =>
   163         Result last =>
   121           if task_ord (last, task) = LESS
   164           if task_ord (last, task) = LESS
   122           then cache else Unknown
   165           then cache else Unknown
   156 fun dequeue_towards deps (queue as Queue {groups, jobs, ...}) =
   199 fun dequeue_towards deps (queue as Queue {groups, jobs, ...}) =
   157   let
   200   let
   158     fun ready task =
   201     fun ready task =
   159       (case Task_Graph.get_node jobs task of
   202       (case Task_Graph.get_node jobs task of
   160         (group, Job list) =>
   203         (group, Job list) =>
   161           if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
   204           if null (Task_Graph.imm_preds jobs task)
       
   205           then SOME (task, group, rev list)
   162           else NONE
   206           else NONE
   163       | _ => NONE);
   207       | _ => NONE);
   164 
   208 
   165     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
   209     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
   166     fun result (res as (task, _, _)) =
   210     fun result (res as (task, _, _)) =
   179 
   223 
   180 
   224 
   181 (* sporadic interrupts *)
   225 (* sporadic interrupts *)
   182 
   226 
   183 fun interrupt (Queue {jobs, ...}) task =
   227 fun interrupt (Queue {jobs, ...}) task =
   184   (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
   228   (case try (get_job jobs) task of
       
   229     SOME (Running thread) => SimpleThread.interrupt thread
       
   230   | _ => ());
   185 
   231 
   186 fun interrupt_external (queue as Queue {jobs, ...}) str =
   232 fun interrupt_external (queue as Queue {jobs, ...}) str =
   187   (case Int.fromString str of
   233   (case Int.fromString str of
   188     SOME i =>
   234     SOME i =>
   189       (case Task_Graph.get_first NONE
   235       (case Task_Graph.get_first NONE
   192   | NONE => ());
   238   | NONE => ());
   193 
   239 
   194 
   240 
   195 (* termination *)
   241 (* termination *)
   196 
   242 
   197 fun cancel_group (Group (_, r)) exn = CRITICAL (fn () =>
       
   198   (case exn of
       
   199     Exn.Interrupt => if null (! r) then r := [exn] else ()
       
   200   | _ => change r (cons exn)));
       
   201 
       
   202 fun cancel (Queue {groups, jobs, ...}) (group as Group (gid, _)) =
       
   203   let
       
   204     val _ = cancel_group group Exn.Interrupt;
       
   205     val tasks = Inttab.lookup_list groups gid;
       
   206     val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
       
   207     val _ = List.app SimpleThread.interrupt running;
       
   208   in null running end;
       
   209 
       
   210 fun cancel_all (Queue {jobs, ...}) =
       
   211   let
       
   212     fun cancel_job (group, job) (groups, running) =
       
   213       (cancel_group group Exn.Interrupt;
       
   214         (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
       
   215         | _ => (groups, running)));
       
   216     val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
       
   217     val _ = List.app SimpleThread.interrupt running;
       
   218   in groups end;
       
   219 
       
   220 fun finish task (Queue {groups, jobs, cache}) =
   243 fun finish task (Queue {groups, jobs, cache}) =
   221   let
   244   let
   222     val Group (gid, _) = get_group jobs task;
   245     val group = get_group jobs task;
   223     val groups' = Inttab.remove_list (op =) (gid, task) groups;
   246     val groups' = groups
       
   247       |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
   224     val jobs' = Task_Graph.del_node task jobs;
   248     val jobs' = Task_Graph.del_node task jobs;
   225     val cache' =
   249     val cache' =
   226       if null (Task_Graph.imm_succs jobs task) then cache
   250       if null (Task_Graph.imm_succs jobs task) then cache
   227       else Unknown;
   251       else Unknown;
   228   in make_queue groups' jobs' cache' end;
   252   in make_queue groups' jobs' cache' end;