merged
authorwenzelm
Tue, 06 Jan 2009 14:45:45 +0100
changeset 29369 393f72663b49
parent 29364 cea7b4034461 (current diff)
parent 29368 503ce3f8f092 (diff)
child 29370 98aaf2cd873f
merged
--- a/src/HOL/Tools/res_axioms.ML	Tue Jan 06 13:36:42 2009 +0100
+++ b/src/HOL/Tools/res_axioms.ML	Tue Jan 06 14:45:45 2009 +0100
@@ -456,7 +456,7 @@
           |> fold (mark_seen o #1) new_facts
           |> fold_map skolem_def (sort_distinct (Thm.thm_ord o pairself snd) new_thms)
           |>> map_filter I;
-        val cache_entries = ParList.map skolem_cnfs defs;
+        val cache_entries = Par_List.map skolem_cnfs defs;
       in SOME (fold update_cache cache_entries thy') end
   end;
 
--- a/src/Pure/Concurrent/future.ML	Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Tue Jan 06 14:45:45 2009 +0100
@@ -137,15 +137,16 @@
   change workers (AList.update Thread.equal (Thread.self (), active));
 
 
-(* execute *)
+(* execute jobs *)
 
 fun do_cancel group = (*requires SYNCHRONIZED*)
   change canceled (insert Task_Queue.eq_group group);
 
-fun execute name (task, group, run) =
+fun execute name (task, group, jobs) =
   let
     val _ = trace_active ();
-    val ok = setmp_thread_data (name, task) run ();
+    val ok = setmp_thread_data (name, task) (fn () =>
+      fold (fn job => fn ok => job ok) jobs (Task_Queue.is_valid group)) ();
     val _ = SYNCHRONIZED "execute" (fn () =>
      (change queue (Task_Queue.finish task);
       if ok then ()
@@ -225,16 +226,15 @@
   else ());
 
 
-(* future values: fork independent computation *)
+
+(** futures **)
 
-fun future opt_group deps pri (e: unit -> 'a) =
+(* future job: fill result *)
+
+fun future_job group (e: unit -> 'a) =
   let
-    val _ = scheduler_check "future check";
-
-    val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
-
     val result = ref (NONE: 'a Exn.result option);
-    val run = Multithreading.with_attributes (Thread.getAttributes ())
+    val job = Multithreading.with_attributes (Thread.getAttributes ())
       (fn _ => fn ok =>
         let
           val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt;
@@ -245,63 +245,88 @@
             | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
             | _ => false);
         in res_ok end);
+  in (result, job) end;
 
+
+(* fork *)
+
+fun fork_future opt_group deps pri e =
+  let
+    val _ = scheduler_check "future check";
+
+    val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
+    val (result, job) = future_job group e;
     val task = SYNCHRONIZED "future" (fn () =>
-      change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ());
+      change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
   in Future {task = task, group = group, result = result} end;
 
-fun fork e = future NONE [] 0 e;
-fun fork_group group e = future (SOME group) [] 0 e;
-fun fork_deps deps e = future NONE (map task_of deps) 0 e;
-fun fork_pri pri e = future NONE [] pri e;
+fun fork e = fork_future NONE [] 0 e;
+fun fork_group group e = fork_future (SOME group) [] 0 e;
+fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
+fun fork_pri pri e = fork_future NONE [] pri e;
 
 
-(* join: retrieve results *)
+(* join *)
+
+fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
 
-fun join_results [] = []
-  | join_results xs = uninterruptible (fn _ => fn () =>
-      let
-        val _ = scheduler_check "join check";
-        val _ = Multithreading.self_critical () andalso
-          exists (not o is_finished) xs andalso
-          error "Cannot join future values within critical section";
+fun join_results xs =
+  if forall is_finished xs then map get_result xs
+  else uninterruptible (fn _ => fn () =>
+    let
+      val _ = scheduler_check "join check";
+      val _ = Multithreading.self_critical () andalso
+        error "Cannot join future values within critical section";
 
-        fun join_loop _ [] = ()
-          | join_loop name tasks =
-              (case SYNCHRONIZED name (fn () =>
-                  change_result queue (Task_Queue.dequeue_towards tasks)) of
-                NONE => ()
-              | SOME (work, tasks') => (execute name work; join_loop name tasks'));
-        val _ =
-          (case thread_data () of
-            NONE =>
-              (*alien thread -- refrain from contending for resources*)
-              while exists (not o is_finished) xs
-              do SYNCHRONIZED "join_thread" (fn () => wait ())
-          | SOME (name, task) =>
-              (*proper task -- actively work towards results*)
-              let
-                val unfinished = xs |> map_filter
-                  (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
-                val _ = SYNCHRONIZED "join" (fn () =>
-                  (change queue (Task_Queue.depend unfinished task); notify_all ()));
-                val _ = join_loop ("join_loop: " ^ name) unfinished;
-                val _ =
-                  while exists (not o is_finished) xs
-                  do SYNCHRONIZED "join_task" (fn () => worker_wait ());
-              in () end);
+      fun join_loop _ [] = ()
+        | join_loop name deps =
+            (case SYNCHRONIZED name (fn () =>
+                change_result queue (Task_Queue.dequeue_towards deps)) of
+              NONE => ()
+            | SOME (work, deps') => (execute name work; join_loop name deps'));
+      val _ =
+        (case thread_data () of
+          NONE =>
+            (*alien thread -- refrain from contending for resources*)
+            while not (forall is_finished xs)
+            do SYNCHRONIZED "join_thread" (fn () => wait ())
+        | SOME (name, task) =>
+            (*proper task -- actively work towards results*)
+            let
+              val pending = filter_out is_finished xs;
+              val deps = map task_of pending;
+              val _ = SYNCHRONIZED "join" (fn () =>
+                (change queue (Task_Queue.depend deps task); notify_all ()));
+              val _ = join_loop ("join_loop: " ^ name) deps;
+              val _ =
+                while not (forall is_finished pending)
+                do SYNCHRONIZED "join_task" (fn () => worker_wait ());
+            in () end);
 
-      in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
+    in map get_result xs end) ();
 
 fun join_result x = singleton join_results x;
 fun join x = Exn.release (join_result x);
 
-fun map f x =
-  let val task = task_of x
-  in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end;
+
+(* map *)
+
+fun map_future f (x as Future {task, group, ...}) =
+  let
+    val _ = scheduler_check "map_future check";
+
+    val (result', job) = future_job group (fn () => f (join x));
+    val extended = SYNCHRONIZED "map_future" (fn () =>
+      (case Task_Queue.extend task job (! queue) of
+        SOME queue' => (queue := queue'; true)
+      | NONE => false));
+  in
+    if extended then Future {task = task, group = group, result = result'}
+    else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
+  end;
 
 
-(* misc operations *)
+(* cancel *)
 
 (*interrupt: permissive signal, may get ignored*)
 fun interrupt_task id = SYNCHRONIZED "interrupt"
@@ -313,7 +338,9 @@
   SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ())));
 
 
-(*global join and shutdown*)
+
+(** global join and shutdown **)
+
 fun shutdown () =
   if Multithreading.available then
    (scheduler_check "shutdown check";
@@ -327,6 +354,10 @@
       OS.Process.sleep (Time.fromMilliseconds 300))))
   else ();
 
+
+(*final declarations of this structure!*)
+val map = map_future;
+
 end;
 
 type 'a future = 'a Future.future;
--- a/src/Pure/Concurrent/par_list.ML	Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/Concurrent/par_list.ML	Tue Jan 06 14:45:45 2009 +0100
@@ -1,5 +1,4 @@
 (*  Title:      Pure/Concurrent/par_list.ML
-    ID:         $Id$
     Author:     Makarius
 
 Parallel list combinators.
@@ -24,7 +23,7 @@
   val forall: ('a -> bool) -> 'a list -> bool
 end;
 
-structure ParList: PAR_LIST =
+structure Par_List: PAR_LIST =
 struct
 
 fun raw_map f xs =
--- a/src/Pure/Concurrent/par_list_dummy.ML	Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/Concurrent/par_list_dummy.ML	Tue Jan 06 14:45:45 2009 +0100
@@ -1,11 +1,10 @@
 (*  Title:      Pure/Concurrent/par_list_dummy.ML
-    ID:         $Id$
     Author:     Makarius
 
 Dummy version of parallel list combinators -- plain sequential evaluation.
 *)
 
-structure ParList: PAR_LIST =
+structure Par_List: PAR_LIST =
 struct
 
 val map = map;
--- a/src/Pure/Concurrent/task_queue.ML	Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Tue Jan 06 14:45:45 2009 +0100
@@ -13,16 +13,18 @@
   type group
   val eq_group: group * group -> bool
   val new_group: unit -> group
+  val is_valid: group -> bool
   val invalidate_group: group -> unit
   val str_of_group: group -> string
   type queue
   val empty: queue
   val is_empty: queue -> bool
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
+  val extend: task -> (bool -> bool) -> queue -> queue option
   val depend: task list -> task -> queue -> queue
-  val dequeue: queue -> (task * group * (unit -> bool)) option * queue
+  val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
   val dequeue_towards: task list -> queue ->
-    (((task * group * (unit -> bool)) * task list) option * queue)
+    (((task * group * (bool -> bool) list) * task list) option * queue)
   val interrupt: queue -> task -> unit
   val interrupt_external: queue -> string -> unit
   val cancel: queue -> group -> bool
@@ -52,6 +54,7 @@
 
 fun new_group () = Group (serial (), ref true);
 
+fun is_valid (Group (_, ref ok)) = ok;
 fun invalidate_group (Group (_, ok)) = ok := false;
 
 fun str_of_group (Group (i, ref ok)) =
@@ -61,14 +64,14 @@
 (* jobs *)
 
 datatype job =
-  Job of bool -> bool |
+  Job of (bool -> bool) list |
   Running of Thread.thread;
 
 type jobs = (group * job) Task_Graph.T;
 
 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
-fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;
+fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
 
 fun add_job task dep (jobs: jobs) =
   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
@@ -96,9 +99,14 @@
     val task = new_task pri;
     val groups' = Inttab.cons_list (gid, task) groups;
     val jobs' = jobs
-      |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
+      |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
   in (task, make_queue groups' jobs') end;
 
+fun extend task job (Queue {groups, jobs}) =
+  (case try (get_job jobs) task of
+    SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
+  | _ => NONE);
+
 fun depend deps task (Queue {groups, jobs}) =
   make_queue groups (fold (add_job_acyclic task) deps jobs);
 
@@ -109,14 +117,13 @@
 
 fun dequeue_result NONE queue = (NONE, queue)
   | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
-      (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));
+      (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs));
 
 in
 
 fun dequeue (queue as Queue {jobs, ...}) =
   let
-    fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
-          SOME (task, group, (fn () => job ok))
+    fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
       | ready _ = NONE;
   in dequeue_result (Task_Graph.get_first ready jobs) queue end;
 
@@ -126,8 +133,8 @@
 
     fun ready task =
       (case Task_Graph.get_node jobs task of
-        (group as Group (_, ref ok), Job job) =>
-          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
+        (group, Job list) =>
+          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
           else NONE
       | _ => NONE);
   in
--- a/src/Pure/context.ML	Tue Jan 06 13:36:42 2009 +0100
+++ b/src/Pure/context.ML	Tue Jan 06 14:45:45 2009 +0100
@@ -132,7 +132,15 @@
 
 val copy_data = Datatab.map' invoke_copy;
 val extend_data = Datatab.map' invoke_extend;
-fun merge_data pp = Datatab.join (invoke_merge pp) o pairself extend_data;
+
+fun merge_data pp (data1, data2) =
+  Datatab.keys (Datatab.merge (K true) (data1, data2))
+  |> Par_List.map (fn k =>
+    (case (Datatab.lookup data1 k, Datatab.lookup data2 k) of
+      (SOME x, NONE) => (k, invoke_extend k x)
+    | (NONE, SOME y) => (k, invoke_extend k y)
+    | (SOME x, SOME y) => (k, invoke_merge pp k (invoke_extend k x, invoke_extend k y))))
+  |> Datatab.make;
 
 end;