clarified worker_wait;
clarified join_next/join_loop, with proper thread attributes to allow interrupting of join (e.g. via Lazy.force);
--- a/src/Pure/Concurrent/future.ML Wed Jan 28 22:50:00 2015 +0100
+++ b/src/Pure/Concurrent/future.ML Thu Jan 29 13:49:03 2015 +0100
@@ -236,13 +236,10 @@
in () end);
in () end;
-fun worker_wait active cond = (*requires SYNCHRONIZED*)
+fun worker_wait worker_state cond = (*requires SYNCHRONIZED*)
(case AList.lookup Thread.equal (! workers) (Thread.self ()) of
- SOME state =>
- (state := (if active then Waiting else Sleeping);
- wait cond;
- state := Working)
- | NONE => ignore (wait cond));
+ SOME state => Unsynchronized.setmp state worker_state wait cond
+ | NONE => wait cond);
fun worker_next () = (*requires SYNCHRONIZED*)
if length (! workers) > ! max_workers then
@@ -250,10 +247,10 @@
signal work_available;
NONE)
else if count_workers Working > ! max_active then
- (worker_wait false work_available; worker_next ())
+ (worker_wait Sleeping work_available; worker_next ())
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
- NONE => (worker_wait false work_available; worker_next ())
+ NONE => (worker_wait Sleeping work_available; worker_next ())
| some => (signal work_available; some));
fun worker_loop name =
@@ -487,21 +484,22 @@
local
-fun join_next deps = (*requires SYNCHRONIZED*)
+fun join_next atts deps = (*requires SYNCHRONIZED*)
if null deps then NONE
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
(NONE, []) => NONE
| (NONE, deps') =>
- (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
+ (worker_waiting deps' (fn () =>
+ Multithreading.with_attributes atts (fn _ =>
+ Exn.release (worker_wait Waiting work_finished)));
+ join_next atts deps')
| (SOME work, deps') => SOME (work, deps'));
-fun execute_work NONE = ()
- | execute_work (SOME (work, deps')) =
- (worker_joining (fn () => worker_exec work); join_work deps')
-and join_work deps =
- Multithreading.with_attributes Multithreading.no_interrupts
- (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
+fun join_loop atts deps =
+ (case SYNCHRONIZED "join" (fn () => join_next atts deps) of
+ NONE => ()
+ | SOME (work, deps') => (worker_joining (fn () => worker_exec work); join_loop atts deps'));
in
@@ -509,7 +507,9 @@
let
val _ =
if forall is_finished xs then ()
- else if is_some (worker_task ()) then join_work (map task_of xs)
+ else if is_some (worker_task ()) then
+ Multithreading.with_attributes Multithreading.no_interrupts
+ (fn orig_atts => join_loop orig_atts (map task_of xs))
else List.app (ignore o Single_Assignment.await o result_of) xs;
in map get_result xs end;