# HG changeset patch # User wenzelm # Date 1422535743 -3600 # Node ID c21b65a6834bdbba69d9a78e413f30dcf24af1b0 # Parent df5dc24ca71284c4144f86dea4a0ead8cd84596b clarified worker_wait; clarified join_next/join_loop, with proper thread attributes to allow interrupting of join (e.g. via Lazy.force); diff -r df5dc24ca712 -r c21b65a6834b src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Jan 28 22:50:00 2015 +0100 +++ b/src/Pure/Concurrent/future.ML Thu Jan 29 13:49:03 2015 +0100 @@ -236,13 +236,10 @@ in () end); in () end; -fun worker_wait active cond = (*requires SYNCHRONIZED*) +fun worker_wait worker_state cond = (*requires SYNCHRONIZED*) (case AList.lookup Thread.equal (! workers) (Thread.self ()) of - SOME state => - (state := (if active then Waiting else Sleeping); - wait cond; - state := Working) - | NONE => ignore (wait cond)); + SOME state => Unsynchronized.setmp state worker_state wait cond + | NONE => wait cond); fun worker_next () = (*requires SYNCHRONIZED*) if length (! workers) > ! max_workers then @@ -250,10 +247,10 @@ signal work_available; NONE) else if count_workers Working > ! max_active then - (worker_wait false work_available; worker_next ()) + (worker_wait Sleeping work_available; worker_next ()) else (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of - NONE => (worker_wait false work_available; worker_next ()) + NONE => (worker_wait Sleeping work_available; worker_next ()) | some => (signal work_available; some)); fun worker_loop name = @@ -487,21 +484,22 @@ local -fun join_next deps = (*requires SYNCHRONIZED*) +fun join_next atts deps = (*requires SYNCHRONIZED*) if null deps then NONE else (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of (NONE, []) => NONE | (NONE, deps') => - (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps') + (worker_waiting deps' (fn () => + Multithreading.with_attributes atts (fn _ => + Exn.release (worker_wait Waiting work_finished))); + join_next atts deps') | (SOME work, deps') => SOME (work, deps')); -fun execute_work NONE = () - | execute_work (SOME (work, deps')) = - (worker_joining (fn () => worker_exec work); join_work deps') -and join_work deps = - Multithreading.with_attributes Multithreading.no_interrupts - (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps))); +fun join_loop atts deps = + (case SYNCHRONIZED "join" (fn () => join_next atts deps) of + NONE => () + | SOME (work, deps') => (worker_joining (fn () => worker_exec work); join_loop atts deps')); in @@ -509,7 +507,9 @@ let val _ = if forall is_finished xs then () - else if is_some (worker_task ()) then join_work (map task_of xs) + else if is_some (worker_task ()) then + Multithreading.with_attributes Multithreading.no_interrupts + (fn orig_atts => join_loop orig_atts (map task_of xs)) else List.app (ignore o Single_Assignment.await o result_of) xs; in map get_result xs end;