src/Pure/Concurrent/task_queue.ML
author haftmann
Thu Oct 22 13:48:06 2009 +0200 (2009-10-22)
changeset 33063 4d462963a7db
parent 32814 81897d30b97f
child 34277 7325a5e3587f
permissions -rw-r--r--
map_range (and map_index) combinator
wenzelm@28165
     1
(*  Title:      Pure/Concurrent/task_queue.ML
wenzelm@28165
     2
    Author:     Makarius
wenzelm@28165
     3
wenzelm@28165
     4
Ordered queue of grouped tasks.
wenzelm@28165
     5
*)
wenzelm@28165
     6
wenzelm@28165
     7
signature TASK_QUEUE =
wenzelm@28165
     8
sig
wenzelm@29340
     9
  type task
wenzelm@29121
    10
  val new_task: int -> task
wenzelm@29121
    11
  val pri_of_task: task -> int
wenzelm@28196
    12
  val str_of_task: task -> string
wenzelm@29340
    13
  type group
wenzelm@32221
    14
  val new_group: group option -> group
wenzelm@32052
    15
  val group_id: group -> int
wenzelm@29340
    16
  val eq_group: group * group -> bool
wenzelm@32221
    17
  val cancel_group: group -> exn -> unit
wenzelm@32221
    18
  val is_canceled: group -> bool
wenzelm@32101
    19
  val group_status: group -> exn list
wenzelm@28179
    20
  val str_of_group: group -> string
wenzelm@28165
    21
  type queue
wenzelm@28165
    22
  val empty: queue
wenzelm@28204
    23
  val is_empty: queue -> bool
wenzelm@32052
    24
  val status: queue -> {ready: int, pending: int, running: int}
wenzelm@32221
    25
  val cancel: queue -> group -> bool
wenzelm@32221
    26
  val cancel_all: queue -> group list
wenzelm@32218
    27
  val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
wenzelm@29365
    28
  val extend: task -> (bool -> bool) -> queue -> queue option
wenzelm@32249
    29
  val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
wenzelm@32814
    30
  val depend: task -> task list -> queue -> queue
wenzelm@32249
    31
  val dequeue_towards: Thread.thread -> task list -> queue ->
wenzelm@32224
    32
    (((task * group * (bool -> bool) list) option * task list) * queue)
wenzelm@32221
    33
  val finish: task -> queue -> bool * queue
wenzelm@28165
    34
end;
wenzelm@28165
    35
wenzelm@29340
    36
structure Task_Queue:> TASK_QUEUE =
wenzelm@28165
    37
struct
wenzelm@28165
    38
wenzelm@29121
    39
(* tasks *)
wenzelm@29121
    40
wenzelm@29121
    41
datatype task = Task of int * serial;
wenzelm@29121
    42
fun new_task pri = Task (pri, serial ());
wenzelm@28165
    43
wenzelm@29121
    44
fun pri_of_task (Task (pri, _)) = pri;
wenzelm@29121
    45
fun str_of_task (Task (_, i)) = string_of_int i;
wenzelm@28998
    46
wenzelm@29121
    47
fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
wenzelm@31971
    48
structure Task_Graph = Graph(type key = task val ord = task_ord);
wenzelm@28165
    49
wenzelm@28998
    50
wenzelm@32101
    51
(* nested groups *)
wenzelm@32101
    52
wenzelm@32101
    53
datatype group = Group of
wenzelm@32101
    54
 {parent: group option,
wenzelm@32101
    55
  id: serial,
wenzelm@32251
    56
  status: exn list Synchronized.var};
wenzelm@29121
    57
wenzelm@32101
    58
fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
wenzelm@32052
    59
wenzelm@32251
    60
fun new_group parent = make_group (parent, serial (), Synchronized.var "group" []);
wenzelm@32101
    61
wenzelm@32101
    62
fun group_id (Group {id, ...}) = id;
wenzelm@32101
    63
fun eq_group (group1, group2) = group_id group1 = group_id group2;
wenzelm@28551
    64
wenzelm@32101
    65
fun group_ancestry (Group {parent, id, ...}) =
wenzelm@32101
    66
  id :: (case parent of NONE => [] | SOME group => group_ancestry group);
wenzelm@32101
    67
wenzelm@32101
    68
wenzelm@32221
    69
(* group status *)
wenzelm@32221
    70
wenzelm@32251
    71
fun cancel_group (Group {status, ...}) exn =
wenzelm@32251
    72
  Synchronized.change status
wenzelm@32251
    73
    (fn exns =>
wenzelm@32251
    74
      (case exn of
wenzelm@32251
    75
        Exn.Interrupt => if null exns then [exn] else exns
wenzelm@32251
    76
      | _ => exn :: exns));
wenzelm@29121
    77
wenzelm@32251
    78
fun is_canceled (Group {parent, status, ...}) =
wenzelm@32251
    79
  not (null (Synchronized.value status)) orelse
