42 type 'a future |
42 type 'a future |
43 val task_of: 'a future -> task |
43 val task_of: 'a future -> task |
44 val group_of: 'a future -> group |
44 val group_of: 'a future -> group |
45 val peek: 'a future -> 'a Exn.result option |
45 val peek: 'a future -> 'a Exn.result option |
46 val is_finished: 'a future -> bool |
46 val is_finished: 'a future -> bool |
47 val fork_group: group -> (unit -> 'a) -> 'a future |
47 val bulk: {group: group option, deps: task list, pri: int} -> (unit -> 'a) list -> 'a future list |
48 val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future |
|
49 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future |
|
50 val fork_pri: int -> (unit -> 'a) -> 'a future |
48 val fork_pri: int -> (unit -> 'a) -> 'a future |
51 val fork: (unit -> 'a) -> 'a future |
49 val fork: (unit -> 'a) -> 'a future |
52 val join_results: 'a future list -> 'a Exn.result list |
50 val join_results: 'a future list -> 'a Exn.result list |
53 val join_result: 'a future -> 'a Exn.result |
51 val join_result: 'a future -> 'a Exn.result |
54 val join: 'a future -> 'a |
52 val join: 'a future -> 'a |
399 |
397 |
400 (** futures **) |
398 (** futures **) |
401 |
399 |
402 (* fork *) |
400 (* fork *) |
403 |
401 |
404 fun fork_future opt_group deps pri e = |
402 fun bulk {group, deps, pri} es = |
405 let |
403 let |
406 val group = |
404 val grp = |
407 (case opt_group of |
405 (case group of |
408 NONE => worker_subgroup () |
406 NONE => worker_subgroup () |
409 | SOME group => group); |
407 | SOME grp => grp); |
410 val (result, job) = future_job group e; |
408 fun enqueue e (minimal, queue) = |
411 val task = SYNCHRONIZED "enqueue" (fn () => |
|
412 let |
409 let |
413 val (task, minimal) = |
410 val (result, job) = future_job grp e; |
414 Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job); |
411 val ((task, minimal'), queue') = Task_Queue.enqueue grp deps pri job queue; |
|
412 val future = Future {promised = false, task = task, group = grp, result = result}; |
|
413 in (future, (minimal orelse minimal', queue')) end; |
|
414 in |
|
415 SYNCHRONIZED "enqueue" (fn () => |
|
416 let |
|
417 val (futures, minimal) = |
|
418 Unsynchronized.change_result queue (fn q => |
|
419 let val (futures, (minimal, q')) = fold_map enqueue es (false, q) |
|
420 in ((futures, minimal), q') end); |
415 val _ = if minimal then signal work_available else (); |
421 val _ = if minimal then signal work_available else (); |
416 val _ = scheduler_check (); |
422 val _ = scheduler_check (); |
417 in task end); |
423 in futures end) |
418 in Future {promised = false, task = task, group = group, result = result} end; |
424 end; |
419 |
425 |
420 fun fork_group group e = fork_future (SOME group) [] 0 e; |
426 fun fork_pri pri e = singleton (bulk {group = NONE, deps = [], pri = pri}) e; |
421 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e; |
427 fun fork e = fork_pri 0 e; |
422 fun fork_deps deps e = fork_deps_pri deps 0 e; |
|
423 fun fork_pri pri e = fork_deps_pri [] pri e; |
|
424 fun fork e = fork_deps [] e; |
|
425 |
428 |
426 |
429 |
427 (* join *) |
430 (* join *) |
428 |
431 |
429 local |
432 local |
493 (case Task_Queue.extend task job (! queue) of |
496 (case Task_Queue.extend task job (! queue) of |
494 SOME queue' => (queue := queue'; true) |
497 SOME queue' => (queue := queue'; true) |
495 | NONE => false)); |
498 | NONE => false)); |
496 in |
499 in |
497 if extended then Future {promised = false, task = task, group = group, result = result} |
500 if extended then Future {promised = false, task = task, group = group, result = result} |
498 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) |
501 else |
|
502 singleton (bulk {group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task}) |
|
503 (fn () => f (join x)) |
499 end; |
504 end; |
500 |
505 |
501 |
506 |
502 (* promised futures -- fulfilled by external means *) |
507 (* promised futures -- fulfilled by external means *) |
503 |
508 |