src/Pure/Concurrent/task_queue.ML
changeset 31617 bb7b5a5942c7
parent 29365 5c5bc17d9135
child 31632 fae680e35958
equal deleted inserted replaced
31616:63893e3a50a6 31617:bb7b5a5942c7
    21   val is_empty: queue -> bool
    21   val is_empty: queue -> bool
    22   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
    22   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
    23   val extend: task -> (bool -> bool) -> queue -> queue option
    23   val extend: task -> (bool -> bool) -> queue -> queue option
    24   val depend: task list -> task -> queue -> queue
    24   val depend: task list -> task -> queue -> queue
    25   val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
    25   val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
    26   val dequeue_towards: task list -> queue ->
       
    27     (((task * group * (bool -> bool) list) * task list) option * queue)
       
    28   val interrupt: queue -> task -> unit
    26   val interrupt: queue -> task -> unit
    29   val interrupt_external: queue -> string -> unit
    27   val interrupt_external: queue -> string -> unit
    30   val cancel: queue -> group -> bool
    28   val cancel: queue -> group -> bool
    31   val cancel_all: queue -> group list
    29   val cancel_all: queue -> group list
    32   val finish: task -> queue -> queue
    30   val finish: task -> queue -> queue
    80   Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
    78   Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
    81 
    79 
    82 
    80 
    83 (* queue of grouped jobs *)
    81 (* queue of grouped jobs *)
    84 
    82 
       
    83 datatype result = Unknown | Result of task | No_Result;
       
    84 
    85 datatype queue = Queue of
    85 datatype queue = Queue of
    86  {groups: task list Inttab.table,   (*groups with presently active members*)
    86  {groups: task list Inttab.table,   (*groups with presently active members*)
    87   jobs: jobs};                      (*job dependency graph*)
    87   jobs: jobs,                       (*job dependency graph*)
       
    88   cache: result};                   (*last dequeue result*)
    88 
    89 
    89 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
    90 fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
    90 
    91 
    91 val empty = make_queue Inttab.empty Task_Graph.empty;
    92 val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
    92 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
    93 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
    93 
    94 
    94 
    95 
    95 (* enqueue *)
    96 (* enqueue *)
    96 
    97 
    97 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
    98 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, ...}) =
    98   let
    99   let
    99     val task = new_task pri;
   100     val task = new_task pri;
   100     val groups' = Inttab.cons_list (gid, task) groups;
   101     val groups' = Inttab.cons_list (gid, task) groups;
   101     val jobs' = jobs
   102     val jobs' = jobs
   102       |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
   103       |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
   103   in (task, make_queue groups' jobs') end;
   104   in (task, make_queue groups' jobs' Unknown) end;
   104 
   105 
   105 fun extend task job (Queue {groups, jobs}) =
   106 fun extend task job (Queue {groups, jobs, cache}) =
   106   (case try (get_job jobs) task of
   107   (case try (get_job jobs) task of
   107     SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
   108     SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
   108   | _ => NONE);
   109   | _ => NONE);
   109 
   110 
   110 fun depend deps task (Queue {groups, jobs}) =
   111 fun depend deps task (Queue {groups, jobs, ...}) =
   111   make_queue groups (fold (add_job_acyclic task) deps jobs);
   112   make_queue groups (fold (add_job_acyclic task) deps jobs) Unknown;
   112 
   113 
   113 
   114 
   114 (* dequeue *)
   115 (* dequeue *)
   115 
   116 
   116 local
   117 fun dequeue (queue as Queue {groups, jobs, cache}) =
   117 
       
   118 fun dequeue_result NONE queue = (NONE, queue)
       
   119   | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
       
   120       (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs));
       
   121 
       
   122 in
       
   123 
       
   124 fun dequeue (queue as Queue {jobs, ...}) =
       
   125   let
   118   let
   126     fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
   119     fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
   127       | ready _ = NONE;
   120       | ready _ = NONE;
   128   in dequeue_result (Task_Graph.get_first ready jobs) queue end;
   121     fun deq boundary =
   129 
   122       (case Task_Graph.get_first boundary ready jobs of
   130 fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
   123         NONE => (NONE, make_queue groups jobs No_Result)
   131   let
   124       | SOME (result as (task, _, _)) =>
   132     val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;
   125           let
   133 
   126             val jobs' = set_job task (Running (Thread.self ())) jobs;
   134     fun ready task =
   127             val cache' = Result task;
   135       (case Task_Graph.get_node jobs task of
   128           in (SOME result, make_queue groups jobs' cache') end);
   136         (group, Job list) =>
       
   137           if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
       
   138           else NONE
       
   139       | _ => NONE);
       
   140   in
   129   in
   141     (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
   130     (case cache of
   142       (NONE, queue') => (NONE, queue')
   131       Unknown => deq NONE
   143     | (SOME work, queue') => (SOME (work, tasks'), queue'))
   132     | Result last => deq (SOME last)
       
   133     | No_Result => (NONE, queue))
   144   end;
   134   end;
   145 
       
   146 end;
       
   147 
   135 
   148 
   136 
   149 (* sporadic interrupts *)
   137 (* sporadic interrupts *)
   150 
   138 
   151 fun interrupt (Queue {jobs, ...}) task =
   139 fun interrupt (Queue {jobs, ...}) task =
   152   (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
   140   (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
   153 
   141 
   154 fun interrupt_external (queue as Queue {jobs, ...}) str =
   142 fun interrupt_external (queue as Queue {jobs, ...}) str =
   155   (case Int.fromString str of
   143   (case Int.fromString str of
   156     SOME i =>
   144     SOME i =>
   157       (case Task_Graph.get_first
   145       (case Task_Graph.get_first NONE
   158           (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
   146           (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
   159         of SOME task => interrupt queue task | NONE => ())
   147         of SOME task => interrupt queue task | NONE => ())
   160   | NONE => ());
   148   | NONE => ());
   161 
   149 
   162 
   150 
   178         | _ => (groups, running)));
   166         | _ => (groups, running)));
   179     val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
   167     val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
   180     val _ = List.app SimpleThread.interrupt running;
   168     val _ = List.app SimpleThread.interrupt running;
   181   in groups end;
   169   in groups end;
   182 
   170 
   183 fun finish task (Queue {groups, jobs}) =
   171 fun finish task (Queue {groups, jobs, ...}) =
   184   let
   172   let
   185     val Group (gid, _) = get_group jobs task;
   173     val Group (gid, _) = get_group jobs task;
   186     val groups' = Inttab.remove_list (op =) (gid, task) groups;
   174     val groups' = Inttab.remove_list (op =) (gid, task) groups;
   187     val jobs' = Task_Graph.del_node task jobs;
   175     val jobs' = Task_Graph.del_node task jobs;
   188   in make_queue groups' jobs' end;
   176   in make_queue groups' jobs' Unknown end;
   189 
   177 
   190 end;
   178 end;