future tasks: support boolean priorities (true = high, false = low/irrelevant);
authorwenzelm
Fri, 19 Sep 2008 21:22:31 +0200
changeset 28304 4b0477452943
parent 28303 8c4a4f256c16
child 28305 5097b8c0f59f
future tasks: support boolean priorities (true = high, false = low/irrelevant);
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/par_list.ML
src/Pure/Concurrent/task_queue.ML
src/Pure/Thy/thy_info.ML
--- a/src/Pure/Concurrent/future.ML	Fri Sep 19 21:00:50 2008 +0200
+++ b/src/Pure/Concurrent/future.ML	Fri Sep 19 21:22:31 2008 +0200
@@ -32,8 +32,9 @@
   type 'a T
   val task_of: 'a T -> task
   val group_of: 'a T -> group
-  val future: group option -> task list -> (unit -> 'a) -> 'a T
+  val future: group option -> task list -> bool -> (unit -> 'a) -> 'a T
   val fork: (unit -> 'a) -> 'a T
+  val fork_irrelevant: (unit -> 'a) -> 'a T
   val join_results: 'a T list -> 'a Exn.result list
   val join: 'a T -> 'a
   val focus: task list -> unit
@@ -209,7 +210,7 @@
 
 (* future values: fork independent computation *)
 
-fun future opt_group deps (e: unit -> 'a) =
+fun future opt_group deps pri (e: unit -> 'a) =
   let
     val _ = scheduler_check ();
 
@@ -222,10 +223,11 @@
         in result := SOME res; is_some (Exn.get_result res) end);
 
     val task = SYNCHRONIZED "future" (fn () =>
-      change_result queue (TaskQueue.enqueue group deps run) before notify_all ());
+      change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
   in Future {task = task, group = group, result = result} end;
 
-fun fork e = future (Option.map #2 (thread_data ())) [] e;
+fun fork e = future (Option.map #2 (thread_data ())) [] true e;
+fun fork_irrelevant e = future (Option.map #2 (thread_data ())) [] false e;
 
 
 (* join: retrieve results *)
--- a/src/Pure/Concurrent/par_list.ML	Fri Sep 19 21:00:50 2008 +0200
+++ b/src/Pure/Concurrent/par_list.ML	Fri Sep 19 21:22:31 2008 +0200
@@ -32,8 +32,9 @@
   | proper_exn (Exn.Exn exn) = SOME exn;
 
 fun raw_map f xs =
-  let val group = TaskQueue.new_group ()
-  in Future.join_results (List.map (fn x => Future.future (SOME group) [] (fn () => f x)) xs) end;
+  let val group = TaskQueue.new_group () in
+    Future.join_results (List.map (fn x => Future.future (SOME group) [] true (fn () => f x)) xs)
+  end;
 
 fun map f xs =
   let val results = raw_map f xs in
--- a/src/Pure/Concurrent/task_queue.ML	Fri Sep 19 21:00:50 2008 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Fri Sep 19 21:22:31 2008 +0200
@@ -15,7 +15,7 @@
   type queue
   val empty: queue
   val is_empty: queue -> bool
-  val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
+  val enqueue: group -> task list -> bool -> (bool -> bool) -> queue -> task * queue
   val depend: task list -> task -> queue -> queue
   val focus: task list -> queue -> queue
   val dequeue: queue -> (task * group * (unit -> bool)) option * queue
@@ -44,7 +44,7 @@
 (* jobs *)
 
 datatype job =
-  Job of bool -> bool |
+  Job of bool * (bool -> bool) |   (*priority, job: status -> status*)
   Running of Thread.thread;
 
 type jobs = (group * job) IntGraph.T;
@@ -75,13 +75,13 @@
 
 (* enqueue *)
 
-fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs, focus}) =
+fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, focus}) =
   let
     val id = serial ();
     val task = Task id;
     val groups' = Inttab.cons_list (gid, task) groups;
     val jobs' = jobs
-      |> IntGraph.new_node (id, (group, Job job)) |> fold (add_job task) deps;
+      |> IntGraph.new_node (id, (group, Job (pri, job))) |> fold (add_job task) deps;
   in (task, make_queue groups' jobs' focus) end;
 
 fun depend deps task (Queue {groups, jobs, focus}) =
@@ -99,10 +99,10 @@
   | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) =
       (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus);
 
-fun dequeue_global (queue as Queue {jobs, ...}) =
+fun dequeue_global req_pri (queue as Queue {jobs, ...}) =
   let
-    fun ready (id, ((group as Group (_, ref ok), Job job), ([], _))) =
-          SOME (Task id, group, (fn () => job ok))
+    fun ready (id, ((group as Group (_, ref ok), Job (pri, job)), ([], _))) =
+          if pri = req_pri then SOME (Task id, group, (fn () => job ok)) else NONE
       | ready _ = NONE;
   in dequeue_result (IntGraph.get_first ready jobs) queue end;
 
@@ -110,7 +110,7 @@
   let
     fun ready id =
       (case IntGraph.get_node jobs id of
-        (group as Group (_, ref ok), Job job) =>
+        (group as Group (_, ref ok), Job (_, job)) =>
           if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok))
           else NONE
       | _ => NONE);
@@ -120,7 +120,10 @@
 in
 
 fun dequeue (queue as Queue {focus, ...}) =
-  (case dequeue_local focus queue of (NONE, _) => dequeue_global queue | res => res);
+  (case dequeue_local focus queue of
+    (NONE, _) =>
+      (case dequeue_global true queue of (NONE, _) => dequeue_global false queue | res => res)
+  | res => res);
 
 fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
   dequeue_local (map_filter (check_job jobs) tasks) queue;
--- a/src/Pure/Thy/thy_info.ML	Fri Sep 19 21:00:50 2008 +0200
+++ b/src/Pure/Thy/thy_info.ML	Fri Sep 19 21:22:31 2008 +0200
@@ -327,7 +327,7 @@
     fun future (name, body) tab =
       let
         val deps = map_filter (Symtab.lookup tab) (Graph.imm_preds task_graph name);
-        val x = Future.future (SOME group) deps body;
+        val x = Future.future (SOME group) deps true body;
       in (x, Symtab.update (name, Future.task_of x) tab) end;
 
     val exns = #1 (fold_map future tasks Symtab.empty)