# HG changeset patch # User wenzelm # Date 1231273063 -3600 # Node ID 68b45381123229fd117c5f4e2d821a696694f6fb # Parent ff4ba1ed4527c0b361c8add03433457a79d272da# Parent 98aaf2cd873fe884c4ee10230053cdf47051c458 merged diff -r ff4ba1ed4527 -r 68b453811232 src/HOL/Tools/res_axioms.ML --- a/src/HOL/Tools/res_axioms.ML Tue Jan 06 09:18:02 2009 -0800 +++ b/src/HOL/Tools/res_axioms.ML Tue Jan 06 21:17:43 2009 +0100 @@ -456,7 +456,7 @@ |> fold (mark_seen o #1) new_facts |> fold_map skolem_def (sort_distinct (Thm.thm_ord o pairself snd) new_thms) |>> map_filter I; - val cache_entries = ParList.map skolem_cnfs defs; + val cache_entries = Par_List.map skolem_cnfs defs; in SOME (fold update_cache cache_entries thy') end end; diff -r ff4ba1ed4527 -r 68b453811232 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Jan 06 09:18:02 2009 -0800 +++ b/src/Pure/Concurrent/future.ML Tue Jan 06 21:17:43 2009 +0100 @@ -137,15 +137,16 @@ change workers (AList.update Thread.equal (Thread.self (), active)); -(* execute *) +(* execute jobs *) fun do_cancel group = (*requires SYNCHRONIZED*) change canceled (insert Task_Queue.eq_group group); -fun execute name (task, group, run) = +fun execute name (task, group, jobs) = let val _ = trace_active (); - val ok = setmp_thread_data (name, task) run (); + val ok = setmp_thread_data (name, task) (fn () => + fold (fn job => fn ok => job ok) jobs (Task_Queue.is_valid group)) (); val _ = SYNCHRONIZED "execute" (fn () => (change queue (Task_Queue.finish task); if ok then () @@ -225,16 +226,15 @@ else ()); -(* future values: fork independent computation *) + +(** futures **) -fun future opt_group deps pri (e: unit -> 'a) = +(* future job: fill result *) + +fun future_job group (e: unit -> 'a) = let - val _ = scheduler_check "future check"; - - val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); - val result = ref (NONE: 'a Exn.result option); - val run = Multithreading.with_attributes (Thread.getAttributes ()) + val job = Multithreading.with_attributes (Thread.getAttributes ()) (fn _ => fn ok => let val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt; @@ -245,63 +245,88 @@ | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true) | _ => false); in res_ok end); + in (result, job) end; + +(* fork *) + +fun fork_future opt_group deps pri e = + let + val _ = scheduler_check "future check"; + + val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); + val (result, job) = future_job group e; val task = SYNCHRONIZED "future" (fn () => - change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ()); + change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ()); in Future {task = task, group = group, result = result} end; -fun fork e = future NONE [] 0 e; -fun fork_group group e = future (SOME group) [] 0 e; -fun fork_deps deps e = future NONE (map task_of deps) 0 e; -fun fork_pri pri e = future NONE [] pri e; +fun fork e = fork_future NONE [] 0 e; +fun fork_group group e = fork_future (SOME group) [] 0 e; +fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e; +fun fork_pri pri e = fork_future NONE [] pri e; -(* join: retrieve results *) +(* join *) + +fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); -fun join_results [] = [] - | join_results xs = uninterruptible (fn _ => fn () => - let - val _ = scheduler_check "join check"; - val _ = Multithreading.self_critical () andalso - exists (not o is_finished) xs andalso - error "Cannot join future values within critical section"; +fun join_results xs = + if forall is_finished xs then map get_result xs + else uninterruptible (fn _ => fn () => + let + val _ = scheduler_check "join check"; + val _ = Multithreading.self_critical () andalso + error "Cannot join future values within critical section"; - fun join_loop _ [] = () - | join_loop name tasks = - (case SYNCHRONIZED name (fn () => - change_result queue (Task_Queue.dequeue_towards tasks)) of - NONE => () - | SOME (work, tasks') => (execute name work; join_loop name tasks')); - val _ = - (case thread_data () of - NONE => - (*alien thread -- refrain from contending for resources*) - while exists (not o is_finished) xs - do SYNCHRONIZED "join_thread" (fn () => wait ()) - | SOME (name, task) => - (*proper task -- actively work towards results*) - let - val unfinished = xs |> map_filter - (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); - val _ = SYNCHRONIZED "join" (fn () => - (change queue (Task_Queue.depend unfinished task); notify_all ())); - val _ = join_loop ("join_loop: " ^ name) unfinished; - val _ = - while exists (not o is_finished) xs - do SYNCHRONIZED "join_task" (fn () => worker_wait ()); - in () end); + fun join_loop _ [] = () + | join_loop name deps = + (case SYNCHRONIZED name (fn () => + change_result queue (Task_Queue.dequeue_towards deps)) of + NONE => () + | SOME (work, deps') => (execute name work; join_loop name deps')); + val _ = + (case thread_data () of + NONE => + (*alien thread -- refrain from contending for resources*) + while not (forall is_finished xs) + do SYNCHRONIZED "join_thread" (fn () => wait ()) + | SOME (name, task) => + (*proper task -- actively work towards results*) + let + val pending = filter_out is_finished xs; + val deps = map task_of pending; + val _ = SYNCHRONIZED "join" (fn () => + (change queue (Task_Queue.depend deps task); notify_all ())); + val _ = join_loop ("join_loop: " ^ name) deps; + val _ = + while not (forall is_finished pending) + do SYNCHRONIZED "join_task" (fn () => worker_wait ()); + in () end); - in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) (); + in map get_result xs end) (); fun join_result x = singleton join_results x; fun join x = Exn.release (join_result x); -fun map f x = - let val task = task_of x - in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end; + +(* map *) + +fun map_future f (x as Future {task, group, ...}) = + let + val _ = scheduler_check "map_future check"; + + val (result', job) = future_job group (fn () => f (join x)); + val extended = SYNCHRONIZED "map_future" (fn () => + (case Task_Queue.extend task job (! queue) of + SOME queue' => (queue := queue'; true) + | NONE => false)); + in + if extended then Future {task = task, group = group, result = result'} + else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) + end; -(* misc operations *) +(* cancel *) (*interrupt: permissive signal, may get ignored*) fun interrupt_task id = SYNCHRONIZED "interrupt" @@ -313,7 +338,9 @@ SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ()))); -(*global join and shutdown*) + +(** global join and shutdown **) + fun shutdown () = if Multithreading.available then (scheduler_check "shutdown check"; @@ -327,6 +354,10 @@ OS.Process.sleep (Time.fromMilliseconds 300)))) else (); + +(*final declarations of this structure!*) +val map = map_future; + end; type 'a future = 'a Future.future; diff -r ff4ba1ed4527 -r 68b453811232 src/Pure/Concurrent/par_list.ML --- a/src/Pure/Concurrent/par_list.ML Tue Jan 06 09:18:02 2009 -0800 +++ b/src/Pure/Concurrent/par_list.ML Tue Jan 06 21:17:43 2009 +0100 @@ -1,5 +1,4 @@ (* Title: Pure/Concurrent/par_list.ML - ID: $Id$ Author: Makarius Parallel list combinators. @@ -24,7 +23,7 @@ val forall: ('a -> bool) -> 'a list -> bool end; -structure ParList: PAR_LIST = +structure Par_List: PAR_LIST = struct fun raw_map f xs = diff -r ff4ba1ed4527 -r 68b453811232 src/Pure/Concurrent/par_list_dummy.ML --- a/src/Pure/Concurrent/par_list_dummy.ML Tue Jan 06 09:18:02 2009 -0800 +++ b/src/Pure/Concurrent/par_list_dummy.ML Tue Jan 06 21:17:43 2009 +0100 @@ -1,11 +1,10 @@ (* Title: Pure/Concurrent/par_list_dummy.ML - ID: $Id$ Author: Makarius Dummy version of parallel list combinators -- plain sequential evaluation. *) -structure ParList: PAR_LIST = +structure Par_List: PAR_LIST = struct val map = map; diff -r ff4ba1ed4527 -r 68b453811232 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Tue Jan 06 09:18:02 2009 -0800 +++ b/src/Pure/Concurrent/task_queue.ML Tue Jan 06 21:17:43 2009 +0100 @@ -13,16 +13,18 @@ type group val eq_group: group * group -> bool val new_group: unit -> group + val is_valid: group -> bool val invalidate_group: group -> unit val str_of_group: group -> string type queue val empty: queue val is_empty: queue -> bool val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue + val extend: task -> (bool -> bool) -> queue -> queue option val depend: task list -> task -> queue -> queue - val dequeue: queue -> (task * group * (unit -> bool)) option * queue + val dequeue: queue -> (task * group * (bool -> bool) list) option * queue val dequeue_towards: task list -> queue -> - (((task * group * (unit -> bool)) * task list) option * queue) + (((task * group * (bool -> bool) list) * task list) option * queue) val interrupt: queue -> task -> unit val interrupt_external: queue -> string -> unit val cancel: queue -> group -> bool @@ -52,6 +54,7 @@ fun new_group () = Group (serial (), ref true); +fun is_valid (Group (_, ref ok)) = ok; fun invalidate_group (Group (_, ok)) = ok := false; fun str_of_group (Group (i, ref ok)) = @@ -61,14 +64,14 @@ (* jobs *) datatype job = - Job of bool -> bool | + Job of (bool -> bool) list | Running of Thread.thread; type jobs = (group * job) Task_Graph.T; fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task); fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task); -fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs; +fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs; fun add_job task dep (jobs: jobs) = Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; @@ -96,9 +99,14 @@ val task = new_task pri; val groups' = Inttab.cons_list (gid, task) groups; val jobs' = jobs - |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps; + |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps; in (task, make_queue groups' jobs') end; +fun extend task job (Queue {groups, jobs}) = + (case try (get_job jobs) task of + SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs)) + | _ => NONE); + fun depend deps task (Queue {groups, jobs}) = make_queue groups (fold (add_job_acyclic task) deps jobs); @@ -109,14 +117,13 @@ fun dequeue_result NONE queue = (NONE, queue) | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) = - (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs)); + (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs)); in fun dequeue (queue as Queue {jobs, ...}) = let - fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) = - SOME (task, group, (fn () => job ok)) + fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list) | ready _ = NONE; in dequeue_result (Task_Graph.get_first ready jobs) queue end; @@ -126,8 +133,8 @@ fun ready task = (case Task_Graph.get_node jobs task of - (group as Group (_, ref ok), Job job) => - if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok)) + (group, Job list) => + if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list) else NONE | _ => NONE); in diff -r ff4ba1ed4527 -r 68b453811232 src/Pure/Isar/isar.ML --- a/src/Pure/Isar/isar.ML Tue Jan 06 09:18:02 2009 -0800 +++ b/src/Pure/Isar/isar.ML Tue Jan 06 21:17:43 2009 +0100 @@ -133,11 +133,12 @@ (case Source.get_single (Source.set_prompt Source.default_prompt src) of NONE => if secure then quit () else () | SOME (tr, src') => if op >> tr orelse check_secure () then raw_loop secure src' else ()) - handle exn => (Output.error_msg (Toplevel.exn_message exn) - handle crash => - (CRITICAL (fn () => change crashes (cons crash)); - warning "Recovering after Isar toplevel crash -- see also Isar.crashes"); - raw_loop secure src) + handle exn => + (Output.error_msg (Toplevel.exn_message exn) + handle crash => + (CRITICAL (fn () => change crashes (cons crash)); + warning "Recovering from Isar toplevel crash -- see also Isar.crashes"); + raw_loop secure src) end; in diff -r ff4ba1ed4527 -r 68b453811232 src/Pure/context.ML --- a/src/Pure/context.ML Tue Jan 06 09:18:02 2009 -0800 +++ b/src/Pure/context.ML Tue Jan 06 21:17:43 2009 +0100 @@ -132,7 +132,15 @@ val copy_data = Datatab.map' invoke_copy; val extend_data = Datatab.map' invoke_extend; -fun merge_data pp = Datatab.join (invoke_merge pp) o pairself extend_data; + +fun merge_data pp (data1, data2) = + Datatab.keys (Datatab.merge (K true) (data1, data2)) + |> Par_List.map (fn k => + (case (Datatab.lookup data1 k, Datatab.lookup data2 k) of + (SOME x, NONE) => (k, invoke_extend k x) + | (NONE, SOME y) => (k, invoke_extend k y) + | (SOME x, SOME y) => (k, invoke_merge pp k (invoke_extend k x, invoke_extend k y)))) + |> Datatab.make; end;