src/Pure/Concurrent/task_queue.ML
changeset 41684 b828d4b53386
parent 41683 73dde8006820
child 41695 afdbec23b92b
equal deleted inserted replaced
41683:73dde8006820 41684:b828d4b53386
   260     val task = new_task group name (SOME pri);
   260     val task = new_task group name (SOME pri);
   261     val groups' = groups
   261     val groups' = groups
   262       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   262       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   263     val jobs' = jobs
   263     val jobs' = jobs
   264       |> Task_Graph.new_node (task, Job [job])
   264       |> Task_Graph.new_node (task, Job [job])
   265       |> fold (add_job task) deps
   265       |> fold (add_job task) deps;
   266       |> fold (fold (add_job task) o get_deps jobs) deps;
       
   267     val minimal = null (get_deps jobs' task);
   266     val minimal = null (get_deps jobs' task);
   268   in ((task, minimal), make_queue groups' jobs') end;
   267   in ((task, minimal), make_queue groups' jobs') end;
   269 
   268 
   270 fun extend task job (Queue {groups, jobs}) =
   269 fun extend task job (Queue {groups, jobs}) =
   271   (case try (get_job jobs) task of
   270   (case try (get_job jobs) task of
   300 
   299 
   301 fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks;
   300 fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks;
   302 
   301 
   303 fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) =
   302 fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) =
   304   let
   303   let
   305     fun ready task = ready_job task (Task_Graph.get_entry jobs task);
   304     fun ready [] rest = (NONE, rev rest)
   306     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
   305       | ready (task :: tasks) rest =
   307     fun result (res as (task, _)) =
   306           (case try (Task_Graph.get_entry jobs) task of
       
   307             NONE => ready tasks rest
       
   308           | SOME entry =>
       
   309               (case ready_job task entry of
       
   310                 NONE => ready tasks (task :: rest)
       
   311               | some => (some, List.revAppend (rest, tasks))));
       
   312 
       
   313     fun ready_dep _ [] = NONE
       
   314       | ready_dep seen (task :: tasks) =
       
   315           if member eq_task seen task then ready_dep seen tasks
       
   316           else
       
   317             let val entry as (_, (ds, _)) = Task_Graph.get_entry jobs task in
       
   318               (case ready_job task entry of
       
   319                 NONE => ready_dep (task :: seen) (ds @ tasks)
       
   320               | some => some)
       
   321             end;
       
   322 
       
   323     fun result (res as (task, _)) deps' =
   308       let val jobs' = set_job task (Running thread) jobs
   324       let val jobs' = set_job task (Running thread) jobs
   309       in ((SOME res, Deps tasks), make_queue groups jobs') end;
   325       in ((SOME res, Deps deps'), make_queue groups jobs') end;
   310   in
   326   in
   311     (case get_first ready tasks of
   327     (case ready deps [] of
   312       SOME res => result res
   328       (SOME res, deps') => result res deps'
   313     | NONE =>
   329     | (NONE, deps') =>
   314         (case get_first (get_first ready o get_deps jobs) tasks of
   330         (case ready_dep [] deps' of
   315           SOME res => result res
   331           SOME res => result res deps'
   316         | NONE => ((NONE, Deps tasks), queue)))
   332         | NONE => ((NONE, Deps deps'), queue)))
   317   end;
   333   end;
   318 
   334 
   319 end;
   335 end;
   320 
   336 
   321 
   337