wenzelm@32251
    80
    (case parent of NONE => false | SOME group => is_canceled group);
wenzelm@32221
    81
wenzelm@32251
    82
fun group_status (Group {parent, status, ...}) =
wenzelm@32251
    83
  Synchronized.value status @
wenzelm@32251
    84
    (case parent of NONE => [] | SOME group => group_status group);
wenzelm@28165
    85
wenzelm@32101
    86
fun str_of_group group =
wenzelm@32101
    87
  (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
wenzelm@28179
    88
wenzelm@28165
    89
wenzelm@28176
    90
(* jobs *)
wenzelm@28165
    91
wenzelm@28165
    92
datatype job =
wenzelm@29365
    93
  Job of (bool -> bool) list |
wenzelm@28165
    94
  Running of Thread.thread;
wenzelm@28165
    95
wenzelm@29121
    96
type jobs = (group * job) Task_Graph.T;
wenzelm@28176
    97
wenzelm@29121
    98
fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
wenzelm@29121
    99
fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
wenzelm@29365
   100
fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
wenzelm@28202
   101
wenzelm@32250
   102
fun add_job task dep (jobs: jobs) =
wenzelm@32250
   103
  Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
wenzelm@32250
   104
wenzelm@32814
   105
fun add_dep task dep (jobs: jobs) =
wenzelm@32814
   106
  if Task_Graph.is_edge jobs (task, dep) then
wenzelm@32814
   107
    raise Fail "Cyclic dependency of future tasks"
wenzelm@32814
   108
  else add_job task dep jobs;
wenzelm@32814
   109
wenzelm@32250
   110
fun get_deps (jobs: jobs) task =
wenzelm@32250
   111
  Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
wenzelm@32250
   112
wenzelm@28176
   113
wenzelm@28176
   114
(* queue of grouped jobs *)
wenzelm@28176
   115
wenzelm@31617
   116
datatype result = Unknown | Result of task | No_Result;
wenzelm@31617
   117
wenzelm@28165
   118
datatype queue = Queue of
wenzelm@28184
   119
 {groups: task list Inttab.table,   (*groups with presently active members*)
wenzelm@31617
   120
  jobs: jobs,                       (*job dependency graph*)
wenzelm@31617
   121
  cache: result};                   (*last dequeue result*)
wenzelm@28165
   122
wenzelm@31617
   123
fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
wenzelm@28204
   124
wenzelm@31617
   125
val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
wenzelm@29121
   126
fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
wenzelm@28165
   127
wenzelm@28165
   128
wenzelm@32101
   129
(* queue status *)
wenzelm@32052
   130
wenzelm@32052
   131
fun status (Queue {jobs, ...}) =
wenzelm@32052
   132
  let
wenzelm@32052
   133
    val (x, y, z) =
wenzelm@32784
   134
      Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z) =>
wenzelm@32052
   135
          (case job of
wenzelm@32052
   136
            Job _ => if null deps then (x + 1, y, z) else (x, y + 1, z)
wenzelm@32052
   137
          | Running _ => (x, y, z + 1)))
wenzelm@32052
   138
        jobs (0, 0, 0);
wenzelm@32052
   139
  in {ready = x, pending = y, running = z} end;
wenzelm@32052
   140
wenzelm@32052
   141
wenzelm@32101
   142
(* cancel -- peers and sub-groups *)
wenzelm@32101
   143
wenzelm@32101
   144
fun cancel (Queue {groups, jobs, ...}) group =
wenzelm@32101
   145
  let
wenzelm@32101
   146
    val _ = cancel_group group Exn.Interrupt;
wenzelm@32101
   147
    val tasks = Inttab.lookup_list groups (group_id group);
wenzelm@32101
   148
    val running =
wenzelm@32101
   149
      fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
wenzelm@32101
   150
    val _ = List.app SimpleThread.interrupt running;
wenzelm@32101
   151
  in null running end;
wenzelm@32101
   152
wenzelm@32101
   153
fun cancel_all (Queue {jobs, ...}) =
wenzelm@32101
   154
  let
wenzelm@32101
   155
    fun cancel_job (group, job) (groups, running) =
wenzelm@32101
   156
      (cancel_group group Exn.Interrupt;
wenzelm@32101
   157
        (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
wenzelm@32101
   158
        | _ => (groups, running)));
wenzelm@32101
   159
    val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
wenzelm@32101
   160
    val _ = List.app SimpleThread.interrupt running;
wenzelm@32101
   161
  in groups end;
wenzelm@32101
   162
wenzelm@32101
   163
wenzelm@28185
   164
(* enqueue *)
wenzelm@28165
   165
wenzelm@32101
   166
fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
wenzelm@28165
   167
  let
wenzelm@29121
   168
    val task = new_task pri;
wenzelm@32101
   169
    val groups' = groups
wenzelm@32101
   170
      |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
wenzelm@28185
   171
    val jobs' = jobs
wenzelm@32101
   172
      |> Task_Graph.new_node (task, (group, Job [job]))
