src/Pure/Concurrent/task_queue.ML
changeset 28185 0f20cbce4935
parent 28184 5ed5cb73a2e9
child 28190 0a2434cf38c9
equal deleted inserted replaced
28184:5ed5cb73a2e9 28185:0f20cbce4935
    13   val str_of_task: task -> string
    13   val str_of_task: task -> string
    14   val str_of_group: group -> string
    14   val str_of_group: group -> string
    15   type queue
    15   type queue
    16   val empty: queue
    16   val empty: queue
    17   val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
    17   val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
    18   val dequeue: Thread.thread -> queue -> (task * group * (unit -> bool)) option * queue
    18   val depend: task list -> task -> queue -> queue
    19   val cancel: group -> queue -> Thread.thread list * queue
    19   val dequeue: queue -> (task * group * (unit -> bool)) option * queue
       
    20   val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue
       
    21   val cancel: group -> queue -> bool * queue
    20   val finish: task -> queue -> queue
    22   val finish: task -> queue -> queue
    21 end;
    23 end;
    22 
    24 
    23 structure TaskQueue: TASK_QUEUE =
    25 structure TaskQueue: TASK_QUEUE =
    24 struct
    26 struct
    40   Job of bool * (bool -> bool) |
    42   Job of bool * (bool -> bool) |
    41   Running of Thread.thread;
    43   Running of Thread.thread;
    42 
    44 
    43 type jobs = (group * job) IntGraph.T;
    45 type jobs = (group * job) IntGraph.T;
    44 
    46 
       
    47 fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id;
    45 fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
    48 fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
    46 fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
    49 fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
    47 fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
    50 fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
       
    51 fun add_job (Task id) (Task dep) (jobs: jobs) =
       
    52   IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
    48 
    53 
    49 
    54 
    50 (* queue of grouped jobs *)
    55 (* queue of grouped jobs *)
    51 
    56 
    52 datatype queue = Queue of
    57 datatype queue = Queue of
    55 
    60 
    56 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
    61 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
    57 val empty = make_queue Inttab.empty IntGraph.empty;
    62 val empty = make_queue Inttab.empty IntGraph.empty;
    58 
    63 
    59 
    64 
    60 (* queue operations *)
    65 (* enqueue *)
    61 
    66 
    62 fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) =
    67 fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) =
    63   let
    68   let
    64     val id = serial ();
    69     val id = serial ();
    65     val task = Task id;
    70     val task = Task id;
    66     val groups' = Inttab.cons_list (gid, task) groups;
    71     val groups' = Inttab.cons_list (gid, task) groups;
    67 
    72     val jobs' = jobs
    68     fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G
    73       |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps;
    69       handle IntGraph.UNDEF _ => G;
       
    70     val jobs' = jobs |> IntGraph.new_node (id, (group, Job (true, job))) |> fold add_dep deps;
       
    71   in (task, make_queue groups' jobs') end;
    74   in (task, make_queue groups' jobs') end;
    72 
    75 
    73 fun dequeue thread (queue as Queue {groups, jobs}) =
    76 fun depend deps task (Queue {groups, jobs}) =
       
    77   make_queue groups (fold (add_job task) deps jobs);
       
    78 
       
    79 
       
    80 (* dequeue *)
       
    81 
       
    82 fun dequeue_if P (queue as Queue {groups, jobs}) =
    74   let
    83   let
    75     fun ready (id, ((group, Job (ok, job)), ([], _))) = SOME (Task id, group, (fn () => job ok))
    84     fun ready (id, ((group, Job (ok, job)), ([], _))) =
       
    85           if P id then SOME (Task id, group, (fn () => job ok)) else NONE
    76       | ready _ = NONE;
    86       | ready _ = NONE;
    77   in
    87   in
    78     (case IntGraph.get_first ready jobs of
    88     (case IntGraph.get_first ready jobs of
    79       NONE => (NONE, queue)
    89       NONE => (NONE, queue)
    80     | SOME result =>
    90     | SOME result =>
    81         let val jobs' = map_job (#1 result) (K (Running thread)) jobs
    91         let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs
    82         in (SOME result, make_queue groups jobs') end)
    92         in (SOME result, make_queue groups jobs') end)
    83   end;
    93   end;
       
    94 
       
    95 val dequeue = dequeue_if (K true);
       
    96 
       
    97 fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
       
    98   let val ids = tasks
       
    99     |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE)
       
   100   in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
    84 
   101 
    85 
   102 
    86 (* termination *)
   103 (* termination *)
    87 
   104 
    88 fun cancel (group as Group gid) (Queue {groups, jobs}) =
   105 fun cancel (group as Group gid) (Queue {groups, jobs}) =
    91     val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks [];
   108     val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks [];
    92     val jobs' = fold (fn task =>
   109     val jobs' = fold (fn task =>
    93         (case get_job jobs task of
   110         (case get_job jobs task of
    94           Job (true, job) => map_job task (K (Job (false, job)))
   111           Job (true, job) => map_job task (K (Job (false, job)))
    95         | _ => I)) tasks jobs;
   112         | _ => I)) tasks jobs;
    96   in (running, make_queue groups jobs') end;
   113     val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running;
       
   114   in (null running, make_queue groups jobs') end;
    97 
   115 
    98 fun finish (task as Task id) (Queue {groups, jobs}) =
   116 fun finish (task as Task id) (Queue {groups, jobs}) =
    99   let
   117   let
   100     val Group gid = get_group jobs task;
   118     val Group gid = get_group jobs task;
   101     val groups' = Inttab.remove_list (op =) (gid, task) groups;
   119     val groups' = Inttab.remove_list (op =) (gid, task) groups;