merged
authorwenzelm
Tue, 16 Dec 2008 18:04:31 +0100
changeset 29126 970d746274d5
parent 29125 d41182a8135c (current diff)
parent 29124 ce6f21913e54 (diff)
child 29127 2a684ee023e7
merged
src/Pure/Concurrent/schedule.ML
--- a/src/Pure/Concurrent/ROOT.ML	Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Concurrent/ROOT.ML	Tue Dec 16 18:04:31 2008 +0100
@@ -1,15 +1,12 @@
 (*  Title:      Pure/Concurrent/ROOT.ML
-    ID:         $Id$
+    Author:     Makarius
 
 Concurrency within the ML runtime.
 *)
 
-val future_scheduler = ref true;
-
 use "simple_thread.ML";
 use "synchronized.ML";
 use "mailbox.ML";
-use "schedule.ML";
 use "task_queue.ML";
 use "future.ML";
 use "par_list.ML";
--- a/src/Pure/Concurrent/future.ML	Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Concurrent/future.ML	Tue Dec 16 18:04:31 2008 +0100
@@ -1,5 +1,4 @@
 (*  Title:      Pure/Concurrent/future.ML
-    ID:         $Id$
     Author:     Makarius
 
 Future values.
@@ -28,8 +27,8 @@
 signature FUTURE =
 sig
   val enabled: unit -> bool
-  type task = TaskQueue.task
-  type group = TaskQueue.group
+  type task = Task_Queue.task
+  type group = Task_Queue.group
   val thread_data: unit -> (string * task) option
   type 'a future
   val task_of: 'a future -> task
@@ -40,12 +39,11 @@
   val fork: (unit -> 'a) -> 'a future
   val fork_group: group -> (unit -> 'a) -> 'a future
   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
-  val fork_background: (unit -> 'a) -> 'a future
+  val fork_pri: int -> (unit -> 'a) -> 'a future
   val join_results: 'a future list -> 'a Exn.result list
   val join_result: 'a future -> 'a Exn.result
   val join: 'a future -> 'a
   val map: ('a -> 'b) -> 'a future -> 'b future
-  val focus: task list -> unit
   val interrupt_task: string -> unit
   val cancel: 'a future -> unit
   val shutdown: unit -> unit
@@ -57,14 +55,14 @@
 (** future values **)
 
 fun enabled () =
-  ! future_scheduler andalso Multithreading.enabled () andalso
+  Multithreading.enabled () andalso
     not (Multithreading.self_critical ());
 
 
 (* identifiers *)
 
-type task = TaskQueue.task;
-type group = TaskQueue.group;
+type task = Task_Queue.task;
+type group = Task_Queue.group;
 
 local val tag = Universal.tag () : (string * task) option Universal.tag in
   fun thread_data () = the_default NONE (Thread.getLocal tag);
@@ -86,8 +84,8 @@
 fun is_finished x = is_some (peek x);
 
 fun value x = Future
- {task = TaskQueue.new_task (),
-  group = TaskQueue.new_group (),
+ {task = Task_Queue.new_task 0,
+  group = Task_Queue.new_group (),
   result = ref (SOME (Exn.Result x))};
 
 
@@ -96,12 +94,12 @@
 
 (* global state *)
 
-val queue = ref TaskQueue.empty;
+val queue = ref Task_Queue.empty;
 val next = ref 0;
 val workers = ref ([]: (Thread.thread * bool) list);
 val scheduler = ref (NONE: Thread.thread option);
 val excessive = ref 0;
-val canceled = ref ([]: TaskQueue.group list);
+val canceled = ref ([]: Task_Queue.group list);
 val do_shutdown = ref false;
 
 
@@ -114,15 +112,11 @@
 
 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
 
-fun wait name = (*requires SYNCHRONIZED*)
- (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
+fun wait () = (*requires SYNCHRONIZED*)
   ConditionVar.wait (cond, lock);
-  Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
 
-fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
- (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
+fun wait_timeout timeout = (*requires SYNCHRONIZED*)
   ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
-  Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
 
 fun notify_all () = (*requires SYNCHRONIZED*)
   ConditionVar.broadcast cond;
@@ -150,9 +144,9 @@
     val _ = trace_active ();
     val ok = setmp_thread_data (name, task) run ();
     val _ = SYNCHRONIZED "execute" (fn () =>
-     (change queue (TaskQueue.finish task);
+     (change queue (Task_Queue.finish task);
       if ok then ()
-      else if TaskQueue.cancel (! queue) group then ()
+      else if Task_Queue.cancel (! queue) group then ()
       else change canceled (cons group);
       notify_all ()));
   in () end;
@@ -160,23 +154,23 @@
 
 (* worker threads *)
 
-fun worker_wait name = (*requires SYNCHRONIZED*)
-  (change_active false; wait name; change_active true);
+fun worker_wait () = (*requires SYNCHRONIZED*)
+  (change_active false; wait (); change_active true);
 
-fun worker_next name = (*requires SYNCHRONIZED*)
+fun worker_next () = (*requires SYNCHRONIZED*)
   if ! excessive > 0 then
     (dec excessive;
      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
      notify_all ();
      NONE)
   else
-    (case change_result queue TaskQueue.dequeue of
-      NONE => (worker_wait name; worker_next name)
+    (case change_result queue Task_Queue.dequeue of
+      NONE => (worker_wait (); worker_next ())
     | some => some);
 
 fun worker_loop name =
-  (case SYNCHRONIZED name (fn () => worker_next name) of
-    NONE => Multithreading.tracing 3 (fn () => name ^ ": exit")
+  (case SYNCHRONIZED name worker_next of
+    NONE => ()
   | SOME work => (execute name work; worker_loop name));
 
 fun worker_start name = (*requires SYNCHRONIZED*)
@@ -204,27 +198,25 @@
       else ();
 
     (*canceled groups*)
-    val _ =  change canceled (filter_out (TaskQueue.cancel (! queue)));
+    val _ =  change canceled (filter_out (Task_Queue.cancel (! queue)));
 
     (*shutdown*)
     val continue = not (! do_shutdown andalso null (! workers));
     val _ = if continue then () else scheduler := NONE;
 
     val _ = notify_all ();
-    val _ = wait_timeout "scheduler" (Time.fromSeconds 3);
+    val _ = wait_timeout (Time.fromSeconds 3);
   in continue end;
 
 fun scheduler_loop () =
- (while SYNCHRONIZED "scheduler" scheduler_next do ();
-  Multithreading.tracing 3 (fn () => "scheduler: exit"));
+  while SYNCHRONIZED "scheduler" scheduler_next do ();
 
 fun scheduler_active () = (*requires SYNCHRONIZED*)
   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
 
 fun scheduler_check name = SYNCHRONIZED name (fn () =>
   if not (scheduler_active ()) then
-    (Multithreading.tracing 3 (fn () => "scheduler: fork");
-     do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
+    (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
   else if ! do_shutdown then error "Scheduler shutdown in progress"
   else ());
 
@@ -235,7 +227,7 @@
   let
     val _ = scheduler_check "future check";
 
-    val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
+    val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
 
     val result = ref (NONE: 'a Exn.result option);
     val run = Multithreading.with_attributes (Thread.getAttributes ())
@@ -246,18 +238,18 @@
           val res_ok =
             (case res of
               Exn.Result _ => true
-            | Exn.Exn Exn.Interrupt => (TaskQueue.invalidate_group group; true)
+            | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
             | _ => false);
         in res_ok end);
 
     val task = SYNCHRONIZED "future" (fn () =>
-      change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
+      change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ());
   in Future {task = task, group = group, result = result} end;
 
-fun fork e = future NONE [] true e;
-fun fork_group group e = future (SOME group) [] true e;
-fun fork_deps deps e = future NONE (map task_of deps) true e;
-fun fork_background e = future NONE [] false e;
+fun fork e = future NONE [] 0 e;
+fun fork_group group e = future (SOME group) [] 0 e;
+fun fork_deps deps e = future NONE (map task_of deps) 0 e;
+fun fork_pri pri e = future NONE [] pri e;
 
 
 (* join: retrieve results *)
@@ -273,7 +265,7 @@
         fun join_loop _ [] = ()
           | join_loop name tasks =
               (case SYNCHRONIZED name (fn () =>
-                  change_result queue (TaskQueue.dequeue_towards tasks)) of
+                  change_result queue (Task_Queue.dequeue_towards tasks)) of
                 NONE => ()
               | SOME (work, tasks') => (execute name work; join_loop name tasks'));
         val _ =
@@ -281,18 +273,18 @@
             NONE =>
               (*alien thread -- refrain from contending for resources*)
               while exists (not o is_finished) xs
-              do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
+              do SYNCHRONIZED "join_thread" (fn () => wait ())
           | SOME (name, task) =>
               (*proper task -- actively work towards results*)
               let
                 val unfinished = xs |> map_filter
                   (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
                 val _ = SYNCHRONIZED "join" (fn () =>
-                  (change queue (TaskQueue.depend unfinished task); notify_all ()));
+                  (change queue (Task_Queue.depend unfinished task); notify_all ()));
                 val _ = join_loop ("join_loop: " ^ name) unfinished;
                 val _ =
                   while exists (not o is_finished) xs
-                  do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task");
+                  do SYNCHRONIZED "join_task" (fn () => worker_wait ());
               in () end);
 
       in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
@@ -300,18 +292,16 @@
 fun join_result x = singleton join_results x;
 fun join x = Exn.release (join_result x);
 
-fun map f x = fork_deps [x] (fn () => f (join x));
+fun map f x =
+  let val task = task_of x
+  in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end;
 
 
 (* misc operations *)
 
-(*focus: collection of high-priority task*)
-fun focus tasks = SYNCHRONIZED "focus" (fn () =>
-  change queue (TaskQueue.focus tasks));
-
 (*interrupt: permissive signal, may get ignored*)
 fun interrupt_task id = SYNCHRONIZED "interrupt"
-  (fn () => TaskQueue.interrupt_external (! queue) id);
+  (fn () => Task_Queue.interrupt_external (! queue) id);
 
 (*cancel: present and future group members will be interrupted eventually*)
 fun cancel x =
@@ -324,12 +314,12 @@
   if Multithreading.available then
    (scheduler_check "shutdown check";
     SYNCHRONIZED "shutdown" (fn () =>
-     (while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
-      while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
+     (while not (scheduler_active ()) do wait ();
+      while not (Task_Queue.is_empty (! queue)) do wait ();
       do_shutdown := true;
       notify_all ();
-      while not (null (! workers)) do wait "shutdown: workers";
-      while scheduler_active () do wait "shutdown: scheduler still active";
+      while not (null (! workers)) do wait ();
+      while scheduler_active () do wait ();
       OS.Process.sleep (Time.fromMilliseconds 300))))
   else ();
 
--- a/src/Pure/Concurrent/par_list.ML	Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Concurrent/par_list.ML	Tue Dec 16 18:04:31 2008 +0100
@@ -30,7 +30,7 @@
 fun raw_map f xs =
   if Future.enabled () then
     let
-      val group = TaskQueue.new_group ();
+      val group = Task_Queue.new_group ();
       val futures = map (fn x => Future.fork_group group (fn () => f x)) xs;
       val _ = List.app (ignore o Future.join_result) futures;
     in Future.join_results futures end
--- a/src/Pure/Concurrent/schedule.ML	Tue Dec 16 08:46:07 2008 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,85 +0,0 @@
-(*  Title:      Pure/Concurrent/schedule.ML
-    ID:         $Id$
-    Author:     Makarius
-
-Scheduling -- multiple threads working on a queue of tasks.
-*)
-
-signature SCHEDULE =
-sig
-  datatype 'a task =
-    Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-  val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list
-end;
-
-structure Schedule: SCHEDULE =
-struct
-
-datatype 'a task =
-  Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-
-fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
-  let
-    (*synchronized execution*)
-    val lock = Mutex.mutex ();
-    fun SYNCHRONIZED e =
-      let
-        val _ = Mutex.lock lock;
-        val res = Exn.capture e ();
-        val _ = Mutex.unlock lock;
-      in Exn.release res end;
-
-    (*wakeup condition*)
-    val wakeup = ConditionVar.conditionVar ();
-    fun wakeup_all () = ConditionVar.broadcast wakeup;
-    fun wait () = ConditionVar.wait (wakeup, lock);
-    fun wait_timeout () =
-      ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1));
-
-    (*queue of tasks*)
-    val queue = ref tasks;
-    val active = ref 0;
-    fun trace_active () = Multithreading.tracing 1 (fn () =>
-      "SCHEDULE: " ^ string_of_int (! active) ^ " active");
-    fun dequeue () =
-      (case change_result queue next_task of
-        Wait =>
-          (dec active; trace_active ();
-            wait ();
-            inc active; trace_active ();
-            dequeue ())
-      | next => next);
-
-    (*pool of running threads*)
-    val status = ref ([]: exn list);
-    val running = ref ([]: Thread.thread list);
-    fun start f = (inc active; change running (cons (SimpleThread.fork false f)));
-    fun stop () = (dec active; change running (remove Thread.equal (Thread.self ())));
-
-    (*worker thread*)
-    fun worker () =
-      (case SYNCHRONIZED dequeue of
-        Task {body, cont, fail} =>
-          (case Exn.capture (restore_attributes body) () of
-            Exn.Result () =>
-              (SYNCHRONIZED (fn () => (change queue cont; wakeup_all ())); worker ())
-          | Exn.Exn exn =>
-              SYNCHRONIZED (fn () =>
-                (change status (cons exn); change queue fail; stop (); wakeup_all ())))
-      | Terminate => SYNCHRONIZED (fn () => (stop (); wakeup_all ())));
-
-    (*main control: fork and wait*)
-    fun fork 0 = ()
-      | fork k = (start worker; fork (k - 1));
-    val _ = SYNCHRONIZED (fn () =>
-     (fork (Int.max (n, 1));
-      while not (null (! running)) do
-      (trace_active ();
-       if not (null (! status))
-       then (List.app SimpleThread.interrupt (! running))
-       else ();
-       wait_timeout ())));
-
-  in ! status end);
-
-end;
--- a/src/Pure/Concurrent/task_queue.ML	Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Tue Dec 16 18:04:31 2008 +0100
@@ -1,5 +1,4 @@
 (*  Title:      Pure/Concurrent/task_queue.ML
-    ID:         $Id$
     Author:     Makarius
 
 Ordered queue of grouped tasks.
@@ -8,7 +7,8 @@
 signature TASK_QUEUE =
 sig
   eqtype task
-  val new_task: unit -> task
+  val new_task: int -> task
+  val pri_of_task: task -> int
   val str_of_task: task -> string
   eqtype group
   val new_group: unit -> group
@@ -17,9 +17,8 @@
   type queue
   val empty: queue
   val is_empty: queue -> bool
-  val enqueue: group -> task list -> bool -> (bool -> bool) -> queue -> task * queue
+  val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
   val depend: task list -> task -> queue -> queue
-  val focus: task list -> queue -> queue
   val dequeue: queue -> (task * group * (unit -> bool)) option * queue
   val dequeue_towards: task list -> queue ->
     (((task * group * (unit -> bool)) * task list) option * queue)
@@ -29,20 +28,27 @@
   val cancel: queue -> group -> bool
 end;
 
-structure TaskQueue: TASK_QUEUE =
+structure Task_Queue: TASK_QUEUE =
 struct
 
-(* identifiers *)
+(* tasks *)
+
+datatype task = Task of int * serial;
+fun new_task pri = Task (pri, serial ());
 
-datatype task = Task of serial;
-fun new_task () = Task (serial ());
+fun pri_of_task (Task (pri, _)) = pri;
+fun str_of_task (Task (_, i)) = string_of_int i;
 
-fun str_of_task (Task i) = string_of_int i;
+fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
+structure Task_Graph = GraphFun(type key = task val ord = task_ord);
 
 
+(* groups *)
+
 datatype group = Group of serial * bool ref;
 
 fun new_group () = Group (serial (), ref true);
+
 fun invalidate_group (Group (_, ok)) = ok := false;
 
 fun str_of_group (Group (i, ref ok)) =
@@ -52,53 +58,46 @@
 (* jobs *)
 
 datatype job =
-  Job of bool * (bool -> bool) |   (*priority, job: status -> status*)
+  Job of bool -> bool |
   Running of Thread.thread;
 
-type jobs = (group * job) IntGraph.T;
+type jobs = (group * job) Task_Graph.T;
 
-fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
-fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
-fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
+fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
+fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
+fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;
 
-fun add_job (Task id) (Task dep) (jobs: jobs) =
-  IntGraph.add_edge (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
+fun add_job task dep (jobs: jobs) =
+  Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
 
-fun add_job_acyclic (Task id) (Task dep) (jobs: jobs) =
-  IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
-
-fun check_job (jobs: jobs) (task as Task id) =
-  if can (IntGraph.get_node jobs) id then SOME task else NONE;
+fun add_job_acyclic task dep (jobs: jobs) =
+  Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
 
 
 (* queue of grouped jobs *)
 
 datatype queue = Queue of
  {groups: task list Inttab.table,   (*groups with presently active members*)
-  jobs: jobs,                       (*job dependency graph*)
-  focus: task list};                (*particular collection of high-priority tasks*)
+  jobs: jobs};                      (*job dependency graph*)
 
-fun make_queue groups jobs focus = Queue {groups = groups, jobs = jobs, focus = focus};
+fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
 
-val empty = make_queue Inttab.empty IntGraph.empty [];
-fun is_empty (Queue {jobs, ...}) = IntGraph.is_empty jobs;
+val empty = make_queue Inttab.empty Task_Graph.empty;
+fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
 
 
 (* enqueue *)
 
-fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, focus}) =
+fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
   let
-    val task as Task id = new_task ();
+    val task = new_task pri;
     val groups' = Inttab.cons_list (gid, task) groups;
     val jobs' = jobs
-      |> IntGraph.new_node (id, (group, Job (pri, job))) |> fold (add_job task) deps;
-  in (task, make_queue groups' jobs' focus) end;
+      |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
+  in (task, make_queue groups' jobs') end;
 
-fun depend deps task (Queue {groups, jobs, focus}) =
-  make_queue groups (fold (add_job_acyclic task) deps jobs) focus;
-
-fun focus tasks (Queue {groups, jobs, ...}) =
-  make_queue groups jobs (map_filter (check_job jobs) tasks);
+fun depend deps task (Queue {groups, jobs}) =
+  make_queue groups (fold (add_job_acyclic task) deps jobs);
 
 
 (* dequeue *)
@@ -106,38 +105,30 @@
 local
 
 fun dequeue_result NONE queue = (NONE, queue)
-  | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) =
-      (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus);
-
-fun dequeue_global req_pri (queue as Queue {jobs, ...}) =
-  let
-    fun ready (id, ((group as Group (_, ref ok), Job (pri, job)), ([], _))) =
-          if pri = req_pri then SOME (Task id, group, (fn () => job ok)) else NONE
-      | ready _ = NONE;
-  in dequeue_result (IntGraph.get_first ready jobs) queue end;
-
-fun dequeue_local focus (queue as Queue {jobs, ...}) =
-  let
-    fun ready id =
-      (case IntGraph.get_node jobs id of
-        (group as Group (_, ref ok), Job (_, job)) =>
-          if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok))
-          else NONE
-      | _ => NONE);
-    val ids = map (fn Task id => id) focus;
-  in dequeue_result (get_first ready (IntGraph.all_preds jobs ids)) queue end;
+  | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
+      (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));
 
 in
 
-fun dequeue (queue as Queue {focus, ...}) =
-  (case dequeue_local focus queue of
-    (NONE, _) =>
-      (case dequeue_global true queue of (NONE, _) => dequeue_global false queue | res => res)
-  | res => res);
+fun dequeue (queue as Queue {jobs, ...}) =
+  let
+    fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
+          SOME (task, group, (fn () => job ok))
+      | ready _ = NONE;
+  in dequeue_result (Task_Graph.get_first ready jobs) queue end;
 
 fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
-  let val tasks' = map_filter (check_job jobs) tasks in
-    (case dequeue_local tasks' queue of
+  let
+    val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;
+
+    fun ready task =
+      (case Task_Graph.get_node jobs task of
+        (group as Group (_, ref ok), Job job) =>
+          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
+          else NONE
+      | _ => NONE);
+  in
+    (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
       (NONE, queue') => (NONE, queue')
     | (SOME work, queue') => (SOME (work, tasks'), queue'))
   end;
@@ -150,8 +141,13 @@
 fun interrupt (Queue {jobs, ...}) task =
   (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
 
-fun interrupt_external queue str =
-  (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ());
+fun interrupt_external (queue as Queue {jobs, ...}) str =
+  (case Int.fromString str of
+    SOME i =>
+      (case Task_Graph.get_first
+          (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
+        of SOME task => interrupt queue task | NONE => ())
+  | NONE => ());
 
 
 (* misc operations *)
@@ -164,12 +160,11 @@
     val _ = List.app SimpleThread.interrupt running;
   in null running end;
 
-fun finish (task as Task id) (Queue {groups, jobs, focus}) =
+fun finish task (Queue {groups, jobs}) =
   let
     val Group (gid, _) = get_group jobs task;
     val groups' = Inttab.remove_list (op =) (gid, task) groups;
-    val jobs' = IntGraph.del_node id jobs;
-    val focus' = remove (op =) task focus;
-  in make_queue groups' jobs' focus' end;
+    val jobs' = Task_Graph.del_node task jobs;
+  in make_queue groups' jobs' end;
 
 end;
--- a/src/Pure/IsaMakefile	Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/IsaMakefile	Tue Dec 16 18:04:31 2008 +0100
@@ -23,26 +23,24 @@
 
 $(OUT)/Pure: Concurrent/ROOT.ML Concurrent/future.ML			\
   Concurrent/mailbox.ML Concurrent/par_list.ML				\
-  Concurrent/par_list_dummy.ML Concurrent/schedule.ML			\
-  Concurrent/simple_thread.ML Concurrent/synchronized.ML		\
-  Concurrent/task_queue.ML General/ROOT.ML General/alist.ML		\
-  General/balanced_tree.ML General/basics.ML General/buffer.ML		\
-  General/file.ML General/graph.ML General/heap.ML General/integer.ML	\
-  General/lazy.ML General/markup.ML General/name_space.ML		\
-  General/ord_list.ML General/output.ML General/path.ML			\
-  General/position.ML General/pretty.ML General/print_mode.ML		\
-  General/properties.ML General/queue.ML General/scan.ML		\
-  General/secure.ML General/seq.ML General/source.ML General/stack.ML	\
-  General/symbol.ML General/symbol_pos.ML General/table.ML		\
-  General/url.ML General/xml.ML General/yxml.ML Isar/ROOT.ML		\
-  Isar/antiquote.ML Isar/args.ML Isar/attrib.ML Isar/auto_bind.ML	\
-  Isar/calculation.ML Isar/class.ML Isar/code.ML Isar/code_unit.ML	\
-  Isar/constdefs.ML Isar/context_rules.ML Isar/element.ML		\
-  Isar/expression.ML							\
-  Isar/find_theorems.ML Isar/instance.ML Isar/isar.ML Isar/isar_cmd.ML	\
-  Isar/isar_syn.ML Isar/local_defs.ML Isar/local_syntax.ML		\
-  Isar/local_theory.ML Isar/locale.ML Isar/method.ML Isar/net_rules.ML	\
-  Isar/new_locale.ML    \
+  Concurrent/par_list_dummy.ML Concurrent/simple_thread.ML		\
+  Concurrent/synchronized.ML Concurrent/task_queue.ML General/ROOT.ML	\
+  General/alist.ML General/balanced_tree.ML General/basics.ML		\
+  General/buffer.ML General/file.ML General/graph.ML General/heap.ML	\
+  General/integer.ML General/lazy.ML General/markup.ML			\
+  General/name_space.ML General/ord_list.ML General/output.ML		\
+  General/path.ML General/position.ML General/pretty.ML			\
+  General/print_mode.ML General/properties.ML General/queue.ML		\
+  General/scan.ML General/secure.ML General/seq.ML General/source.ML	\
+  General/stack.ML General/symbol.ML General/symbol_pos.ML		\
+  General/table.ML General/url.ML General/xml.ML General/yxml.ML	\
+  Isar/ROOT.ML Isar/antiquote.ML Isar/args.ML Isar/attrib.ML		\
+  Isar/auto_bind.ML Isar/calculation.ML Isar/class.ML Isar/code.ML	\
+  Isar/code_unit.ML Isar/constdefs.ML Isar/context_rules.ML		\
+  Isar/element.ML Isar/expression.ML Isar/find_theorems.ML		\
+  Isar/instance.ML Isar/isar.ML Isar/isar_cmd.ML Isar/isar_syn.ML	\
+  Isar/local_defs.ML Isar/local_syntax.ML Isar/local_theory.ML		\
+  Isar/locale.ML Isar/method.ML Isar/net_rules.ML Isar/new_locale.ML	\
   Isar/object_logic.ML Isar/obtain.ML Isar/outer_keyword.ML		\
   Isar/outer_lex.ML Isar/outer_parse.ML Isar/outer_syntax.ML		\
   Isar/overloading.ML Isar/proof.ML Isar/proof_context.ML		\
@@ -76,17 +74,16 @@
   Syntax/syn_trans.ML Syntax/syntax.ML Syntax/type_ext.ML Thy/html.ML	\
   Thy/latex.ML Thy/present.ML Thy/term_style.ML Thy/thm_deps.ML		\
   Thy/thy_edit.ML Thy/thy_header.ML Thy/thy_info.ML Thy/thy_load.ML	\
-  Thy/thy_output.ML Tools/ROOT.ML Tools/invoke.ML 	\
-  Tools/isabelle_process.ML Tools/named_thms.ML		\
-  Tools/xml_syntax.ML assumption.ML axclass.ML codegen.ML config.ML	\
-  conjunction.ML consts.ML context.ML context_position.ML conv.ML	\
-  defs.ML display.ML drule.ML envir.ML facts.ML goal.ML			\
-  interpretation.ML library.ML logic.ML meta_simplifier.ML more_thm.ML	\
-  morphism.ML name.ML net.ML old_goals.ML pattern.ML primitive_defs.ML	\
-  proofterm.ML pure_setup.ML pure_thy.ML search.ML sign.ML		\
-  simplifier.ML sorts.ML subgoal.ML tactic.ML tctical.ML term.ML	\
-  term_subst.ML theory.ML thm.ML type.ML type_infer.ML unify.ML		\
-  variable.ML ../Tools/quickcheck.ML
+  Thy/thy_output.ML Tools/ROOT.ML Tools/invoke.ML			\
+  Tools/isabelle_process.ML Tools/named_thms.ML Tools/xml_syntax.ML	\
+  assumption.ML axclass.ML codegen.ML config.ML conjunction.ML		\
+  consts.ML context.ML context_position.ML conv.ML defs.ML display.ML	\
+  drule.ML envir.ML facts.ML goal.ML interpretation.ML library.ML	\
+  logic.ML meta_simplifier.ML more_thm.ML morphism.ML name.ML net.ML	\
+  old_goals.ML pattern.ML primitive_defs.ML proofterm.ML pure_setup.ML	\
+  pure_thy.ML search.ML sign.ML simplifier.ML sorts.ML subgoal.ML	\
+  tactic.ML tctical.ML term.ML term_subst.ML theory.ML thm.ML type.ML	\
+  type_infer.ML unify.ML variable.ML ../Tools/quickcheck.ML
 	@./mk
 
 
--- a/src/Pure/Isar/toplevel.ML	Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Isar/toplevel.ML	Tue Dec 16 18:04:31 2008 +0100
@@ -718,7 +718,7 @@
 
         val future_proof = Proof.future_proof
           (fn prf =>
-            Future.fork_background (fn () =>
+            Future.fork_pri 1 (fn () =>
               let val (states, State (result_node, _)) =
                 (case st' of State (SOME (Proof (_, (_, orig_gthy)), exit), prev)
                   => State (SOME (Proof (ProofNode.init prf, (finish, orig_gthy)), exit), prev))
--- a/src/Pure/Thy/thy_info.ML	Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/Thy/thy_info.ML	Tue Dec 16 18:04:31 2008 +0100
@@ -315,7 +315,13 @@
 datatype task = Task of (unit -> unit) | Finished | Running;
 fun task_finished Finished = true | task_finished _ = false;
 
-fun future_schedule task_graph =
+local
+
+fun schedule_seq tasks =
+  Graph.topological_order tasks
+  |> List.app (fn name => (case Graph.get_node tasks name of Task body => body () | _ => ()));
+
+fun schedule_futures task_graph =
   let
     val tasks = Graph.topological_order task_graph |> map_filter (fn name =>
       (case Graph.get_node task_graph name of Task body => SOME (name, body) | _ => NONE));
@@ -339,45 +345,14 @@
     val proof_results = PureThy.join_proofs (map_filter (try get_theory o #1) tasks);
   in ignore (Exn.release_all (thy_results @ proof_results)) end;
 
-local
-
-fun max_task (name, (Task body, m)) NONE = SOME (name: string, (body, m))
-  | max_task (name, (Task body, m)) (task' as SOME (name', (_, m'))) =
-      if m > m' orelse m = m' andalso name < name' then SOME (name, (body, m)) else task'
-  | max_task _ task' = task';
-
-fun next_task G =
-  let
-    val tasks = Graph.minimals G |> map (fn name =>
-      (name, (Graph.get_node G name, length (Graph.imm_succs G name))));
-    val finished = filter (task_finished o fst o snd) tasks;
-  in
-    if not (null finished) then next_task (Graph.del_nodes (map fst finished) G)
-    else if null tasks then (Schedule.Terminate, G)
-    else
-      (case fold max_task tasks NONE of
-        NONE => (Schedule.Wait, G)
-      | SOME (name, (body, _)) =>
-         (Schedule.Task {body = PrintMode.closure body,
-           cont = Graph.del_nodes [name], fail = K Graph.empty},
-          Graph.map_node name (K Running) G))
-  end;
-
-fun schedule_seq tasks =
-  Graph.topological_order tasks
-  |> List.app (fn name => (case Graph.get_node tasks name of Task body => body () | _ => ()));
-
 in
 
 fun schedule_tasks tasks n =
-  let val m = Multithreading.max_threads_value () in
-    if m <= 1 then schedule_seq tasks
-    else if Multithreading.self_critical () then
+  if not (Multithreading.enabled ()) then schedule_seq tasks
+  else if Multithreading.self_critical () then
      (warning (loader_msg "no multithreading within critical section" []);
       schedule_seq tasks)
-    else if Future.enabled () then future_schedule tasks
-    else ignore (Exn.release_all (map Exn.Exn (Schedule.schedule (Int.min (m, n)) next_task tasks)))
-  end;
+  else schedule_futures tasks;
 
 end;
 
--- a/src/Pure/goal.ML	Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/goal.ML	Tue Dec 16 18:04:31 2008 +0100
@@ -179,7 +179,7 @@
     val res =
       if immediate orelse #maxidx (Thm.rep_cterm stmt) >= 0 orelse not (Future.enabled ())
       then result ()
-      else future_result ctxt' (Future.fork_background result) (Thm.term_of stmt);
+      else future_result ctxt' (Future.fork_pri 1 result) (Thm.term_of stmt);
   in
     Conjunction.elim_balanced (length props) res
     |> map (Assumption.export false ctxt' ctxt)
--- a/src/Pure/pure_setup.ML	Tue Dec 16 08:46:07 2008 +0100
+++ b/src/Pure/pure_setup.ML	Tue Dec 16 18:04:31 2008 +0100
@@ -1,5 +1,4 @@
 (*  Title:      Pure/pure_setup.ML
-    ID:         $Id$
     Author:     Makarius
 
 Pure theory and ML toplevel setup.
@@ -28,8 +27,8 @@
 
 (* ML toplevel pretty printing *)
 
-install_pp (make_pp ["TaskQueue", "task"] (Pretty.pprint o Pretty.str o TaskQueue.str_of_task));
-install_pp (make_pp ["TaskQueue", "group"] (Pretty.pprint o Pretty.str o TaskQueue.str_of_group));
+install_pp (make_pp ["Task_Queue", "task"] (Pretty.pprint o Pretty.str o Task_Queue.str_of_task));
+install_pp (make_pp ["Task_Queue", "group"] (Pretty.pprint o Pretty.str o Task_Queue.str_of_group));
 install_pp (make_pp ["Position", "T"] (Pretty.pprint o Pretty.enum "," "{" "}" o
   map (fn (x, y) => Pretty.str (x ^ "=" ^ y)) o Position.properties_of));
 install_pp (make_pp ["Thm", "thm"] ProofDisplay.pprint_thm);