src/Pure/Concurrent/task_queue.ML
changeset 34279 02936e77a07c
parent 34277 7325a5e3587f
child 34280 16bf3e9786a3
equal deleted inserted replaced
34278:228f27469139 34279:02936e77a07c
    20   val str_of_group: group -> string
    20   val str_of_group: group -> string
    21   type queue
    21   type queue
    22   val empty: queue
    22   val empty: queue
    23   val all_passive: queue -> bool
    23   val all_passive: queue -> bool
    24   val status: queue -> {ready: int, pending: int, running: int, passive: int}
    24   val status: queue -> {ready: int, pending: int, running: int, passive: int}
    25   val cancel: queue -> group -> bool
    25   val cancel: group -> queue -> bool * queue
    26   val cancel_all: queue -> group list
    26   val cancel_all: queue -> group list * queue
    27   val enqueue_passive: group -> queue -> task * queue
    27   val enqueue_passive: group -> queue -> task * queue
    28   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
    28   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
    29   val extend: task -> (bool -> bool) -> queue -> queue option
    29   val extend: task -> (bool -> bool) -> queue -> queue option
    30   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
    30   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
    31   val depend: task -> task list -> queue -> queue
    31   val depend: task -> task list -> queue -> queue
    79 
    79 
    80 fun is_canceled (Group {parent, status, ...}) =
    80 fun is_canceled (Group {parent, status, ...}) =
    81   not (null (Synchronized.value status)) orelse
    81   not (null (Synchronized.value status)) orelse
    82     (case parent of NONE => false | SOME group => is_canceled group);
    82     (case parent of NONE => false | SOME group => is_canceled group);
    83 
    83 
       
    84 fun is_ready deps group = null deps orelse is_canceled group;
       
    85 
    84 fun group_status (Group {parent, status, ...}) =
    86 fun group_status (Group {parent, status, ...}) =
    85   Synchronized.value status @
    87   Synchronized.value status @
    86     (case parent of NONE => [] | SOME group => group_status group);
    88     (case parent of NONE => [] | SOME group => group_status group);
    87 
    89 
    88 fun str_of_group group =
    90 fun str_of_group group =
   135 (* queue status *)
   137 (* queue status *)
   136 
   138 
   137 fun status (Queue {jobs, ...}) =
   139 fun status (Queue {jobs, ...}) =
   138   let
   140   let
   139     val (x, y, z, w) =
   141     val (x, y, z, w) =
   140       Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
   142       Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) =>
   141           (case job of
   143           (case job of
   142             Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
   144             Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w)
   143           | Running _ => (x, y, z + 1, w)
   145           | Running _ => (x, y, z + 1, w)
   144           | Passive => (x, y, z, w + 1)))
   146           | Passive => (x, y, z, w + 1)))
   145         jobs (0, 0, 0, 0);
   147         jobs (0, 0, 0, 0);
   146   in {ready = x, pending = y, running = z, passive = w} end;
   148   in {ready = x, pending = y, running = z, passive = w} end;
   147 
   149 
   148 
   150 
   149 (* cancel -- peers and sub-groups *)
   151 (* cancel -- peers and sub-groups *)
   150 
   152 
   151 fun cancel (Queue {groups, jobs, ...}) group =
   153 fun cancel group (Queue {groups, jobs, ...}) =
   152   let
   154   let
   153     val _ = cancel_group group Exn.Interrupt;
   155     val _ = cancel_group group Exn.Interrupt;
   154     val tasks = Inttab.lookup_list groups (group_id group);
   156     val tasks = Inttab.lookup_list groups (group_id group);
   155     val running =
   157     val running =
   156       fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
   158       fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
   157     val _ = List.app SimpleThread.interrupt running;
   159     val _ = List.app SimpleThread.interrupt running;
   158   in null running end;
   160   in (null running, make_queue groups jobs Unknown) end;
   159 
   161 
   160 fun cancel_all (Queue {jobs, ...}) =
   162 fun cancel_all (Queue {groups, jobs, ...}) =
   161   let
   163   let
   162     fun cancel_job (group, job) (groups, running) =
   164     fun cancel_job (group, job) (groups, running) =
   163       (cancel_group group Exn.Interrupt;
   165       (cancel_group group Exn.Interrupt;
   164         (case job of
   166         (case job of
   165           Running t => (insert eq_group group groups, insert Thread.equal t running)
   167           Running t => (insert eq_group group groups, insert Thread.equal t running)
   166         | _ => (groups, running)));
   168         | _ => (groups, running)));
   167     val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
   169     val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
   168     val _ = List.app SimpleThread.interrupt running;
   170     val _ = List.app SimpleThread.interrupt running;
   169   in running_groups end;
   171   in (running_groups, make_queue groups jobs Unknown) end;
   170 
   172 
   171 
   173 
   172 (* enqueue *)
   174 (* enqueue *)
   173 
   175 
   174 fun enqueue_passive group (Queue {groups, jobs, cache}) =
   176 fun enqueue_passive group (Queue {groups, jobs, cache}) =
   205 
   207 
   206 (* dequeue *)
   208 (* dequeue *)
   207 
   209 
   208 fun dequeue thread (queue as Queue {groups, jobs, cache}) =
   210 fun dequeue thread (queue as Queue {groups, jobs, cache}) =
   209   let
   211   let
   210     fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
   212     fun ready (task, ((group, Job list), (deps, _))) =
       
   213           if is_ready deps group then SOME (task, group, rev list) else NONE
   211       | ready _ = NONE;
   214       | ready _ = NONE;
   212     fun deq boundary =
   215     fun deq boundary =
   213       (case Task_Graph.get_first boundary ready jobs of
   216       (case Task_Graph.get_first boundary ready jobs of
   214         NONE => (NONE, make_queue groups jobs No_Result)
   217         NONE => (NONE, make_queue groups jobs No_Result)
   215       | SOME (result as (task, _, _)) =>
   218       | SOME (result as (task, _, _)) =>
   233 fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
   236 fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
   234   let
   237   let
   235     fun ready task =
   238     fun ready task =
   236       (case Task_Graph.get_node jobs task of
   239       (case Task_Graph.get_node jobs task of
   237         (group, Job list) =>
   240         (group, Job list) =>
   238           if null (get_deps jobs task)
   241           if is_ready (get_deps jobs task) group
   239           then SOME (task, group, rev list)
   242           then SOME (task, group, rev list)
   240           else NONE
   243           else NONE
   241       | _ => NONE);
   244       | _ => NONE);
   242     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
   245     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
   243     fun result (res as (task, _, _)) =
   246     fun result (res as (task, _, _)) =