src/Pure/Concurrent/task_queue.ML
changeset 60610 f52b4b0c10c4
parent 59333 4ef80efc36c8
child 62663 bea354f6ff21
equal deleted inserted replaced
60609:15620ae824c0 60610:f52b4b0c10c4
    13   val cancel_group: group -> exn -> unit
    13   val cancel_group: group -> exn -> unit
    14   val is_canceled: group -> bool
    14   val is_canceled: group -> bool
    15   val group_status: group -> exn list
    15   val group_status: group -> exn list
    16   val str_of_group: group -> string
    16   val str_of_group: group -> string
    17   val str_of_groups: group -> string
    17   val str_of_groups: group -> string
       
    18   val urgent_pri: int
    18   type task
    19   type task
    19   val dummy_task: task
    20   val dummy_task: task
    20   val group_of_task: task -> group
    21   val group_of_task: task -> group
    21   val name_of_task: task -> string
    22   val name_of_task: task -> string
    22   val pri_of_task: task -> int
    23   val pri_of_task: task -> int
    29   type queue
    30   type queue
    30   val empty: queue
    31   val empty: queue
    31   val group_tasks: queue -> group -> task list
    32   val group_tasks: queue -> group -> task list
    32   val known_task: queue -> task -> bool
    33   val known_task: queue -> task -> bool
    33   val all_passive: queue -> bool
    34   val all_passive: queue -> bool
    34   val status: queue -> {ready: int, pending: int, running: int, passive: int}
    35   val status: queue -> {ready: int, pending: int, running: int, passive: int, urgent: int}
    35   val cancel: queue -> group -> Thread.thread list
    36   val cancel: queue -> group -> Thread.thread list
    36   val cancel_all: queue -> group list * Thread.thread list
    37   val cancel_all: queue -> group list * Thread.thread list
    37   val finish: task -> queue -> bool * queue
    38   val finish: task -> queue -> bool * queue
    38   val enroll: Thread.thread -> string -> group -> queue -> task * queue
    39   val enroll: Thread.thread -> string -> group -> queue -> task * queue
    39   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
    40   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
    40   val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
    41   val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
    41   val extend: task -> (bool -> bool) -> queue -> queue option
    42   val extend: task -> (bool -> bool) -> queue -> queue option
    42   val dequeue_passive: Thread.thread -> task -> queue -> bool option * queue
    43   val dequeue_passive: Thread.thread -> task -> queue -> bool option * queue
    43   val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue
    44   val dequeue: Thread.thread -> bool -> queue -> (task * (bool -> bool) list) option * queue
    44   val dequeue_deps: Thread.thread -> task list -> queue ->
    45   val dequeue_deps: Thread.thread -> task list -> queue ->
    45     (((task * (bool -> bool) list) option * task list) * queue)
    46     (((task * (bool -> bool) list) option * task list) * queue)
    46 end;
    47 end;
    47 
    48 
    48 structure Task_Queue: TASK_QUEUE =
    49 structure Task_Queue: TASK_QUEUE =
    94 
    95 
    95 end;
    96 end;
    96 
    97 
    97 
    98 
    98 (* tasks *)
    99 (* tasks *)
       
   100 
       
   101 val urgent_pri = 1000;
    99 
   102 
   100 type timing = Time.time * Time.time * string list;  (*run, wait, wait dependencies*)
   103 type timing = Time.time * Time.time * string list;  (*run, wait, wait dependencies*)
   101 
   104 
   102 val timing_start = (Time.zeroTime, Time.zeroTime, []): timing;
   105 val timing_start = (Time.zeroTime, Time.zeroTime, []): timing;
   103 
   106 
   212   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
   215   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
   213 
   216 
   214 
   217 
   215 (* queue *)
   218 (* queue *)
   216 
   219 
   217 datatype queue = Queue of {groups: groups, jobs: jobs};
   220 datatype queue = Queue of {groups: groups, jobs: jobs, urgent: int};
   218 
   221 
   219 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
   222 fun make_queue groups jobs urgent = Queue {groups = groups, jobs = jobs, urgent = urgent};
   220 val empty = make_queue Inttab.empty Task_Graph.empty;
   223 val empty = make_queue Inttab.empty Task_Graph.empty 0;
   221 
   224 
   222 fun group_tasks (Queue {groups, ...}) group = Tasks.keys (get_tasks groups (group_id group));
   225 fun group_tasks (Queue {groups, ...}) group = Tasks.keys (get_tasks groups (group_id group));
   223 fun known_task (Queue {jobs, ...}) task = can (Task_Graph.get_entry jobs) task;
   226 fun known_task (Queue {jobs, ...}) task = can (Task_Graph.get_entry jobs) task;
   224 
   227 
   225 
   228 
   231       if Task_Graph.Keys.is_empty deps andalso is_canceled (group_of_task task)
   234       if Task_Graph.Keys.is_empty deps andalso is_canceled (group_of_task task)
   232       then SOME (task, [fn _ => abort ()])
   235       then SOME (task, [fn _ => abort ()])
   233       else NONE
   236       else NONE
   234   | ready_job _ = NONE;
   237   | ready_job _ = NONE;
   235 
   238 
       
   239 fun ready_job_urgent false = ready_job
       
   240   | ready_job_urgent true = (fn entry as (task, _) =>
       
   241       if pri_of_task task >= urgent_pri then ready_job entry else NONE);
       
   242 
   236 fun active_job (task, (Running _, _)) = SOME (task, [])
   243 fun active_job (task, (Running _, _)) = SOME (task, [])
   237   | active_job arg = ready_job arg;
   244   | active_job arg = ready_job arg;
   238 
   245 
   239 fun all_passive (Queue {jobs, ...}) = is_none (Task_Graph.get_first active_job jobs);
   246 fun all_passive (Queue {jobs, ...}) = is_none (Task_Graph.get_first active_job jobs);
   240 
   247 
   241 
   248 
   242 (* queue status *)
   249 (* queue status *)
   243 
   250 
   244 fun status (Queue {jobs, ...}) =
   251 fun status (Queue {jobs, urgent, ...}) =
   245   let
   252   let
   246     val (x, y, z, w) =
   253     val (x, y, z, w) =
   247       Task_Graph.fold (fn (_, (job, (deps, _))) => fn (x, y, z, w) =>
   254       Task_Graph.fold (fn (_, (job, (deps, _))) => fn (x, y, z, w) =>
   248           (case job of
   255           (case job of
   249             Job _ => if Task_Graph.Keys.is_empty deps then (x + 1, y, z, w) else (x, y + 1, z, w)
   256             Job _ => if Task_Graph.Keys.is_empty deps then (x + 1, y, z, w) else (x, y + 1, z, w)
   250           | Running _ => (x, y, z + 1, w)
   257           | Running _ => (x, y, z + 1, w)
   251           | Passive _ => (x, y, z, w + 1)))
   258           | Passive _ => (x, y, z, w + 1)))
   252         jobs (0, 0, 0, 0);
   259         jobs (0, 0, 0, 0);
   253   in {ready = x, pending = y, running = z, passive = w} end;
   260   in {ready = x, pending = y, running = z, passive = w, urgent = urgent} end;
   254 
   261 
   255 
   262 
   256 
   263 
   257 (** task queue operations **)
   264 (** task queue operations **)
   258 
   265 
   259 (* cancel -- peers and sub-groups *)
   266 (* cancel -- peers and sub-groups *)
   260 
   267 
   261 fun cancel (Queue {groups, jobs}) group =
   268 fun cancel (Queue {groups, jobs, ...}) group =
   262   let
   269   let
   263     val _ = cancel_group group Exn.Interrupt;
   270     val _ = cancel_group group Exn.Interrupt;
   264     val running =
   271     val running =
   265       Tasks.fold (fn (task, _) =>
   272       Tasks.fold (fn (task, _) =>
   266           (case get_job jobs task of Running thread => insert Thread.equal thread | _ => I))
   273           (case get_job jobs task of Running thread => insert Thread.equal thread | _ => I))
   282   in running end;
   289   in running end;
   283 
   290 
   284 
   291 
   285 (* finish *)
   292 (* finish *)
   286 
   293 
   287 fun finish task (Queue {groups, jobs}) =
   294 fun finish task (Queue {groups, jobs, urgent}) =
   288   let
   295   let
   289     val group = group_of_task task;
   296     val group = group_of_task task;
   290     val groups' = fold_groups (fn g => del_task (group_id g, task)) group groups;
   297     val groups' = fold_groups (fn g => del_task (group_id g, task)) group groups;
   291     val jobs' = Task_Graph.del_node task jobs;
   298     val jobs' = Task_Graph.del_node task jobs;
   292     val maximal = Task_Graph.is_maximal jobs task;
   299     val maximal = Task_Graph.is_maximal jobs task;
   293   in (maximal, make_queue groups' jobs') end;
   300   in (maximal, make_queue groups' jobs' urgent) end;
   294 
   301 
   295 
   302 
   296 (* enroll *)
   303 (* enroll *)
   297 
   304 
   298 fun enroll thread name group (Queue {groups, jobs}) =
   305 fun enroll thread name group (Queue {groups, jobs, urgent}) =
   299   let
   306   let
   300     val task = new_task group name NONE;
   307     val task = new_task group name NONE;
   301     val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
   308     val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
   302     val jobs' = jobs |> Task_Graph.new_node (task, Running thread);
   309     val jobs' = jobs |> Task_Graph.new_node (task, Running thread);
   303   in (task, make_queue groups' jobs') end;
   310   in (task, make_queue groups' jobs' urgent) end;
   304 
   311 
   305 
   312 
   306 (* enqueue *)
   313 (* enqueue *)
   307 
   314 
   308 fun enqueue_passive group abort (Queue {groups, jobs}) =
   315 fun enqueue_passive group abort (Queue {groups, jobs, urgent}) =
   309   let
   316   let
   310     val task = new_task group "passive" NONE;
   317     val task = new_task group "passive" NONE;
   311     val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
   318     val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
   312     val jobs' = jobs |> Task_Graph.new_node (task, Passive abort);
   319     val jobs' = jobs |> Task_Graph.new_node (task, Passive abort);
   313   in (task, make_queue groups' jobs') end;
   320   in (task, make_queue groups' jobs' urgent) end;
   314 
   321 
   315 fun enqueue name group deps pri job (Queue {groups, jobs}) =
   322 fun enqueue name group deps pri job (Queue {groups, jobs, urgent}) =
   316   let
   323   let
   317     val task = new_task group name (SOME pri);
   324     val task = new_task group name (SOME pri);
   318     val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
   325     val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
   319     val jobs' = jobs
   326     val jobs' = jobs
   320       |> Task_Graph.new_node (task, Job [job])
   327       |> Task_Graph.new_node (task, Job [job])
   321       |> fold (add_job task) deps;
   328       |> fold (add_job task) deps;
   322   in (task, make_queue groups' jobs') end;
   329     val urgent' = if pri >= urgent_pri then urgent + 1 else urgent;
   323 
   330   in (task, make_queue groups' jobs' urgent') end;
   324 fun extend task job (Queue {groups, jobs}) =
   331 
       
   332 fun extend task job (Queue {groups, jobs, urgent}) =
   325   (case try (get_job jobs) task of
   333   (case try (get_job jobs) task of
   326     SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
   334     SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) urgent)
   327   | _ => NONE);
   335   | _ => NONE);
   328 
   336 
   329 
   337 
   330 (* dequeue *)
   338 (* dequeue *)
   331 
   339 
   332 fun dequeue_passive thread task (queue as Queue {groups, jobs}) =
   340 fun dequeue_passive thread task (queue as Queue {groups, jobs, urgent}) =
   333   (case try (get_job jobs) task of
   341   (case try (get_job jobs) task of
   334     SOME (Passive _) =>
   342     SOME (Passive _) =>
   335       let val jobs' = set_job task (Running thread) jobs
   343       let val jobs' = set_job task (Running thread) jobs
   336       in (SOME true, make_queue groups jobs') end
   344       in (SOME true, make_queue groups jobs' urgent) end
   337   | SOME _ => (SOME false, queue)
   345   | SOME _ => (SOME false, queue)
   338   | NONE => (NONE, queue));
   346   | NONE => (NONE, queue));
   339 
   347 
   340 fun dequeue thread (queue as Queue {groups, jobs}) =
   348 fun dequeue thread urgent_only (queue as Queue {groups, jobs, urgent}) =
   341   (case Task_Graph.get_first ready_job jobs of
   349   if not urgent_only orelse urgent > 0 then
   342     SOME (result as (task, _)) =>
   350     (case Task_Graph.get_first (ready_job_urgent urgent_only) jobs of
   343       let val jobs' = set_job task (Running thread) jobs
   351       SOME (result as (task, _)) =>
   344       in (SOME result, make_queue groups jobs') end
   352         let
   345   | NONE => (NONE, queue));
   353           val jobs' = set_job task (Running thread) jobs;
       
   354           val urgent' = if pri_of_task task >= urgent_pri then urgent - 1 else urgent;
       
   355         in (SOME result, make_queue groups jobs' urgent') end
       
   356     | NONE => (NONE, queue))
       
   357   else (NONE, queue);
   346 
   358 
   347 
   359 
   348 (* dequeue wrt. dynamic dependencies *)
   360 (* dequeue wrt. dynamic dependencies *)
   349 
   361 
   350 fun dequeue_deps thread deps (queue as Queue {groups, jobs}) =
   362 fun dequeue_deps thread deps (queue as Queue {groups, jobs, urgent}) =
   351   let
   363   let
   352     fun ready [] rest = (NONE, rev rest)
   364     fun ready [] rest = (NONE, rev rest)
   353       | ready (task :: tasks) rest =
   365       | ready (task :: tasks) rest =
   354           (case try (Task_Graph.get_entry jobs) task of
   366           (case try (Task_Graph.get_entry jobs) task of
   355             NONE => ready tasks rest
   367             NONE => ready tasks rest
   367                 NONE => ready_dep (Tasks.update (task, ()) seen) (Task_Graph.Keys.dest ds @ tasks)
   379                 NONE => ready_dep (Tasks.update (task, ()) seen) (Task_Graph.Keys.dest ds @ tasks)
   368               | some => some)
   380               | some => some)
   369             end;
   381             end;
   370 
   382 
   371     fun result (res as (task, _)) deps' =
   383     fun result (res as (task, _)) deps' =
   372       let val jobs' = set_job task (Running thread) jobs
   384       let
   373       in ((SOME res, deps'), make_queue groups jobs') end;
   385         val jobs' = set_job task (Running thread) jobs;
       
   386         val urgent' = if pri_of_task task >= urgent_pri then urgent - 1 else urgent;
       
   387       in ((SOME res, deps'), make_queue groups jobs' urgent') end;
   374   in
   388   in
   375     (case ready deps [] of
   389     (case ready deps [] of
   376       (SOME res, deps') => result res deps'
   390       (SOME res, deps') => result res deps'
   377     | (NONE, deps') =>
   391     | (NONE, deps') =>
   378         (case ready_dep Tasks.empty deps' of
   392         (case ready_dep Tasks.empty deps' of