src/Pure/Concurrent/task_queue.ML
changeset 29365 5c5bc17d9135
parent 29342 23504001c4fb
child 31617 bb7b5a5942c7
equal deleted inserted replaced
29355:642cac18e155 29365:5c5bc17d9135
    11   val pri_of_task: task -> int
    11   val pri_of_task: task -> int
    12   val str_of_task: task -> string
    12   val str_of_task: task -> string
    13   type group
    13   type group
    14   val eq_group: group * group -> bool
    14   val eq_group: group * group -> bool
    15   val new_group: unit -> group
    15   val new_group: unit -> group
       
    16   val is_valid: group -> bool
    16   val invalidate_group: group -> unit
    17   val invalidate_group: group -> unit
    17   val str_of_group: group -> string
    18   val str_of_group: group -> string
    18   type queue
    19   type queue
    19   val empty: queue
    20   val empty: queue
    20   val is_empty: queue -> bool
    21   val is_empty: queue -> bool
    21   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
    22   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
       
    23   val extend: task -> (bool -> bool) -> queue -> queue option
    22   val depend: task list -> task -> queue -> queue
    24   val depend: task list -> task -> queue -> queue
    23   val dequeue: queue -> (task * group * (unit -> bool)) option * queue
    25   val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
    24   val dequeue_towards: task list -> queue ->
    26   val dequeue_towards: task list -> queue ->
    25     (((task * group * (unit -> bool)) * task list) option * queue)
    27     (((task * group * (bool -> bool) list) * task list) option * queue)
    26   val interrupt: queue -> task -> unit
    28   val interrupt: queue -> task -> unit
    27   val interrupt_external: queue -> string -> unit
    29   val interrupt_external: queue -> string -> unit
    28   val cancel: queue -> group -> bool
    30   val cancel: queue -> group -> bool
    29   val cancel_all: queue -> group list
    31   val cancel_all: queue -> group list
    30   val finish: task -> queue -> queue
    32   val finish: task -> queue -> queue
    50 datatype group = Group of serial * bool ref;
    52 datatype group = Group of serial * bool ref;
    51 fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2;
    53 fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2;
    52 
    54 
    53 fun new_group () = Group (serial (), ref true);
    55 fun new_group () = Group (serial (), ref true);
    54 
    56 
       
    57 fun is_valid (Group (_, ref ok)) = ok;
    55 fun invalidate_group (Group (_, ok)) = ok := false;
    58 fun invalidate_group (Group (_, ok)) = ok := false;
    56 
    59 
    57 fun str_of_group (Group (i, ref ok)) =
    60 fun str_of_group (Group (i, ref ok)) =
    58   if ok then string_of_int i else enclose "(" ")" (string_of_int i);
    61   if ok then string_of_int i else enclose "(" ")" (string_of_int i);
    59 
    62 
    60 
    63 
    61 (* jobs *)
    64 (* jobs *)
    62 
    65 
    63 datatype job =
    66 datatype job =
    64   Job of bool -> bool |
    67   Job of (bool -> bool) list |
    65   Running of Thread.thread;
    68   Running of Thread.thread;
    66 
    69 
    67 type jobs = (group * job) Task_Graph.T;
    70 type jobs = (group * job) Task_Graph.T;
    68 
    71 
    69 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
    72 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
    70 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
    73 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
    71 fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;
    74 fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
    72 
    75 
    73 fun add_job task dep (jobs: jobs) =
    76 fun add_job task dep (jobs: jobs) =
    74   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
    77   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
    75 
    78 
    76 fun add_job_acyclic task dep (jobs: jobs) =
    79 fun add_job_acyclic task dep (jobs: jobs) =
    94 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
    97 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
    95   let
    98   let
    96     val task = new_task pri;
    99     val task = new_task pri;
    97     val groups' = Inttab.cons_list (gid, task) groups;
   100     val groups' = Inttab.cons_list (gid, task) groups;
    98     val jobs' = jobs
   101     val jobs' = jobs
    99       |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
   102       |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
   100   in (task, make_queue groups' jobs') end;
   103   in (task, make_queue groups' jobs') end;
       
   104 
       
   105 fun extend task job (Queue {groups, jobs}) =
       
   106   (case try (get_job jobs) task of
       
   107     SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
       
   108   | _ => NONE);
   101 
   109 
   102 fun depend deps task (Queue {groups, jobs}) =
   110 fun depend deps task (Queue {groups, jobs}) =
   103   make_queue groups (fold (add_job_acyclic task) deps jobs);
   111   make_queue groups (fold (add_job_acyclic task) deps jobs);
   104 
   112 
   105 
   113 
   107 
   115 
   108 local
   116 local
   109 
   117 
   110 fun dequeue_result NONE queue = (NONE, queue)
   118 fun dequeue_result NONE queue = (NONE, queue)
   111   | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
   119   | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
   112       (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));
   120       (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs));
   113 
   121 
   114 in
   122 in
   115 
   123 
   116 fun dequeue (queue as Queue {jobs, ...}) =
   124 fun dequeue (queue as Queue {jobs, ...}) =
   117   let
   125   let
   118     fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
   126     fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
   119           SOME (task, group, (fn () => job ok))
       
   120       | ready _ = NONE;
   127       | ready _ = NONE;
   121   in dequeue_result (Task_Graph.get_first ready jobs) queue end;
   128   in dequeue_result (Task_Graph.get_first ready jobs) queue end;
   122 
   129 
   123 fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
   130 fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
   124   let
   131   let
   125     val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;
   132     val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;
   126 
   133 
   127     fun ready task =
   134     fun ready task =
   128       (case Task_Graph.get_node jobs task of
   135       (case Task_Graph.get_node jobs task of
   129         (group as Group (_, ref ok), Job job) =>
   136         (group, Job list) =>
   130           if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
   137           if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
   131           else NONE
   138           else NONE
   132       | _ => NONE);
   139       | _ => NONE);
   133   in
   140   in
   134     (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
   141     (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
   135       (NONE, queue') => (NONE, queue')
   142       (NONE, queue') => (NONE, queue')