--- a/src/HOL/Tools/res_axioms.ML Tue Jan 06 13:36:42 2009 +0100
+++ b/src/HOL/Tools/res_axioms.ML Tue Jan 06 14:45:45 2009 +0100
@@ -456,7 +456,7 @@
|> fold (mark_seen o #1) new_facts
|> fold_map skolem_def (sort_distinct (Thm.thm_ord o pairself snd) new_thms)
|>> map_filter I;
- val cache_entries = ParList.map skolem_cnfs defs;
+ val cache_entries = Par_List.map skolem_cnfs defs;
in SOME (fold update_cache cache_entries thy') end
end;
--- a/src/Pure/Concurrent/future.ML Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/Concurrent/future.ML Tue Jan 06 14:45:45 2009 +0100
@@ -137,15 +137,16 @@
change workers (AList.update Thread.equal (Thread.self (), active));
-(* execute *)
+(* execute jobs *)
fun do_cancel group = (*requires SYNCHRONIZED*)
change canceled (insert Task_Queue.eq_group group);
-fun execute name (task, group, run) =
+fun execute name (task, group, jobs) =
let
val _ = trace_active ();
- val ok = setmp_thread_data (name, task) run ();
+ val ok = setmp_thread_data (name, task) (fn () =>
+ fold (fn job => fn ok => job ok) jobs (Task_Queue.is_valid group)) ();
val _ = SYNCHRONIZED "execute" (fn () =>
(change queue (Task_Queue.finish task);
if ok then ()
@@ -225,16 +226,15 @@
else ());
-(* future values: fork independent computation *)
+
+(** futures **)
-fun future opt_group deps pri (e: unit -> 'a) =
+(* future job: fill result *)
+
+fun future_job group (e: unit -> 'a) =
let
- val _ = scheduler_check "future check";
-
- val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
-
val result = ref (NONE: 'a Exn.result option);
- val run = Multithreading.with_attributes (Thread.getAttributes ())
+ val job = Multithreading.with_attributes (Thread.getAttributes ())
(fn _ => fn ok =>
let
val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt;
@@ -245,63 +245,88 @@
| Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
| _ => false);
in res_ok end);
+ in (result, job) end;
+
+(* fork *)
+
+fun fork_future opt_group deps pri e =
+ let
+ val _ = scheduler_check "future check";
+
+ val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
+ val (result, job) = future_job group e;
val task = SYNCHRONIZED "future" (fn () =>
- change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ());
+ change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
in Future {task = task, group = group, result = result} end;
-fun fork e = future NONE [] 0 e;
-fun fork_group group e = future (SOME group) [] 0 e;
-fun fork_deps deps e = future NONE (map task_of deps) 0 e;
-fun fork_pri pri e = future NONE [] pri e;
+fun fork e = fork_future NONE [] 0 e;
+fun fork_group group e = fork_future (SOME group) [] 0 e;
+fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
+fun fork_pri pri e = fork_future NONE [] pri e;
-(* join: retrieve results *)
+(* join *)
+
+fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
-fun join_results [] = []
- | join_results xs = uninterruptible (fn _ => fn () =>
- let
- val _ = scheduler_check "join check";
- val _ = Multithreading.self_critical () andalso
- exists (not o is_finished) xs andalso
- error "Cannot join future values within critical section";
+fun join_results xs =
+ if forall is_finished xs then map get_result xs
+ else uninterruptible (fn _ => fn () =>
+ let
+ val _ = scheduler_check "join check";
+ val _ = Multithreading.self_critical () andalso
+ error "Cannot join future values within critical section";
- fun join_loop _ [] = ()
- | join_loop name tasks =
- (case SYNCHRONIZED name (fn () =>
- change_result queue (Task_Queue.dequeue_towards tasks)) of
- NONE => ()
- | SOME (work, tasks') => (execute name work; join_loop name tasks'));
- val _ =
- (case thread_data () of
- NONE =>
- (*alien thread -- refrain from contending for resources*)
- while exists (not o is_finished) xs
- do SYNCHRONIZED "join_thread" (fn () => wait ())
- | SOME (name, task) =>
- (*proper task -- actively work towards results*)
- let
- val unfinished = xs |> map_filter
- (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
- val _ = SYNCHRONIZED "join" (fn () =>
- (change queue (Task_Queue.depend unfinished task); notify_all ()));
- val _ = join_loop ("join_loop: " ^ name) unfinished;
- val _ =
- while exists (not o is_finished) xs
- do SYNCHRONIZED "join_task" (fn () => worker_wait ());
- in () end);
+ fun join_loop _ [] = ()
+ | join_loop name deps =
+ (case SYNCHRONIZED name (fn () =>
+ change_result queue (Task_Queue.dequeue_towards deps)) of
+ NONE => ()
+ | SOME (work, deps') => (execute name work; join_loop name deps'));
+ val _ =
+ (case thread_data () of
+ NONE =>
+ (*alien thread -- refrain from contending for resources*)
+ while not (forall is_finished xs)
+ do SYNCHRONIZED "join_thread" (fn () => wait ())
+ | SOME (name, task) =>
+ (*proper task -- actively work towards results*)
+ let
+ val pending = filter_out is_finished xs;
+ val deps = map task_of pending;
+ val _ = SYNCHRONIZED "join" (fn () =>
+ (change queue (Task_Queue.depend deps task); notify_all ()));
+ val _ = join_loop ("join_loop: " ^ name) deps;
+ val _ =
+ while not (forall is_finished pending)
+ do SYNCHRONIZED "join_task" (fn () => worker_wait ());
+ in () end);
- in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
+ in map get_result xs end) ();
fun join_result x = singleton join_results x;
fun join x = Exn.release (join_result x);
-fun map f x =
- let val task = task_of x
- in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end;
+
+(* map *)
+
+fun map_future f (x as Future {task, group, ...}) =
+ let
+ val _ = scheduler_check "map_future check";
+
+ val (result', job) = future_job group (fn () => f (join x));
+ val extended = SYNCHRONIZED "map_future" (fn () =>
+ (case Task_Queue.extend task job (! queue) of
+ SOME queue' => (queue := queue'; true)
+ | NONE => false));
+ in
+ if extended then Future {task = task, group = group, result = result'}
+ else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
+ end;
-(* misc operations *)
+(* cancel *)
(*interrupt: permissive signal, may get ignored*)
fun interrupt_task id = SYNCHRONIZED "interrupt"
@@ -313,7 +338,9 @@
SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ())));
-(*global join and shutdown*)
+
+(** global join and shutdown **)
+
fun shutdown () =
if Multithreading.available then
(scheduler_check "shutdown check";
@@ -327,6 +354,10 @@
OS.Process.sleep (Time.fromMilliseconds 300))))
else ();
+
+(*final declarations of this structure!*)
+val map = map_future;
+
end;
type 'a future = 'a Future.future;
--- a/src/Pure/Concurrent/par_list.ML Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/Concurrent/par_list.ML Tue Jan 06 14:45:45 2009 +0100
@@ -1,5 +1,4 @@
(* Title: Pure/Concurrent/par_list.ML
- ID: $Id$
Author: Makarius
Parallel list combinators.
@@ -24,7 +23,7 @@
val forall: ('a -> bool) -> 'a list -> bool
end;
-structure ParList: PAR_LIST =
+structure Par_List: PAR_LIST =
struct
fun raw_map f xs =
--- a/src/Pure/Concurrent/par_list_dummy.ML Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/Concurrent/par_list_dummy.ML Tue Jan 06 14:45:45 2009 +0100
@@ -1,11 +1,10 @@
(* Title: Pure/Concurrent/par_list_dummy.ML
- ID: $Id$
Author: Makarius
Dummy version of parallel list combinators -- plain sequential evaluation.
*)
-structure ParList: PAR_LIST =
+structure Par_List: PAR_LIST =
struct
val map = map;
--- a/src/Pure/Concurrent/task_queue.ML Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Tue Jan 06 14:45:45 2009 +0100
@@ -13,16 +13,18 @@
type group
val eq_group: group * group -> bool
val new_group: unit -> group
+ val is_valid: group -> bool
val invalidate_group: group -> unit
val str_of_group: group -> string
type queue
val empty: queue
val is_empty: queue -> bool
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
+ val extend: task -> (bool -> bool) -> queue -> queue option
val depend: task list -> task -> queue -> queue
- val dequeue: queue -> (task * group * (unit -> bool)) option * queue
+ val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
val dequeue_towards: task list -> queue ->
- (((task * group * (unit -> bool)) * task list) option * queue)
+ (((task * group * (bool -> bool) list) * task list) option * queue)
val interrupt: queue -> task -> unit
val interrupt_external: queue -> string -> unit
val cancel: queue -> group -> bool
@@ -52,6 +54,7 @@
fun new_group () = Group (serial (), ref true);
+fun is_valid (Group (_, ref ok)) = ok;
fun invalidate_group (Group (_, ok)) = ok := false;
fun str_of_group (Group (i, ref ok)) =
@@ -61,14 +64,14 @@
(* jobs *)
datatype job =
- Job of bool -> bool |
+ Job of (bool -> bool) list |
Running of Thread.thread;
type jobs = (group * job) Task_Graph.T;
fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
-fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;
+fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
fun add_job task dep (jobs: jobs) =
Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
@@ -96,9 +99,14 @@
val task = new_task pri;
val groups' = Inttab.cons_list (gid, task) groups;
val jobs' = jobs
- |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
+ |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
in (task, make_queue groups' jobs') end;
+fun extend task job (Queue {groups, jobs}) =
+ (case try (get_job jobs) task of
+ SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
+ | _ => NONE);
+
fun depend deps task (Queue {groups, jobs}) =
make_queue groups (fold (add_job_acyclic task) deps jobs);
@@ -109,14 +117,13 @@
fun dequeue_result NONE queue = (NONE, queue)
| dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
- (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));
+ (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs));
in
fun dequeue (queue as Queue {jobs, ...}) =
let
- fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
- SOME (task, group, (fn () => job ok))
+ fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
| ready _ = NONE;
in dequeue_result (Task_Graph.get_first ready jobs) queue end;
@@ -126,8 +133,8 @@
fun ready task =
(case Task_Graph.get_node jobs task of
- (group as Group (_, ref ok), Job job) =>
- if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
+ (group, Job list) =>
+ if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
else NONE
| _ => NONE);
in
--- a/src/Pure/context.ML Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/context.ML Tue Jan 06 14:45:45 2009 +0100
@@ -132,7 +132,15 @@
val copy_data = Datatab.map' invoke_copy;
val extend_data = Datatab.map' invoke_extend;
-fun merge_data pp = Datatab.join (invoke_merge pp) o pairself extend_data;
+
+fun merge_data pp (data1, data2) =
+ Datatab.keys (Datatab.merge (K true) (data1, data2))
+ |> Par_List.map (fn k =>
+ (case (Datatab.lookup data1 k, Datatab.lookup data2 k) of
+ (SOME x, NONE) => (k, invoke_extend k x)
+ | (NONE, SOME y) => (k, invoke_extend k y)
+ | (SOME x, SOME y) => (k, invoke_merge pp k (invoke_extend k x, invoke_extend k y))))
+ |> Datatab.make;
end;