# HG changeset patch # User wenzelm # Date 1248699993 -7200 # Node ID a46f5e9b1718afdd61a8cb89ebc88034d2060116 # Parent f5f46d6eb95b28f4a480154d5a40660b27254f7f dequeue_towards: always return active tasks; join_work: imitate worker more closely, keep active if queue appears to be blocked for the moment -- it may become free again after some worker_finished event; diff -r f5f46d6eb95b -r a46f5e9b1718 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Mon Jul 27 13:32:29 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Mon Jul 27 15:06:33 2009 +0200 @@ -328,14 +328,24 @@ Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x)))) | SOME res => res); +fun join_wait x = + if SYNCHRONIZED "join_wait" (fn () => + is_finished x orelse (wait work_finished; false)) + then () else join_wait x; + fun join_next deps = (*requires SYNCHRONIZED*) - if overloaded () then (worker_wait scheduler_event; join_next deps) - else change_result queue (Task_Queue.dequeue_towards deps); + if null deps then NONE + else if overloaded () then (worker_wait scheduler_event; join_next deps) + else + (case change_result queue (Task_Queue.dequeue_towards deps) of + (NONE, []) => NONE + | (NONE, deps') => (worker_wait work_finished; join_next deps') + | (SOME work, deps') => SOME (work, deps')); -fun join_deps deps = +fun join_work deps = (case SYNCHRONIZED "join" (fn () => join_next deps) of NONE => () - | SOME (work, deps') => (execute "join" work; join_deps deps')); + | SOME (work, deps') => (execute "join" work; join_work deps')); in @@ -346,20 +356,9 @@ val _ = scheduler_check "join check"; val _ = Multithreading.self_critical () andalso error "Cannot join future values within critical section"; - - val worker = is_worker (); - val _ = if worker then join_deps (map task_of xs) else (); - - fun join_wait x = - if SYNCHRONIZED "join_wait" (fn () => - is_finished x orelse ((if worker then worker_wait else wait) work_finished; false)) - then () else join_wait x; - - val _ = xs |> List.app (fn x => - let val time = Multithreading.real_time join_wait x in - Multithreading.tracing_time true time - (fn () => "joined after " ^ Time.toString time) - end); + val _ = + if is_worker () then join_work (map task_of xs) + else List.app join_wait xs; in map get_result xs end) (); end; diff -r f5f46d6eb95b -r a46f5e9b1718 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Mon Jul 27 13:32:29 2009 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Mon Jul 27 15:06:33 2009 +0200 @@ -28,7 +28,7 @@ val extend: task -> (bool -> bool) -> queue -> queue option val dequeue: queue -> (task * group * (bool -> bool) list) option * queue val dequeue_towards: task list -> queue -> - (((task * group * (bool -> bool) list) * task list) option * queue) + (((task * group * (bool -> bool) list) option * task list) * queue) val finish: task -> queue -> bool * queue end; @@ -215,14 +215,14 @@ let val jobs' = set_job task (Running (Thread.self ())) jobs; val cache' = Unknown; - in (SOME (res, tasks), make_queue groups jobs' cache') end; + in ((SOME res, tasks), make_queue groups jobs' cache') end; in (case get_first ready tasks of SOME res => result res | NONE => (case get_first (get_first ready o Task_Graph.imm_preds jobs) tasks of SOME res => result res - | NONE => (NONE, queue))) + | NONE => ((NONE, tasks), queue))) end;