wenzelm@32190
   173
      |> fold (add_job task) deps
wenzelm@32190
   174
      |> fold (fold (add_job task) o get_deps jobs) deps;
wenzelm@32218
   175
    val minimal = null (get_deps jobs' task);
wenzelm@31632
   176
    val cache' =
wenzelm@31632
   177
      (case cache of
wenzelm@31632
   178
        Result last =>
wenzelm@31632
   179
          if task_ord (last, task) = LESS
wenzelm@31632
   180
          then cache else Unknown
wenzelm@31632
   181
      | _ => Unknown);
wenzelm@32218
   182
  in ((task, minimal), make_queue groups' jobs' cache') end;
wenzelm@28165
   183
wenzelm@31617
   184
fun extend task job (Queue {groups, jobs, cache}) =
wenzelm@29365
   185
  (case try (get_job jobs) task of
wenzelm@31617
   186
    SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
wenzelm@29365
   187
  | _ => NONE);
wenzelm@29365
   188
wenzelm@28185
   189
wenzelm@28185
   190
(* dequeue *)
wenzelm@28185
   191
wenzelm@32249
   192
fun dequeue thread (queue as Queue {groups, jobs, cache}) =
wenzelm@29121
   193
  let
wenzelm@29365
   194
    fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
wenzelm@29121
   195
      | ready _ = NONE;
wenzelm@31617
   196
    fun deq boundary =
wenzelm@31617
   197
      (case Task_Graph.get_first boundary ready jobs of
wenzelm@31617
   198
        NONE => (NONE, make_queue groups jobs No_Result)
wenzelm@31617
   199
      | SOME (result as (task, _, _)) =>
wenzelm@31617
   200
          let
wenzelm@32249
   201
            val jobs' = set_job task (Running thread) jobs;
wenzelm@31617
   202
            val cache' = Result task;
wenzelm@31617
   203
          in (SOME result, make_queue groups jobs' cache') end);
wenzelm@29121
   204
  in
wenzelm@31617
   205
    (case cache of
wenzelm@31617
   206
      Unknown => deq NONE
wenzelm@31617
   207
    | Result last => deq (SOME last)
wenzelm@31617
   208
    | No_Result => (NONE, queue))
wenzelm@28384
   209
  end;
wenzelm@28202
   210
wenzelm@28165
   211
wenzelm@32055
   212
(* dequeue_towards -- adhoc dependencies *)
wenzelm@32055
   213
wenzelm@32814
   214
fun depend task deps (Queue {groups, jobs, ...}) =
wenzelm@32814
   215
  make_queue groups (fold (add_dep task) deps jobs) Unknown;
wenzelm@32814
   216
wenzelm@32249
   217
fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
wenzelm@32055
   218
  let
wenzelm@32055
   219
    fun ready task =
wenzelm@32055
   220
      (case Task_Graph.get_node jobs task of
wenzelm@32055
   221
        (group, Job list) =>
wenzelm@32250
   222
          if null (get_deps jobs task)
wenzelm@32101
   223
          then SOME (task, group, rev list)
wenzelm@32055
   224
          else NONE
wenzelm@32055
   225
      | _ => NONE);
wenzelm@32055
   226
    val tasks = filter (can (Task_Graph.get_node jobs)) deps;
wenzelm@32192
   227
    fun result (res as (task, _, _)) =
wenzelm@32192
   228
      let
wenzelm@32249
   229
        val jobs' = set_job task (Running thread) jobs;
wenzelm@32192
   230
        val cache' = Unknown;
wenzelm@32224
   231
      in ((SOME res, tasks), make_queue groups jobs' cache') end;
wenzelm@32055
   232
  in
wenzelm@32093
   233
    (case get_first ready tasks of
wenzelm@32192
   234
      SOME res => result res
wenzelm@32192
   235
    | NONE =>
wenzelm@32250
   236
        (case get_first (get_first ready o get_deps jobs) tasks of
wenzelm@32192
   237
          SOME res => result res
wenzelm@32224
   238
        | NONE => ((NONE, tasks), queue)))
wenzelm@32055
   239
  end;
wenzelm@32055
   240
wenzelm@32055
   241
wenzelm@32221
   242
(* finish *)
wenzelm@32221
   243
wenzelm@32221
   244
fun finish task (Queue {groups, jobs, cache}) =
wenzelm@32221
   245
  let
wenzelm@32221
   246
    val group = get_group jobs task;
wenzelm@32221
   247
    val groups' = groups
wenzelm@32221
   248
      |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
wenzelm@32221
   249
    val jobs' = Task_Graph.del_node task jobs;
wenzelm@32221
   250
    val maximal = null (Task_Graph.imm_succs jobs task);
wenzelm@32221
   251
    val cache' = if maximal then cache else Unknown;
wenzelm@32221
   252
  in (maximal, make_queue groups' jobs' cache') end;
wenzelm@32221
   253
wenzelm@28165
   254
end;