28 sig |
28 sig |
29 val enabled: unit -> bool |
29 val enabled: unit -> bool |
30 type task = Task_Queue.task |
30 type task = Task_Queue.task |
31 type group = Task_Queue.group |
31 type group = Task_Queue.group |
32 val is_worker: unit -> bool |
32 val is_worker: unit -> bool |
|
33 val worker_group: unit -> Task_Queue.group option |
33 type 'a future |
34 type 'a future |
34 val task_of: 'a future -> task |
35 val task_of: 'a future -> task |
35 val group_of: 'a future -> group |
36 val group_of: 'a future -> group |
36 val peek: 'a future -> 'a Exn.result option |
37 val peek: 'a future -> 'a Exn.result option |
37 val is_finished: 'a future -> bool |
38 val is_finished: 'a future -> bool |
38 val value: 'a -> 'a future |
39 val value: 'a -> 'a future |
39 val fork: (unit -> 'a) -> 'a future |
40 val fork: (unit -> 'a) -> 'a future |
40 val fork_group: group -> (unit -> 'a) -> 'a future |
41 val fork_group: group -> (unit -> 'a) -> 'a future |
41 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future |
42 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future |
42 val fork_pri: int -> (unit -> 'a) -> 'a future |
43 val fork_pri: int -> (unit -> 'a) -> 'a future |
43 val fork_local: int -> (unit -> 'a) -> 'a future |
|
44 val join_results: 'a future list -> 'a Exn.result list |
44 val join_results: 'a future list -> 'a Exn.result list |
45 val join_result: 'a future -> 'a Exn.result |
45 val join_result: 'a future -> 'a Exn.result |
46 val join: 'a future -> 'a |
46 val join: 'a future -> 'a |
47 val map: ('a -> 'b) -> 'a future -> 'b future |
47 val map: ('a -> 'b) -> 'a future -> 'b future |
48 val interruptible_task: ('a -> 'b) -> 'a -> 'b |
48 val interruptible_task: ('a -> 'b) -> 'a -> 'b |
74 fun setmp_thread_data data f x = |
74 fun setmp_thread_data data f x = |
75 Library.setmp_thread_data tag (thread_data ()) (SOME data) f x; |
75 Library.setmp_thread_data tag (thread_data ()) (SOME data) f x; |
76 end; |
76 end; |
77 |
77 |
78 val is_worker = is_some o thread_data; |
78 val is_worker = is_some o thread_data; |
|
79 val worker_group = Option.map #3 o thread_data; |
79 |
80 |
80 |
81 |
81 (* datatype future *) |
82 (* datatype future *) |
82 |
83 |
83 datatype 'a future = Future of |
84 datatype 'a future = Future of |
91 fun peek (Future {result, ...}) = ! result; |
92 fun peek (Future {result, ...}) = ! result; |
92 fun is_finished x = is_some (peek x); |
93 fun is_finished x = is_some (peek x); |
93 |
94 |
94 fun value x = Future |
95 fun value x = Future |
95 {task = Task_Queue.new_task 0, |
96 {task = Task_Queue.new_task 0, |
96 group = Task_Queue.new_group (), |
97 group = Task_Queue.new_group NONE, |
97 result = ref (SOME (Exn.Result x))}; |
98 result = ref (SOME (Exn.Result x))}; |
98 |
99 |
99 |
100 |
100 |
101 |
101 (** scheduling **) |
102 (** scheduling **) |
170 change canceled (insert Task_Queue.eq_group group); |
171 change canceled (insert Task_Queue.eq_group group); |
171 |
172 |
172 fun execute name (task, group, jobs) = |
173 fun execute name (task, group, jobs) = |
173 let |
174 let |
174 val _ = trace_active (); |
175 val _ = trace_active (); |
175 val valid = null (Task_Queue.group_exns group); |
176 val valid = not (Task_Queue.is_canceled group); |
176 val ok = setmp_thread_data (name, task, group) (fn () => |
177 val ok = setmp_thread_data (name, task, group) (fn () => |
177 fold (fn job => fn ok => job valid andalso ok) jobs true) (); |
178 fold (fn job => fn ok => job valid andalso ok) jobs true) (); |
178 val _ = SYNCHRONIZED "execute" (fn () => |
179 val _ = SYNCHRONIZED "execute" (fn () => |
179 (change queue (Task_Queue.finish task); |
180 (change queue (Task_Queue.finish task); |
180 if ok then () |
181 if ok then () |
277 |
278 |
278 fun fork_future opt_group deps pri e = |
279 fun fork_future opt_group deps pri e = |
279 let |
280 let |
280 val _ = scheduler_check "future check"; |
281 val _ = scheduler_check "future check"; |
281 |
282 |
282 val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); |
283 val group = |
|
284 (case opt_group of |
|
285 SOME group => group |
|
286 | NONE => Task_Queue.new_group (worker_group ())); |
283 val (result, job) = future_job group e; |
287 val (result, job) = future_job group e; |
284 val task = SYNCHRONIZED "future" (fn () => |
288 val task = SYNCHRONIZED "future" (fn () => |
285 change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ()); |
289 change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ()); |
286 in Future {task = task, group = group, result = result} end; |
290 in Future {task = task, group = group, result = result} end; |
287 |
291 |
288 fun fork e = fork_future NONE [] 0 e; |
292 fun fork e = fork_future NONE [] 0 e; |
289 fun fork_group group e = fork_future (SOME group) [] 0 e; |
293 fun fork_group group e = fork_future (SOME group) [] 0 e; |
290 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e; |
294 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e; |
291 fun fork_pri pri e = fork_future NONE [] pri e; |
295 fun fork_pri pri e = fork_future NONE [] pri e; |
292 fun fork_local pri e = fork_future (Option.map #3 (thread_data ())) [] pri e; |
|
293 |
296 |
294 |
297 |
295 (* join *) |
298 (* join *) |
296 |
299 |
297 local |
300 local |
298 |
301 |
299 fun get_result x = |
302 fun get_result x = |
300 (case peek x of |
303 (case peek x of |
301 SOME (Exn.Exn Exn.Interrupt) => Exn.Exn (Exn.EXCEPTIONS (Task_Queue.group_exns (group_of x))) |
304 NONE => Exn.Exn (SYS_ERROR "unfinished future") |
302 | SOME res => res |
305 | SOME (Exn.Exn Exn.Interrupt) => |
303 | NONE => Exn.Exn (SYS_ERROR "unfinished future")); |
306 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x)))) |
|
307 | SOME res => res); |
304 |
308 |
305 fun join_next deps = (*requires SYNCHRONIZED*) |
309 fun join_next deps = (*requires SYNCHRONIZED*) |
306 if overloaded () then (worker_wait (); join_next deps) |
310 if overloaded () then (worker_wait (); join_next deps) |
307 else change_result queue (Task_Queue.dequeue_towards deps); |
311 else change_result queue (Task_Queue.dequeue_towards deps); |
308 |
312 |
343 fun map_future f x = |
347 fun map_future f x = |
344 let |
348 let |
345 val _ = scheduler_check "map_future check"; |
349 val _ = scheduler_check "map_future check"; |
346 |
350 |
347 val task = task_of x; |
351 val task = task_of x; |
348 val group = Task_Queue.new_group (); |
352 val group = Task_Queue.new_group (SOME (group_of x)); |
349 val (result, job) = future_job group (fn () => f (join x)); |
353 val (result, job) = future_job group (fn () => f (join x)); |
350 |
354 |
351 val extended = SYNCHRONIZED "map_future" (fn () => |
355 val extended = SYNCHRONIZED "map_future" (fn () => |
352 (case Task_Queue.extend task job (! queue) of |
356 (case Task_Queue.extend task job (! queue) of |
353 SOME queue' => (queue := queue'; true) |
357 SOME queue' => (queue := queue'; true) |