# HG changeset patch # User wenzelm # Date 1229447071 -3600 # Node ID 970d746274d5953d463aa31920cf01d35a1707e7 # Parent d41182a8135ccebc6d5fa11e4dd48f2a014830e4# Parent ce6f21913e54e4673d261cab9f134e6fe1067bdc merged diff -r d41182a8135c -r 970d746274d5 src/Pure/Concurrent/ROOT.ML --- a/src/Pure/Concurrent/ROOT.ML Tue Dec 16 08:46:07 2008 +0100 +++ b/src/Pure/Concurrent/ROOT.ML Tue Dec 16 18:04:31 2008 +0100 @@ -1,15 +1,12 @@ (* Title: Pure/Concurrent/ROOT.ML - ID: $Id$ + Author: Makarius Concurrency within the ML runtime. *) -val future_scheduler = ref true; - use "simple_thread.ML"; use "synchronized.ML"; use "mailbox.ML"; -use "schedule.ML"; use "task_queue.ML"; use "future.ML"; use "par_list.ML"; diff -r d41182a8135c -r 970d746274d5 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Dec 16 08:46:07 2008 +0100 +++ b/src/Pure/Concurrent/future.ML Tue Dec 16 18:04:31 2008 +0100 @@ -1,5 +1,4 @@ (* Title: Pure/Concurrent/future.ML - ID: $Id$ Author: Makarius Future values. @@ -28,8 +27,8 @@ signature FUTURE = sig val enabled: unit -> bool - type task = TaskQueue.task - type group = TaskQueue.group + type task = Task_Queue.task + type group = Task_Queue.group val thread_data: unit -> (string * task) option type 'a future val task_of: 'a future -> task @@ -40,12 +39,11 @@ val fork: (unit -> 'a) -> 'a future val fork_group: group -> (unit -> 'a) -> 'a future val fork_deps: 'b future list -> (unit -> 'a) -> 'a future - val fork_background: (unit -> 'a) -> 'a future + val fork_pri: int -> (unit -> 'a) -> 'a future val join_results: 'a future list -> 'a Exn.result list val join_result: 'a future -> 'a Exn.result val join: 'a future -> 'a val map: ('a -> 'b) -> 'a future -> 'b future - val focus: task list -> unit val interrupt_task: string -> unit val cancel: 'a future -> unit val shutdown: unit -> unit @@ -57,14 +55,14 @@ (** future values **) fun enabled () = - ! future_scheduler andalso Multithreading.enabled () andalso + Multithreading.enabled () andalso not (Multithreading.self_critical ()); (* identifiers *) -type task = TaskQueue.task; -type group = TaskQueue.group; +type task = Task_Queue.task; +type group = Task_Queue.group; local val tag = Universal.tag () : (string * task) option Universal.tag in fun thread_data () = the_default NONE (Thread.getLocal tag); @@ -86,8 +84,8 @@ fun is_finished x = is_some (peek x); fun value x = Future - {task = TaskQueue.new_task (), - group = TaskQueue.new_group (), + {task = Task_Queue.new_task 0, + group = Task_Queue.new_group (), result = ref (SOME (Exn.Result x))}; @@ -96,12 +94,12 @@ (* global state *) -val queue = ref TaskQueue.empty; +val queue = ref Task_Queue.empty; val next = ref 0; val workers = ref ([]: (Thread.thread * bool) list); val scheduler = ref (NONE: Thread.thread option); val excessive = ref 0; -val canceled = ref ([]: TaskQueue.group list); +val canceled = ref ([]: Task_Queue.group list); val do_shutdown = ref false; @@ -114,15 +112,11 @@ fun SYNCHRONIZED name = SimpleThread.synchronized name lock; -fun wait name = (*requires SYNCHRONIZED*) - (Multithreading.tracing 3 (fn () => name ^ ": wait ..."); +fun wait () = (*requires SYNCHRONIZED*) ConditionVar.wait (cond, lock); - Multithreading.tracing 3 (fn () => name ^ ": ... continue")); -fun wait_timeout name timeout = (*requires SYNCHRONIZED*) - (Multithreading.tracing 3 (fn () => name ^ ": wait ..."); +fun wait_timeout timeout = (*requires SYNCHRONIZED*) ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)); - Multithreading.tracing 3 (fn () => name ^ ": ... continue")); fun notify_all () = (*requires SYNCHRONIZED*) ConditionVar.broadcast cond; @@ -150,9 +144,9 @@ val _ = trace_active (); val ok = setmp_thread_data (name, task) run (); val _ = SYNCHRONIZED "execute" (fn () => - (change queue (TaskQueue.finish task); + (change queue (Task_Queue.finish task); if ok then () - else if TaskQueue.cancel (! queue) group then () + else if Task_Queue.cancel (! queue) group then () else change canceled (cons group); notify_all ())); in () end; @@ -160,23 +154,23 @@ (* worker threads *) -fun worker_wait name = (*requires SYNCHRONIZED*) - (change_active false; wait name; change_active true); +fun worker_wait () = (*requires SYNCHRONIZED*) + (change_active false; wait (); change_active true); -fun worker_next name = (*requires SYNCHRONIZED*) +fun worker_next () = (*requires SYNCHRONIZED*) if ! excessive > 0 then (dec excessive; change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); notify_all (); NONE) else - (case change_result queue TaskQueue.dequeue of - NONE => (worker_wait name; worker_next name) + (case change_result queue Task_Queue.dequeue of + NONE => (worker_wait (); worker_next ()) | some => some); fun worker_loop name = - (case SYNCHRONIZED name (fn () => worker_next name) of - NONE => Multithreading.tracing 3 (fn () => name ^ ": exit") + (case SYNCHRONIZED name worker_next of + NONE => () | SOME work => (execute name work; worker_loop name)); fun worker_start name = (*requires SYNCHRONIZED*) @@ -204,27 +198,25 @@ else (); (*canceled groups*) - val _ = change canceled (filter_out (TaskQueue.cancel (! queue))); + val _ = change canceled (filter_out (Task_Queue.cancel (! queue))); (*shutdown*) val continue = not (! do_shutdown andalso null (! workers)); val _ = if continue then () else scheduler := NONE; val _ = notify_all (); - val _ = wait_timeout "scheduler" (Time.fromSeconds 3); + val _ = wait_timeout (Time.fromSeconds 3); in continue end; fun scheduler_loop () = - (while SYNCHRONIZED "scheduler" scheduler_next do (); - Multithreading.tracing 3 (fn () => "scheduler: exit")); + while SYNCHRONIZED "scheduler" scheduler_next do (); fun scheduler_active () = (*requires SYNCHRONIZED*) (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); fun scheduler_check name = SYNCHRONIZED name (fn () => if not (scheduler_active ()) then - (Multithreading.tracing 3 (fn () => "scheduler: fork"); - do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop)) + (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop)) else if ! do_shutdown then error "Scheduler shutdown in progress" else ()); @@ -235,7 +227,7 @@ let val _ = scheduler_check "future check"; - val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ()); + val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); val result = ref (NONE: 'a Exn.result option); val run = Multithreading.with_attributes (Thread.getAttributes ()) @@ -246,18 +238,18 @@ val res_ok = (case res of Exn.Result _ => true - | Exn.Exn Exn.Interrupt => (TaskQueue.invalidate_group group; true) + | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true) | _ => false); in res_ok end); val task = SYNCHRONIZED "future" (fn () => - change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ()); + change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ()); in Future {task = task, group = group, result = result} end; -fun fork e = future NONE [] true e; -fun fork_group group e = future (SOME group) [] true e; -fun fork_deps deps e = future NONE (map task_of deps) true e; -fun fork_background e = future NONE [] false e; +fun fork e = future NONE [] 0 e; +fun fork_group group e = future (SOME group) [] 0 e; +fun fork_deps deps e = future NONE (map task_of deps) 0 e; +fun fork_pri pri e = future NONE [] pri e; (* join: retrieve results *) @@ -273,7 +265,7 @@ fun join_loop _ [] = () | join_loop name tasks = (case SYNCHRONIZED name (fn () => - change_result queue (TaskQueue.dequeue_towards tasks)) of + change_result queue (Task_Queue.dequeue_towards tasks)) of NONE => () | SOME (work, tasks') => (execute name work; join_loop name tasks')); val _ = @@ -281,18 +273,18 @@ NONE => (*alien thread -- refrain from contending for resources*) while exists (not o is_finished) xs - do SYNCHRONIZED "join_thread" (fn () => wait "join_thread") + do SYNCHRONIZED "join_thread" (fn () => wait ()) | SOME (name, task) => (*proper task -- actively work towards results*) let val unfinished = xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); val _ = SYNCHRONIZED "join" (fn () => - (change queue (TaskQueue.depend unfinished task); notify_all ())); + (change queue (Task_Queue.depend unfinished task); notify_all ())); val _ = join_loop ("join_loop: " ^ name) unfinished; val _ = while exists (not o is_finished) xs - do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task"); + do SYNCHRONIZED "join_task" (fn () => worker_wait ()); in () end); in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) (); @@ -300,18 +292,16 @@ fun join_result x = singleton join_results x; fun join x = Exn.release (join_result x); -fun map f x = fork_deps [x] (fn () => f (join x)); +fun map f x = + let val task = task_of x + in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end; (* misc operations *) -(*focus: collection of high-priority task*) -fun focus tasks = SYNCHRONIZED "focus" (fn () => - change queue (TaskQueue.focus tasks)); - (*interrupt: permissive signal, may get ignored*) fun interrupt_task id = SYNCHRONIZED "interrupt" - (fn () => TaskQueue.interrupt_external (! queue) id); + (fn () => Task_Queue.interrupt_external (! queue) id); (*cancel: present and future group members will be interrupted eventually*) fun cancel x = @@ -324,12 +314,12 @@ if Multithreading.available then (scheduler_check "shutdown check"; SYNCHRONIZED "shutdown" (fn () => - (while not (scheduler_active ()) do wait "shutdown: scheduler inactive"; - while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join"; + (while not (scheduler_active ()) do wait (); + while not (Task_Queue.is_empty (! queue)) do wait (); do_shutdown := true; notify_all (); - while not (null (! workers)) do wait "shutdown: workers"; - while scheduler_active () do wait "shutdown: scheduler still active"; + while not (null (! workers)) do wait (); + while scheduler_active () do wait (); OS.Process.sleep (Time.fromMilliseconds 300)))) else (); diff -r d41182a8135c -r 970d746274d5 src/Pure/Concurrent/par_list.ML --- a/src/Pure/Concurrent/par_list.ML Tue Dec 16 08:46:07 2008 +0100 +++ b/src/Pure/Concurrent/par_list.ML Tue Dec 16 18:04:31 2008 +0100 @@ -30,7 +30,7 @@ fun raw_map f xs = if Future.enabled () then let - val group = TaskQueue.new_group (); + val group = Task_Queue.new_group (); val futures = map (fn x => Future.fork_group group (fn () => f x)) xs; val _ = List.app (ignore o Future.join_result) futures; in Future.join_results futures end diff -r d41182a8135c -r 970d746274d5 src/Pure/Concurrent/schedule.ML --- a/src/Pure/Concurrent/schedule.ML Tue Dec 16 08:46:07 2008 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,85 +0,0 @@ -(* Title: Pure/Concurrent/schedule.ML - ID: $Id$ - Author: Makarius - -Scheduling -- multiple threads working on a queue of tasks. -*) - -signature SCHEDULE = -sig - datatype 'a task = - Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate; - val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list -end; - -structure Schedule: SCHEDULE = -struct - -datatype 'a task = - Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate; - -fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks => - let - (*synchronized execution*) - val lock = Mutex.mutex (); - fun SYNCHRONIZED e = - let - val _ = Mutex.lock lock; - val res = Exn.capture e (); - val _ = Mutex.unlock lock; - in Exn.release res end; - - (*wakeup condition*) - val wakeup = ConditionVar.conditionVar (); - fun wakeup_all () = ConditionVar.broadcast wakeup; - fun wait () = ConditionVar.wait (wakeup, lock); - fun wait_timeout () = - ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1)); - - (*queue of tasks*) - val queue = ref tasks; - val active = ref 0; - fun trace_active () = Multithreading.tracing 1 (fn () => - "SCHEDULE: " ^ string_of_int (! active) ^ " active"); - fun dequeue () = - (case change_result queue next_task of - Wait => - (dec active; trace_active (); - wait (); - inc active; trace_active (); - dequeue ()) - | next => next); - - (*pool of running threads*) - val status = ref ([]: exn list); - val running = ref ([]: Thread.thread list); - fun start f = (inc active; change running (cons (SimpleThread.fork false f))); - fun stop () = (dec active; change running (remove Thread.equal (Thread.self ()))); - - (*worker thread*) - fun worker () = - (case SYNCHRONIZED dequeue of - Task {body, cont, fail} => - (case Exn.capture (restore_attributes body) () of - Exn.Result () => - (SYNCHRONIZED (fn () => (change queue cont; wakeup_all ())); worker ()) - | Exn.Exn exn => - SYNCHRONIZED (fn () => - (change status (cons exn); change queue fail; stop (); wakeup_all ()))) - | Terminate => SYNCHRONIZED (fn () => (stop (); wakeup_all ()))); - - (*main control: fork and wait*) - fun fork 0 = () - | fork k = (start worker; fork (k - 1)); - val _ = SYNCHRONIZED (fn () => - (fork (Int.max (n, 1)); - while not (null (! running)) do - (trace_active (); - if not (null (! status)) - then (List.app SimpleThread.interrupt (! running)) - else (); - wait_timeout ()))); - - in ! status end); - -end; diff -r d41182a8135c -r 970d746274d5 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Tue Dec 16 08:46:07 2008 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Tue Dec 16 18:04:31 2008 +0100 @@ -1,5 +1,4 @@ (* Title: Pure/Concurrent/task_queue.ML - ID: $Id$ Author: Makarius Ordered queue of grouped tasks. @@ -8,7 +7,8 @@ signature TASK_QUEUE = sig eqtype task - val new_task: unit -> task + val new_task: int -> task + val pri_of_task: task -> int val str_of_task: task -> string eqtype group val new_group: unit -> group @@ -17,9 +17,8 @@ type queue val empty: queue val is_empty: queue -> bool - val enqueue: group -> task list -> bool -> (bool -> bool) -> queue -> task * queue + val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue val depend: task list -> task -> queue -> queue - val focus: task list -> queue -> queue val dequeue: queue -> (task * group * (unit -> bool)) option * queue val dequeue_towards: task list -> queue -> (((task * group * (unit -> bool)) * task list) option * queue) @@ -29,20 +28,27 @@ val cancel: queue -> group -> bool end; -structure TaskQueue: TASK_QUEUE = +structure Task_Queue: TASK_QUEUE = struct -(* identifiers *) +(* tasks *) + +datatype task = Task of int * serial; +fun new_task pri = Task (pri, serial ()); -datatype task = Task of serial; -fun new_task () = Task (serial ()); +fun pri_of_task (Task (pri, _)) = pri; +fun str_of_task (Task (_, i)) = string_of_int i; -fun str_of_task (Task i) = string_of_int i; +fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2); +structure Task_Graph = GraphFun(type key = task val ord = task_ord); +(* groups *) + datatype group = Group of serial * bool ref; fun new_group () = Group (serial (), ref true); + fun invalidate_group (Group (_, ok)) = ok := false; fun str_of_group (Group (i, ref ok)) = @@ -52,53 +58,46 @@ (* jobs *) datatype job = - Job of bool * (bool -> bool) | (*priority, job: status -> status*) + Job of bool -> bool | Running of Thread.thread; -type jobs = (group * job) IntGraph.T; +type jobs = (group * job) Task_Graph.T; -fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id); -fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id); -fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs; +fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task); +fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task); +fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs; -fun add_job (Task id) (Task dep) (jobs: jobs) = - IntGraph.add_edge (dep, id) jobs handle IntGraph.UNDEF _ => jobs; +fun add_job task dep (jobs: jobs) = + Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; -fun add_job_acyclic (Task id) (Task dep) (jobs: jobs) = - IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs; - -fun check_job (jobs: jobs) (task as Task id) = - if can (IntGraph.get_node jobs) id then SOME task else NONE; +fun add_job_acyclic task dep (jobs: jobs) = + Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; (* queue of grouped jobs *) datatype queue = Queue of {groups: task list Inttab.table, (*groups with presently active members*) - jobs: jobs, (*job dependency graph*) - focus: task list}; (*particular collection of high-priority tasks*) + jobs: jobs}; (*job dependency graph*) -fun make_queue groups jobs focus = Queue {groups = groups, jobs = jobs, focus = focus}; +fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; -val empty = make_queue Inttab.empty IntGraph.empty []; -fun is_empty (Queue {jobs, ...}) = IntGraph.is_empty jobs; +val empty = make_queue Inttab.empty Task_Graph.empty; +fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs; (* enqueue *) -fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, focus}) = +fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) = let - val task as Task id = new_task (); + val task = new_task pri; val groups' = Inttab.cons_list (gid, task) groups; val jobs' = jobs - |> IntGraph.new_node (id, (group, Job (pri, job))) |> fold (add_job task) deps; - in (task, make_queue groups' jobs' focus) end; + |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps; + in (task, make_queue groups' jobs') end; -fun depend deps task (Queue {groups, jobs, focus}) = - make_queue groups (fold (add_job_acyclic task) deps jobs) focus; - -fun focus tasks (Queue {groups, jobs, ...}) = - make_queue groups jobs (map_filter (check_job jobs) tasks); +fun depend deps task (Queue {groups, jobs}) = + make_queue groups (fold (add_job_acyclic task) deps jobs); (* dequeue *) @@ -106,38 +105,30 @@ local fun dequeue_result NONE queue = (NONE, queue) - | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) = - (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus); - -fun dequeue_global req_pri (queue as Queue {jobs, ...}) = - let - fun ready (id, ((group as Group (_, ref ok), Job (pri, job)), ([], _))) = - if pri = req_pri then SOME (Task id, group, (fn () => job ok)) else NONE - | ready _ = NONE; - in dequeue_result (IntGraph.get_first ready jobs) queue end; - -fun dequeue_local focus (queue as Queue {jobs, ...}) = - let - fun ready id = - (case IntGraph.get_node jobs id of - (group as Group (_, ref ok), Job (_, job)) => - if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok)) - else NONE - | _ => NONE); - val ids = map (fn Task id => id) focus; - in dequeue_result (get_first ready (IntGraph.all_preds jobs ids)) queue end; + | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) = + (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs)); in -fun dequeue (queue as Queue {focus, ...}) = - (case dequeue_local focus queue of - (NONE, _) => - (case dequeue_global true queue of (NONE, _) => dequeue_global false queue | res => res) - | res => res); +fun dequeue (queue as Queue {jobs, ...}) = + let + fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) = + SOME (task, group, (fn () => job ok)) + | ready _ = NONE; + in dequeue_result (Task_Graph.get_first ready jobs) queue end; fun dequeue_towards tasks (queue as Queue {jobs, ...}) = - let val tasks' = map_filter (check_job jobs) tasks in - (case dequeue_local tasks' queue of + let + val tasks' = filter (can (Task_Graph.get_node jobs)) tasks; + + fun ready task = + (case Task_Graph.get_node jobs task of + (group as Group (_, ref ok), Job job) => + if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok)) + else NONE + | _ => NONE); + in + (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of (NONE, queue') => (NONE, queue') | (SOME work, queue') => (SOME (work, tasks'), queue')) end; @@ -150,8 +141,13 @@ fun interrupt (Queue {jobs, ...}) task = (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ()); -fun interrupt_external queue str = - (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ()); +fun interrupt_external (queue as Queue {jobs, ...}) str = + (case Int.fromString str of + SOME i => + (case Task_Graph.get_first + (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs + of SOME task => interrupt queue task | NONE => ()) + | NONE => ()); (* misc operations *) @@ -164,12 +160,11 @@ val _ = List.app SimpleThread.interrupt running; in null running end; -fun finish (task as Task id) (Queue {groups, jobs, focus}) = +fun finish task (Queue {groups, jobs}) = let val Group (gid, _) = get_group jobs task; val groups' = Inttab.remove_list (op =) (gid, task) groups; - val jobs' = IntGraph.del_node id jobs; - val focus' = remove (op =) task focus; - in make_queue groups' jobs' focus' end; + val jobs' = Task_Graph.del_node task jobs; + in make_queue groups' jobs' end; end; diff -r d41182a8135c -r 970d746274d5 src/Pure/IsaMakefile --- a/src/Pure/IsaMakefile Tue Dec 16 08:46:07 2008 +0100 +++ b/src/Pure/IsaMakefile Tue Dec 16 18:04:31 2008 +0100 @@ -23,26 +23,24 @@ $(OUT)/Pure: Concurrent/ROOT.ML Concurrent/future.ML \ Concurrent/mailbox.ML Concurrent/par_list.ML \ - Concurrent/par_list_dummy.ML Concurrent/schedule.ML \ - Concurrent/simple_thread.ML Concurrent/synchronized.ML \ - Concurrent/task_queue.ML General/ROOT.ML General/alist.ML \ - General/balanced_tree.ML General/basics.ML General/buffer.ML \ - General/file.ML General/graph.ML General/heap.ML General/integer.ML \ - General/lazy.ML General/markup.ML General/name_space.ML \ - General/ord_list.ML General/output.ML General/path.ML \ - General/position.ML General/pretty.ML General/print_mode.ML \ - General/properties.ML General/queue.ML General/scan.ML \ - General/secure.ML General/seq.ML General/source.ML General/stack.ML \ - General/symbol.ML General/symbol_pos.ML General/table.ML \ - General/url.ML General/xml.ML General/yxml.ML Isar/ROOT.ML \ - Isar/antiquote.ML Isar/args.ML Isar/attrib.ML Isar/auto_bind.ML \ - Isar/calculation.ML Isar/class.ML Isar/code.ML Isar/code_unit.ML \ - Isar/constdefs.ML Isar/context_rules.ML Isar/element.ML \ - Isar/expression.ML \ - Isar/find_theorems.ML Isar/instance.ML Isar/isar.ML Isar/isar_cmd.ML \ - Isar/isar_syn.ML Isar/local_defs.ML Isar/local_syntax.ML \ - Isar/local_theory.ML Isar/locale.ML Isar/method.ML Isar/net_rules.ML \ - Isar/new_locale.ML \ + Concurrent/par_list_dummy.ML Concurrent/simple_thread.ML \ + Concurrent/synchronized.ML Concurrent/task_queue.ML General/ROOT.ML \ + General/alist.ML General/balanced_tree.ML General/basics.ML \ + General/buffer.ML General/file.ML General/graph.ML General/heap.ML \ + General/integer.ML General/lazy.ML General/markup.ML \ + General/name_space.ML General/ord_list.ML General/output.ML \ + General/path.ML General/position.ML General/pretty.ML \ + General/print_mode.ML General/properties.ML General/queue.ML \ + General/scan.ML General/secure.ML General/seq.ML General/source.ML \ + General/stack.ML General/symbol.ML General/symbol_pos.ML \ + General/table.ML General/url.ML General/xml.ML General/yxml.ML \ + Isar/ROOT.ML Isar/antiquote.ML Isar/args.ML Isar/attrib.ML \ + Isar/auto_bind.ML Isar/calculation.ML Isar/class.ML Isar/code.ML \ + Isar/code_unit.ML Isar/constdefs.ML Isar/context_rules.ML \ + Isar/element.ML Isar/expression.ML Isar/find_theorems.ML \ + Isar/instance.ML Isar/isar.ML Isar/isar_cmd.ML Isar/isar_syn.ML \ + Isar/local_defs.ML Isar/local_syntax.ML Isar/local_theory.ML \ + Isar/locale.ML Isar/method.ML Isar/net_rules.ML Isar/new_locale.ML \ Isar/object_logic.ML Isar/obtain.ML Isar/outer_keyword.ML \ Isar/outer_lex.ML Isar/outer_parse.ML Isar/outer_syntax.ML \ Isar/overloading.ML Isar/proof.ML Isar/proof_context.ML \ @@ -76,17 +74,16 @@ Syntax/syn_trans.ML Syntax/syntax.ML Syntax/type_ext.ML Thy/html.ML \ Thy/latex.ML Thy/present.ML Thy/term_style.ML Thy/thm_deps.ML \ Thy/thy_edit.ML Thy/thy_header.ML Thy/thy_info.ML Thy/thy_load.ML \ - Thy/thy_output.ML Tools/ROOT.ML Tools/invoke.ML \ - Tools/isabelle_process.ML Tools/named_thms.ML \ - Tools/xml_syntax.ML assumption.ML axclass.ML codegen.ML config.ML \ - conjunction.ML consts.ML context.ML context_position.ML conv.ML \ - defs.ML display.ML drule.ML envir.ML facts.ML goal.ML \ - interpretation.ML library.ML logic.ML meta_simplifier.ML more_thm.ML \ - morphism.ML name.ML net.ML old_goals.ML pattern.ML primitive_defs.ML \ - proofterm.ML pure_setup.ML pure_thy.ML search.ML sign.ML \ - simplifier.ML sorts.ML subgoal.ML tactic.ML tctical.ML term.ML \ - term_subst.ML theory.ML thm.ML type.ML type_infer.ML unify.ML \ - variable.ML ../Tools/quickcheck.ML + Thy/thy_output.ML Tools/ROOT.ML Tools/invoke.ML \ + Tools/isabelle_process.ML Tools/named_thms.ML Tools/xml_syntax.ML \ + assumption.ML axclass.ML codegen.ML config.ML conjunction.ML \ + consts.ML context.ML context_position.ML conv.ML defs.ML display.ML \ + drule.ML envir.ML facts.ML goal.ML interpretation.ML library.ML \ + logic.ML meta_simplifier.ML more_thm.ML morphism.ML name.ML net.ML \ + old_goals.ML pattern.ML primitive_defs.ML proofterm.ML pure_setup.ML \ + pure_thy.ML search.ML sign.ML simplifier.ML sorts.ML subgoal.ML \ + tactic.ML tctical.ML term.ML term_subst.ML theory.ML thm.ML type.ML \ + type_infer.ML unify.ML variable.ML ../Tools/quickcheck.ML @./mk diff -r d41182a8135c -r 970d746274d5 src/Pure/Isar/toplevel.ML --- a/src/Pure/Isar/toplevel.ML Tue Dec 16 08:46:07 2008 +0100 +++ b/src/Pure/Isar/toplevel.ML Tue Dec 16 18:04:31 2008 +0100 @@ -718,7 +718,7 @@ val future_proof = Proof.future_proof (fn prf => - Future.fork_background (fn () => + Future.fork_pri 1 (fn () => let val (states, State (result_node, _)) = (case st' of State (SOME (Proof (_, (_, orig_gthy)), exit), prev) => State (SOME (Proof (ProofNode.init prf, (finish, orig_gthy)), exit), prev)) diff -r d41182a8135c -r 970d746274d5 src/Pure/Thy/thy_info.ML --- a/src/Pure/Thy/thy_info.ML Tue Dec 16 08:46:07 2008 +0100 +++ b/src/Pure/Thy/thy_info.ML Tue Dec 16 18:04:31 2008 +0100 @@ -315,7 +315,13 @@ datatype task = Task of (unit -> unit) | Finished | Running; fun task_finished Finished = true | task_finished _ = false; -fun future_schedule task_graph = +local + +fun schedule_seq tasks = + Graph.topological_order tasks + |> List.app (fn name => (case Graph.get_node tasks name of Task body => body () | _ => ())); + +fun schedule_futures task_graph = let val tasks = Graph.topological_order task_graph |> map_filter (fn name => (case Graph.get_node task_graph name of Task body => SOME (name, body) | _ => NONE)); @@ -339,45 +345,14 @@ val proof_results = PureThy.join_proofs (map_filter (try get_theory o #1) tasks); in ignore (Exn.release_all (thy_results @ proof_results)) end; -local - -fun max_task (name, (Task body, m)) NONE = SOME (name: string, (body, m)) - | max_task (name, (Task body, m)) (task' as SOME (name', (_, m'))) = - if m > m' orelse m = m' andalso name < name' then SOME (name, (body, m)) else task' - | max_task _ task' = task'; - -fun next_task G = - let - val tasks = Graph.minimals G |> map (fn name => - (name, (Graph.get_node G name, length (Graph.imm_succs G name)))); - val finished = filter (task_finished o fst o snd) tasks; - in - if not (null finished) then next_task (Graph.del_nodes (map fst finished) G) - else if null tasks then (Schedule.Terminate, G) - else - (case fold max_task tasks NONE of - NONE => (Schedule.Wait, G) - | SOME (name, (body, _)) => - (Schedule.Task {body = PrintMode.closure body, - cont = Graph.del_nodes [name], fail = K Graph.empty}, - Graph.map_node name (K Running) G)) - end; - -fun schedule_seq tasks = - Graph.topological_order tasks - |> List.app (fn name => (case Graph.get_node tasks name of Task body => body () | _ => ())); - in fun schedule_tasks tasks n = - let val m = Multithreading.max_threads_value () in - if m <= 1 then schedule_seq tasks - else if Multithreading.self_critical () then + if not (Multithreading.enabled ()) then schedule_seq tasks + else if Multithreading.self_critical () then (warning (loader_msg "no multithreading within critical section" []); schedule_seq tasks) - else if Future.enabled () then future_schedule tasks - else ignore (Exn.release_all (map Exn.Exn (Schedule.schedule (Int.min (m, n)) next_task tasks))) - end; + else schedule_futures tasks; end; diff -r d41182a8135c -r 970d746274d5 src/Pure/goal.ML --- a/src/Pure/goal.ML Tue Dec 16 08:46:07 2008 +0100 +++ b/src/Pure/goal.ML Tue Dec 16 18:04:31 2008 +0100 @@ -179,7 +179,7 @@ val res = if immediate orelse #maxidx (Thm.rep_cterm stmt) >= 0 orelse not (Future.enabled ()) then result () - else future_result ctxt' (Future.fork_background result) (Thm.term_of stmt); + else future_result ctxt' (Future.fork_pri 1 result) (Thm.term_of stmt); in Conjunction.elim_balanced (length props) res |> map (Assumption.export false ctxt' ctxt) diff -r d41182a8135c -r 970d746274d5 src/Pure/pure_setup.ML --- a/src/Pure/pure_setup.ML Tue Dec 16 08:46:07 2008 +0100 +++ b/src/Pure/pure_setup.ML Tue Dec 16 18:04:31 2008 +0100 @@ -1,5 +1,4 @@ (* Title: Pure/pure_setup.ML - ID: $Id$ Author: Makarius Pure theory and ML toplevel setup. @@ -28,8 +27,8 @@ (* ML toplevel pretty printing *) -install_pp (make_pp ["TaskQueue", "task"] (Pretty.pprint o Pretty.str o TaskQueue.str_of_task)); -install_pp (make_pp ["TaskQueue", "group"] (Pretty.pprint o Pretty.str o TaskQueue.str_of_group)); +install_pp (make_pp ["Task_Queue", "task"] (Pretty.pprint o Pretty.str o Task_Queue.str_of_task)); +install_pp (make_pp ["Task_Queue", "group"] (Pretty.pprint o Pretty.str o Task_Queue.str_of_group)); install_pp (make_pp ["Position", "T"] (Pretty.pprint o Pretty.enum "," "{" "}" o map (fn (x, y) => Pretty.str (x ^ "=" ^ y)) o Position.properties_of)); install_pp (make_pp ["Thm", "thm"] ProofDisplay.pprint_thm);