--- a/src/Pure/Concurrent/ROOT.ML Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Concurrent/ROOT.ML Tue Dec 16 18:04:31 2008 +0100
@@ -1,15 +1,12 @@
(* Title: Pure/Concurrent/ROOT.ML
- ID: $Id$
+ Author: Makarius
Concurrency within the ML runtime.
*)
-val future_scheduler = ref true;
-
use "simple_thread.ML";
use "synchronized.ML";
use "mailbox.ML";
-use "schedule.ML";
use "task_queue.ML";
use "future.ML";
use "par_list.ML";
--- a/src/Pure/Concurrent/future.ML Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Concurrent/future.ML Tue Dec 16 18:04:31 2008 +0100
@@ -1,5 +1,4 @@
(* Title: Pure/Concurrent/future.ML
- ID: $Id$
Author: Makarius
Future values.
@@ -28,8 +27,8 @@
signature FUTURE =
sig
val enabled: unit -> bool
- type task = TaskQueue.task
- type group = TaskQueue.group
+ type task = Task_Queue.task
+ type group = Task_Queue.group
val thread_data: unit -> (string * task) option
type 'a future
val task_of: 'a future -> task
@@ -40,12 +39,11 @@
val fork: (unit -> 'a) -> 'a future
val fork_group: group -> (unit -> 'a) -> 'a future
val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
- val fork_background: (unit -> 'a) -> 'a future
+ val fork_pri: int -> (unit -> 'a) -> 'a future
val join_results: 'a future list -> 'a Exn.result list
val join_result: 'a future -> 'a Exn.result
val join: 'a future -> 'a
val map: ('a -> 'b) -> 'a future -> 'b future
- val focus: task list -> unit
val interrupt_task: string -> unit
val cancel: 'a future -> unit
val shutdown: unit -> unit
@@ -57,14 +55,14 @@
(** future values **)
fun enabled () =
- ! future_scheduler andalso Multithreading.enabled () andalso
+ Multithreading.enabled () andalso
not (Multithreading.self_critical ());
(* identifiers *)
-type task = TaskQueue.task;
-type group = TaskQueue.group;
+type task = Task_Queue.task;
+type group = Task_Queue.group;
local val tag = Universal.tag () : (string * task) option Universal.tag in
fun thread_data () = the_default NONE (Thread.getLocal tag);
@@ -86,8 +84,8 @@
fun is_finished x = is_some (peek x);
fun value x = Future
- {task = TaskQueue.new_task (),
- group = TaskQueue.new_group (),
+ {task = Task_Queue.new_task 0,
+ group = Task_Queue.new_group (),
result = ref (SOME (Exn.Result x))};
@@ -96,12 +94,12 @@
(* global state *)
-val queue = ref TaskQueue.empty;
+val queue = ref Task_Queue.empty;
val next = ref 0;
val workers = ref ([]: (Thread.thread * bool) list);
val scheduler = ref (NONE: Thread.thread option);
val excessive = ref 0;
-val canceled = ref ([]: TaskQueue.group list);
+val canceled = ref ([]: Task_Queue.group list);
val do_shutdown = ref false;
@@ -114,15 +112,11 @@
fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
-fun wait name = (*requires SYNCHRONIZED*)
- (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
+fun wait () = (*requires SYNCHRONIZED*)
ConditionVar.wait (cond, lock);
- Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
-fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
- (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
+fun wait_timeout timeout = (*requires SYNCHRONIZED*)
ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
- Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
fun notify_all () = (*requires SYNCHRONIZED*)
ConditionVar.broadcast cond;
@@ -150,9 +144,9 @@
val _ = trace_active ();
val ok = setmp_thread_data (name, task) run ();
val _ = SYNCHRONIZED "execute" (fn () =>
- (change queue (TaskQueue.finish task);
+ (change queue (Task_Queue.finish task);
if ok then ()
- else if TaskQueue.cancel (! queue) group then ()
+ else if Task_Queue.cancel (! queue) group then ()
else change canceled (cons group);
notify_all ()));
in () end;
@@ -160,23 +154,23 @@
(* worker threads *)
-fun worker_wait name = (*requires SYNCHRONIZED*)
- (change_active false; wait name; change_active true);
+fun worker_wait () = (*requires SYNCHRONIZED*)
+ (change_active false; wait (); change_active true);
-fun worker_next name = (*requires SYNCHRONIZED*)
+fun worker_next () = (*requires SYNCHRONIZED*)
if ! excessive > 0 then
(dec excessive;
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
notify_all ();
NONE)
else
- (case change_result queue TaskQueue.dequeue of
- NONE => (worker_wait name; worker_next name)
+ (case change_result queue Task_Queue.dequeue of
+ NONE => (worker_wait (); worker_next ())
| some => some);
fun worker_loop name =
- (case SYNCHRONIZED name (fn () => worker_next name) of
- NONE => Multithreading.tracing 3 (fn () => name ^ ": exit")
+ (case SYNCHRONIZED name worker_next of
+ NONE => ()
| SOME work => (execute name work; worker_loop name));
fun worker_start name = (*requires SYNCHRONIZED*)
@@ -204,27 +198,25 @@
else ();
(*canceled groups*)
- val _ = change canceled (filter_out (TaskQueue.cancel (! queue)));
+ val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
(*shutdown*)
val continue = not (! do_shutdown andalso null (! workers));
val _ = if continue then () else scheduler := NONE;
val _ = notify_all ();
- val _ = wait_timeout "scheduler" (Time.fromSeconds 3);
+ val _ = wait_timeout (Time.fromSeconds 3);
in continue end;
fun scheduler_loop () =
- (while SYNCHRONIZED "scheduler" scheduler_next do ();
- Multithreading.tracing 3 (fn () => "scheduler: exit"));
+ while SYNCHRONIZED "scheduler" scheduler_next do ();
fun scheduler_active () = (*requires SYNCHRONIZED*)
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
fun scheduler_check name = SYNCHRONIZED name (fn () =>
if not (scheduler_active ()) then
- (Multithreading.tracing 3 (fn () => "scheduler: fork");
- do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
+ (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
else if ! do_shutdown then error "Scheduler shutdown in progress"
else ());
@@ -235,7 +227,7 @@
let
val _ = scheduler_check "future check";
- val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
+ val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
val result = ref (NONE: 'a Exn.result option);
val run = Multithreading.with_attributes (Thread.getAttributes ())
@@ -246,18 +238,18 @@
val res_ok =
(case res of
Exn.Result _ => true
- | Exn.Exn Exn.Interrupt => (TaskQueue.invalidate_group group; true)
+ | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
| _ => false);
in res_ok end);
val task = SYNCHRONIZED "future" (fn () =>
- change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
+ change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ());
in Future {task = task, group = group, result = result} end;
-fun fork e = future NONE [] true e;
-fun fork_group group e = future (SOME group) [] true e;
-fun fork_deps deps e = future NONE (map task_of deps) true e;
-fun fork_background e = future NONE [] false e;
+fun fork e = future NONE [] 0 e;
+fun fork_group group e = future (SOME group) [] 0 e;
+fun fork_deps deps e = future NONE (map task_of deps) 0 e;
+fun fork_pri pri e = future NONE [] pri e;
(* join: retrieve results *)
@@ -273,7 +265,7 @@
fun join_loop _ [] = ()
| join_loop name tasks =
(case SYNCHRONIZED name (fn () =>
- change_result queue (TaskQueue.dequeue_towards tasks)) of
+ change_result queue (Task_Queue.dequeue_towards tasks)) of
NONE => ()
| SOME (work, tasks') => (execute name work; join_loop name tasks'));
val _ =
@@ -281,18 +273,18 @@
NONE =>
(*alien thread -- refrain from contending for resources*)
while exists (not o is_finished) xs
- do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
+ do SYNCHRONIZED "join_thread" (fn () => wait ())
| SOME (name, task) =>
(*proper task -- actively work towards results*)
let
val unfinished = xs |> map_filter
(fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
val _ = SYNCHRONIZED "join" (fn () =>
- (change queue (TaskQueue.depend unfinished task); notify_all ()));
+ (change queue (Task_Queue.depend unfinished task); notify_all ()));
val _ = join_loop ("join_loop: " ^ name) unfinished;
val _ =
while exists (not o is_finished) xs
- do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task");
+ do SYNCHRONIZED "join_task" (fn () => worker_wait ());
in () end);
in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
@@ -300,18 +292,16 @@
fun join_result x = singleton join_results x;
fun join x = Exn.release (join_result x);
-fun map f x = fork_deps [x] (fn () => f (join x));
+fun map f x =
+ let val task = task_of x
+ in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end;
(* misc operations *)
-(*focus: collection of high-priority task*)
-fun focus tasks = SYNCHRONIZED "focus" (fn () =>
- change queue (TaskQueue.focus tasks));
-
(*interrupt: permissive signal, may get ignored*)
fun interrupt_task id = SYNCHRONIZED "interrupt"
- (fn () => TaskQueue.interrupt_external (! queue) id);
+ (fn () => Task_Queue.interrupt_external (! queue) id);
(*cancel: present and future group members will be interrupted eventually*)
fun cancel x =
@@ -324,12 +314,12 @@
if Multithreading.available then
(scheduler_check "shutdown check";
SYNCHRONIZED "shutdown" (fn () =>
- (while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
- while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
+ (while not (scheduler_active ()) do wait ();
+ while not (Task_Queue.is_empty (! queue)) do wait ();
do_shutdown := true;
notify_all ();
- while not (null (! workers)) do wait "shutdown: workers";
- while scheduler_active () do wait "shutdown: scheduler still active";
+ while not (null (! workers)) do wait ();
+ while scheduler_active () do wait ();
OS.Process.sleep (Time.fromMilliseconds 300))))
else ();
--- a/src/Pure/Concurrent/par_list.ML Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Concurrent/par_list.ML Tue Dec 16 18:04:31 2008 +0100
@@ -30,7 +30,7 @@
fun raw_map f xs =
if Future.enabled () then
let
- val group = TaskQueue.new_group ();
+ val group = Task_Queue.new_group ();
val futures = map (fn x => Future.fork_group group (fn () => f x)) xs;
val _ = List.app (ignore o Future.join_result) futures;
in Future.join_results futures end
--- a/src/Pure/Concurrent/schedule.ML Tue Dec 16 08:46:07 2008 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,85 +0,0 @@
-(* Title: Pure/Concurrent/schedule.ML
- ID: $Id$
- Author: Makarius
-
-Scheduling -- multiple threads working on a queue of tasks.
-*)
-
-signature SCHEDULE =
-sig
- datatype 'a task =
- Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
- val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list
-end;
-
-structure Schedule: SCHEDULE =
-struct
-
-datatype 'a task =
- Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-
-fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
- let
- (*synchronized execution*)
- val lock = Mutex.mutex ();
- fun SYNCHRONIZED e =
- let
- val _ = Mutex.lock lock;
- val res = Exn.capture e ();
- val _ = Mutex.unlock lock;
- in Exn.release res end;
-
- (*wakeup condition*)
- val wakeup = ConditionVar.conditionVar ();
- fun wakeup_all () = ConditionVar.broadcast wakeup;
- fun wait () = ConditionVar.wait (wakeup, lock);
- fun wait_timeout () =
- ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1));
-
- (*queue of tasks*)
- val queue = ref tasks;
- val active = ref 0;
- fun trace_active () = Multithreading.tracing 1 (fn () =>
- "SCHEDULE: " ^ string_of_int (! active) ^ " active");
- fun dequeue () =
- (case change_result queue next_task of
- Wait =>
- (dec active; trace_active ();
- wait ();
- inc active; trace_active ();
- dequeue ())
- | next => next);
-
- (*pool of running threads*)
- val status = ref ([]: exn list);
- val running = ref ([]: Thread.thread list);
- fun start f = (inc active; change running (cons (SimpleThread.fork false f)));
- fun stop () = (dec active; change running (remove Thread.equal (Thread.self ())));
-
- (*worker thread*)
- fun worker () =
- (case SYNCHRONIZED dequeue of
- Task {body, cont, fail} =>
- (case Exn.capture (restore_attributes body) () of
- Exn.Result () =>
- (SYNCHRONIZED (fn () => (change queue cont; wakeup_all ())); worker ())
- | Exn.Exn exn =>
- SYNCHRONIZED (fn () =>
- (change status (cons exn); change queue fail; stop (); wakeup_all ())))
- | Terminate => SYNCHRONIZED (fn () => (stop (); wakeup_all ())));
-
- (*main control: fork and wait*)
- fun fork 0 = ()
- | fork k = (start worker; fork (k - 1));
- val _ = SYNCHRONIZED (fn () =>
- (fork (Int.max (n, 1));
- while not (null (! running)) do
- (trace_active ();
- if not (null (! status))
- then (List.app SimpleThread.interrupt (! running))
- else ();
- wait_timeout ())));
-
- in ! status end);
-
-end;
--- a/src/Pure/Concurrent/task_queue.ML Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Tue Dec 16 18:04:31 2008 +0100
@@ -1,5 +1,4 @@
(* Title: Pure/Concurrent/task_queue.ML
- ID: $Id$
Author: Makarius
Ordered queue of grouped tasks.
@@ -8,7 +7,8 @@
signature TASK_QUEUE =
sig
eqtype task
- val new_task: unit -> task
+ val new_task: int -> task
+ val pri_of_task: task -> int
val str_of_task: task -> string
eqtype group
val new_group: unit -> group
@@ -17,9 +17,8 @@
type queue
val empty: queue
val is_empty: queue -> bool
- val enqueue: group -> task list -> bool -> (bool -> bool) -> queue -> task * queue
+ val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
val depend: task list -> task -> queue -> queue
- val focus: task list -> queue -> queue
val dequeue: queue -> (task * group * (unit -> bool)) option * queue
val dequeue_towards: task list -> queue ->
(((task * group * (unit -> bool)) * task list) option * queue)
@@ -29,20 +28,27 @@
val cancel: queue -> group -> bool
end;
-structure TaskQueue: TASK_QUEUE =
+structure Task_Queue: TASK_QUEUE =
struct
-(* identifiers *)
+(* tasks *)
+
+datatype task = Task of int * serial;
+fun new_task pri = Task (pri, serial ());
-datatype task = Task of serial;
-fun new_task () = Task (serial ());
+fun pri_of_task (Task (pri, _)) = pri;
+fun str_of_task (Task (_, i)) = string_of_int i;
-fun str_of_task (Task i) = string_of_int i;
+fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
+structure Task_Graph = GraphFun(type key = task val ord = task_ord);
+(* groups *)
+
datatype group = Group of serial * bool ref;
fun new_group () = Group (serial (), ref true);
+
fun invalidate_group (Group (_, ok)) = ok := false;
fun str_of_group (Group (i, ref ok)) =
@@ -52,53 +58,46 @@
(* jobs *)
datatype job =
- Job of bool * (bool -> bool) | (*priority, job: status -> status*)
+ Job of bool -> bool |
Running of Thread.thread;
-type jobs = (group * job) IntGraph.T;
+type jobs = (group * job) Task_Graph.T;
-fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
-fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
-fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
+fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
+fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
+fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;
-fun add_job (Task id) (Task dep) (jobs: jobs) =
- IntGraph.add_edge (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
+fun add_job task dep (jobs: jobs) =
+ Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
-fun add_job_acyclic (Task id) (Task dep) (jobs: jobs) =
- IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
-
-fun check_job (jobs: jobs) (task as Task id) =
- if can (IntGraph.get_node jobs) id then SOME task else NONE;
+fun add_job_acyclic task dep (jobs: jobs) =
+ Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
(* queue of grouped jobs *)
datatype queue = Queue of
{groups: task list Inttab.table, (*groups with presently active members*)
- jobs: jobs, (*job dependency graph*)
- focus: task list}; (*particular collection of high-priority tasks*)
+ jobs: jobs}; (*job dependency graph*)
-fun make_queue groups jobs focus = Queue {groups = groups, jobs = jobs, focus = focus};
+fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
-val empty = make_queue Inttab.empty IntGraph.empty [];
-fun is_empty (Queue {jobs, ...}) = IntGraph.is_empty jobs;
+val empty = make_queue Inttab.empty Task_Graph.empty;
+fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
(* enqueue *)
-fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, focus}) =
+fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
let
- val task as Task id = new_task ();
+ val task = new_task pri;
val groups' = Inttab.cons_list (gid, task) groups;
val jobs' = jobs
- |> IntGraph.new_node (id, (group, Job (pri, job))) |> fold (add_job task) deps;
- in (task, make_queue groups' jobs' focus) end;
+ |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
+ in (task, make_queue groups' jobs') end;
-fun depend deps task (Queue {groups, jobs, focus}) =
- make_queue groups (fold (add_job_acyclic task) deps jobs) focus;
-
-fun focus tasks (Queue {groups, jobs, ...}) =
- make_queue groups jobs (map_filter (check_job jobs) tasks);
+fun depend deps task (Queue {groups, jobs}) =
+ make_queue groups (fold (add_job_acyclic task) deps jobs);
(* dequeue *)
@@ -106,38 +105,30 @@
local
fun dequeue_result NONE queue = (NONE, queue)
- | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) =
- (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus);
-
-fun dequeue_global req_pri (queue as Queue {jobs, ...}) =
- let
- fun ready (id, ((group as Group (_, ref ok), Job (pri, job)), ([], _))) =
- if pri = req_pri then SOME (Task id, group, (fn () => job ok)) else NONE
- | ready _ = NONE;
- in dequeue_result (IntGraph.get_first ready jobs) queue end;
-
-fun dequeue_local focus (queue as Queue {jobs, ...}) =
- let
- fun ready id =
- (case IntGraph.get_node jobs id of
- (group as Group (_, ref ok), Job (_, job)) =>
- if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok))
- else NONE
- | _ => NONE);
- val ids = map (fn Task id => id) focus;
- in dequeue_result (get_first ready (IntGraph.all_preds jobs ids)) queue end;
+ | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
+ (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));
in
-fun dequeue (queue as Queue {focus, ...}) =
- (case dequeue_local focus queue of
- (NONE, _) =>
- (case dequeue_global true queue of (NONE, _) => dequeue_global false queue | res => res)
- | res => res);
+fun dequeue (queue as Queue {jobs, ...}) =
+ let
+ fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
+ SOME (task, group, (fn () => job ok))
+ | ready _ = NONE;
+ in dequeue_result (Task_Graph.get_first ready jobs) queue end;
fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
- let val tasks' = map_filter (check_job jobs) tasks in
- (case dequeue_local tasks' queue of
+ let
+ val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;
+
+ fun ready task =
+ (case Task_Graph.get_node jobs task of
+ (group as Group (_, ref ok), Job job) =>
+ if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
+ else NONE
+ | _ => NONE);
+ in
+ (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
(NONE, queue') => (NONE, queue')
| (SOME work, queue') => (SOME (work, tasks'), queue'))
end;
@@ -150,8 +141,13 @@
fun interrupt (Queue {jobs, ...}) task =
(case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
-fun interrupt_external queue str =
- (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ());
+fun interrupt_external (queue as Queue {jobs, ...}) str =
+ (case Int.fromString str of
+ SOME i =>
+ (case Task_Graph.get_first
+ (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
+ of SOME task => interrupt queue task | NONE => ())
+ | NONE => ());
(* misc operations *)
@@ -164,12 +160,11 @@
val _ = List.app SimpleThread.interrupt running;
in null running end;
-fun finish (task as Task id) (Queue {groups, jobs, focus}) =
+fun finish task (Queue {groups, jobs}) =
let
val Group (gid, _) = get_group jobs task;
val groups' = Inttab.remove_list (op =) (gid, task) groups;
- val jobs' = IntGraph.del_node id jobs;
- val focus' = remove (op =) task focus;
- in make_queue groups' jobs' focus' end;
+ val jobs' = Task_Graph.del_node task jobs;
+ in make_queue groups' jobs' end;
end;
--- a/src/Pure/IsaMakefile Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/IsaMakefile Tue Dec 16 18:04:31 2008 +0100
@@ -23,26 +23,24 @@
$(OUT)/Pure: Concurrent/ROOT.ML Concurrent/future.ML \
Concurrent/mailbox.ML Concurrent/par_list.ML \
- Concurrent/par_list_dummy.ML Concurrent/schedule.ML \
- Concurrent/simple_thread.ML Concurrent/synchronized.ML \
- Concurrent/task_queue.ML General/ROOT.ML General/alist.ML \
- General/balanced_tree.ML General/basics.ML General/buffer.ML \
- General/file.ML General/graph.ML General/heap.ML General/integer.ML \
- General/lazy.ML General/markup.ML General/name_space.ML \
- General/ord_list.ML General/output.ML General/path.ML \
- General/position.ML General/pretty.ML General/print_mode.ML \
- General/properties.ML General/queue.ML General/scan.ML \
- General/secure.ML General/seq.ML General/source.ML General/stack.ML \
- General/symbol.ML General/symbol_pos.ML General/table.ML \
- General/url.ML General/xml.ML General/yxml.ML Isar/ROOT.ML \
- Isar/antiquote.ML Isar/args.ML Isar/attrib.ML Isar/auto_bind.ML \
- Isar/calculation.ML Isar/class.ML Isar/code.ML Isar/code_unit.ML \
- Isar/constdefs.ML Isar/context_rules.ML Isar/element.ML \
- Isar/expression.ML \
- Isar/find_theorems.ML Isar/instance.ML Isar/isar.ML Isar/isar_cmd.ML \
- Isar/isar_syn.ML Isar/local_defs.ML Isar/local_syntax.ML \
- Isar/local_theory.ML Isar/locale.ML Isar/method.ML Isar/net_rules.ML \
- Isar/new_locale.ML \
+ Concurrent/par_list_dummy.ML Concurrent/simple_thread.ML \
+ Concurrent/synchronized.ML Concurrent/task_queue.ML General/ROOT.ML \
+ General/alist.ML General/balanced_tree.ML General/basics.ML \
+ General/buffer.ML General/file.ML General/graph.ML General/heap.ML \
+ General/integer.ML General/lazy.ML General/markup.ML \
+ General/name_space.ML General/ord_list.ML General/output.ML \
+ General/path.ML General/position.ML General/pretty.ML \
+ General/print_mode.ML General/properties.ML General/queue.ML \
+ General/scan.ML General/secure.ML General/seq.ML General/source.ML \
+ General/stack.ML General/symbol.ML General/symbol_pos.ML \
+ General/table.ML General/url.ML General/xml.ML General/yxml.ML \
+ Isar/ROOT.ML Isar/antiquote.ML Isar/args.ML Isar/attrib.ML \
+ Isar/auto_bind.ML Isar/calculation.ML Isar/class.ML Isar/code.ML \
+ Isar/code_unit.ML Isar/constdefs.ML Isar/context_rules.ML \
+ Isar/element.ML Isar/expression.ML Isar/find_theorems.ML \
+ Isar/instance.ML Isar/isar.ML Isar/isar_cmd.ML Isar/isar_syn.ML \
+ Isar/local_defs.ML Isar/local_syntax.ML Isar/local_theory.ML \
+ Isar/locale.ML Isar/method.ML Isar/net_rules.ML Isar/new_locale.ML \
Isar/object_logic.ML Isar/obtain.ML Isar/outer_keyword.ML \
Isar/outer_lex.ML Isar/outer_parse.ML Isar/outer_syntax.ML \
Isar/overloading.ML Isar/proof.ML Isar/proof_context.ML \
@@ -76,17 +74,16 @@
Syntax/syn_trans.ML Syntax/syntax.ML Syntax/type_ext.ML Thy/html.ML \
Thy/latex.ML Thy/present.ML Thy/term_style.ML Thy/thm_deps.ML \
Thy/thy_edit.ML Thy/thy_header.ML Thy/thy_info.ML Thy/thy_load.ML \
- Thy/thy_output.ML Tools/ROOT.ML Tools/invoke.ML \
- Tools/isabelle_process.ML Tools/named_thms.ML \
- Tools/xml_syntax.ML assumption.ML axclass.ML codegen.ML config.ML \
- conjunction.ML consts.ML context.ML context_position.ML conv.ML \
- defs.ML display.ML drule.ML envir.ML facts.ML goal.ML \
- interpretation.ML library.ML logic.ML meta_simplifier.ML more_thm.ML \
- morphism.ML name.ML net.ML old_goals.ML pattern.ML primitive_defs.ML \
- proofterm.ML pure_setup.ML pure_thy.ML search.ML sign.ML \
- simplifier.ML sorts.ML subgoal.ML tactic.ML tctical.ML term.ML \
- term_subst.ML theory.ML thm.ML type.ML type_infer.ML unify.ML \
- variable.ML ../Tools/quickcheck.ML
+ Thy/thy_output.ML Tools/ROOT.ML Tools/invoke.ML \
+ Tools/isabelle_process.ML Tools/named_thms.ML Tools/xml_syntax.ML \
+ assumption.ML axclass.ML codegen.ML config.ML conjunction.ML \
+ consts.ML context.ML context_position.ML conv.ML defs.ML display.ML \
+ drule.ML envir.ML facts.ML goal.ML interpretation.ML library.ML \
+ logic.ML meta_simplifier.ML more_thm.ML morphism.ML name.ML net.ML \
+ old_goals.ML pattern.ML primitive_defs.ML proofterm.ML pure_setup.ML \
+ pure_thy.ML search.ML sign.ML simplifier.ML sorts.ML subgoal.ML \
+ tactic.ML tctical.ML term.ML term_subst.ML theory.ML thm.ML type.ML \
+ type_infer.ML unify.ML variable.ML ../Tools/quickcheck.ML
@./mk
--- a/src/Pure/Isar/toplevel.ML Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Isar/toplevel.ML Tue Dec 16 18:04:31 2008 +0100
@@ -718,7 +718,7 @@
val future_proof = Proof.future_proof
(fn prf =>
- Future.fork_background (fn () =>
+ Future.fork_pri 1 (fn () =>
let val (states, State (result_node, _)) =
(case st' of State (SOME (Proof (_, (_, orig_gthy)), exit), prev)
=> State (SOME (Proof (ProofNode.init prf, (finish, orig_gthy)), exit), prev))
--- a/src/Pure/Thy/thy_info.ML Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Thy/thy_info.ML Tue Dec 16 18:04:31 2008 +0100
@@ -315,7 +315,13 @@
datatype task = Task of (unit -> unit) | Finished | Running;
fun task_finished Finished = true | task_finished _ = false;
-fun future_schedule task_graph =
+local
+
+fun schedule_seq tasks =
+ Graph.topological_order tasks
+ |> List.app (fn name => (case Graph.get_node tasks name of Task body => body () | _ => ()));
+
+fun schedule_futures task_graph =
let
val tasks = Graph.topological_order task_graph |> map_filter (fn name =>
(case Graph.get_node task_graph name of Task body => SOME (name, body) | _ => NONE));
@@ -339,45 +345,14 @@
val proof_results = PureThy.join_proofs (map_filter (try get_theory o #1) tasks);
in ignore (Exn.release_all (thy_results @ proof_results)) end;
-local
-
-fun max_task (name, (Task body, m)) NONE = SOME (name: string, (body, m))
- | max_task (name, (Task body, m)) (task' as SOME (name', (_, m'))) =
- if m > m' orelse m = m' andalso name < name' then SOME (name, (body, m)) else task'
- | max_task _ task' = task';
-
-fun next_task G =
- let
- val tasks = Graph.minimals G |> map (fn name =>
- (name, (Graph.get_node G name, length (Graph.imm_succs G name))));
- val finished = filter (task_finished o fst o snd) tasks;
- in
- if not (null finished) then next_task (Graph.del_nodes (map fst finished) G)
- else if null tasks then (Schedule.Terminate, G)
- else
- (case fold max_task tasks NONE of
- NONE => (Schedule.Wait, G)
- | SOME (name, (body, _)) =>
- (Schedule.Task {body = PrintMode.closure body,
- cont = Graph.del_nodes [name], fail = K Graph.empty},
- Graph.map_node name (K Running) G))
- end;
-
-fun schedule_seq tasks =
- Graph.topological_order tasks
- |> List.app (fn name => (case Graph.get_node tasks name of Task body => body () | _ => ()));
-
in
fun schedule_tasks tasks n =
- let val m = Multithreading.max_threads_value () in
- if m <= 1 then schedule_seq tasks
- else if Multithreading.self_critical () then
+ if not (Multithreading.enabled ()) then schedule_seq tasks
+ else if Multithreading.self_critical () then
(warning (loader_msg "no multithreading within critical section" []);
schedule_seq tasks)
- else if Future.enabled () then future_schedule tasks
- else ignore (Exn.release_all (map Exn.Exn (Schedule.schedule (Int.min (m, n)) next_task tasks)))
- end;
+ else schedule_futures tasks;
end;
--- a/src/Pure/goal.ML Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/goal.ML Tue Dec 16 18:04:31 2008 +0100
@@ -179,7 +179,7 @@
val res =
if immediate orelse #maxidx (Thm.rep_cterm stmt) >= 0 orelse not (Future.enabled ())
then result ()
- else future_result ctxt' (Future.fork_background result) (Thm.term_of stmt);
+ else future_result ctxt' (Future.fork_pri 1 result) (Thm.term_of stmt);
in
Conjunction.elim_balanced (length props) res
|> map (Assumption.export false ctxt' ctxt)
--- a/src/Pure/pure_setup.ML Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/pure_setup.ML Tue Dec 16 18:04:31 2008 +0100
@@ -1,5 +1,4 @@
(* Title: Pure/pure_setup.ML
- ID: $Id$
Author: Makarius
Pure theory and ML toplevel setup.
@@ -28,8 +27,8 @@
(* ML toplevel pretty printing *)
-install_pp (make_pp ["TaskQueue", "task"] (Pretty.pprint o Pretty.str o TaskQueue.str_of_task));
-install_pp (make_pp ["TaskQueue", "group"] (Pretty.pprint o Pretty.str o TaskQueue.str_of_group));
+install_pp (make_pp ["Task_Queue", "task"] (Pretty.pprint o Pretty.str o Task_Queue.str_of_task));
+install_pp (make_pp ["Task_Queue", "group"] (Pretty.pprint o Pretty.str o Task_Queue.str_of_group));
install_pp (make_pp ["Position", "T"] (Pretty.pprint o Pretty.enum "," "{" "}" o
map (fn (x, y) => Pretty.str (x ^ "=" ^ y)) o Position.properties_of));
install_pp (make_pp ["Thm", "thm"] ProofDisplay.pprint_thm);