tuned check_cache;
authorwenzelm
Mon Sep 08 00:10:41 2008 +0200 (2008-09-08 ago)
changeset 2816255772e4e95e0
parent 28161 7718587e510e
child 28163 8bf8c21296ca
tuned check_cache;
removed broken self_synchronized, which cannot be used in conjunction with condition variables;
more precise use of SYNCHRONIZED vs. wait;
tuned worker_loop;
src/Pure/Concurrent/future.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Sun Sep 07 22:20:15 2008 +0200
     1.2 +++ b/src/Pure/Concurrent/future.ML	Mon Sep 08 00:10:41 2008 +0200
     1.3 @@ -22,27 +22,16 @@
     1.4  (* synchronized execution *)
     1.5  
     1.6  local
     1.7 -  val thread = ref (NONE: Thread.thread option);
     1.8    val lock = Mutex.mutex ();
     1.9    val cond = ConditionVar.conditionVar ();
    1.10  in
    1.11  
    1.12 -fun self_synchronized () =
    1.13 -  (case ! thread of
    1.14 -    NONE => false
    1.15 -  | SOME t => Thread.equal (t, Thread.self ()));
    1.16 -
    1.17 -fun SYNCHRONIZED e =
    1.18 -  if self_synchronized () then e ()
    1.19 -  else
    1.20 -    uninterruptible (fn restore_attributes => fn () =>
    1.21 -      let
    1.22 -        val _ = Mutex.lock lock;
    1.23 -        val _ = thread := SOME (Thread.self ());
    1.24 -        val result = Exn.capture (restore_attributes e) ();
    1.25 -        val _ = thread := NONE;
    1.26 -        val _ = Mutex.unlock lock;
    1.27 -      in Exn.release result end) ();
    1.28 +fun SYNCHRONIZED e = uninterruptible (fn restore_attributes => fn () =>
    1.29 +  let
    1.30 +    val _ = Mutex.lock lock;
    1.31 +    val result = Exn.capture (restore_attributes e) ();
    1.32 +    val _ = Mutex.unlock lock;
    1.33 +  in Exn.release result end) ();
    1.34  
    1.35  fun wait () = ConditionVar.wait (cond, lock);
    1.36  fun wait_timeout timeout = ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
    1.37 @@ -78,12 +67,8 @@
    1.38  fun check_cache (queue as Queue (tasks, cache)) =
    1.39    if not (Queue.is_empty cache) then queue
    1.40    else
    1.41 -    let
    1.42 -      val cache' = fold (fn id =>
    1.43 -        (case IntGraph.get_node tasks id of
    1.44 -          Task task => Queue.enqueue (id, task)
    1.45 -        | Running _ => I)) (IntGraph.minimals tasks) Queue.empty;
    1.46 -    in Queue (tasks, cache') end;
    1.47 +    let fun ready (id, (Task task, ([], _))) = Queue.enqueue (id, task) | ready _ = I
    1.48 +    in Queue (tasks, IntGraph.fold ready tasks Queue.empty) end;
    1.49  
    1.50  val next_task = check_cache #> (fn queue as Queue (tasks, cache) =>
    1.51    if Queue.is_empty cache then (NONE, queue)
    1.52 @@ -109,17 +94,6 @@
    1.53  
    1.54  (* global state *)
    1.55  
    1.56 -local val active = ref 0 in
    1.57 -
    1.58 -fun change_active b = SYNCHRONIZED (fn () =>
    1.59 -  let
    1.60 -    val _ = change active (fn n => if b then n + 1 else n - 1);
    1.61 -    val n = ! active;
    1.62 -    val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active tasks");
    1.63 -  in () end);
    1.64 -
    1.65 -end;
    1.66 -
    1.67  val tasks = ref empty_queue;
    1.68  val scheduler = ref (NONE: Thread.thread option);
    1.69  val workers = ref ([]: Thread.thread list);
    1.70 @@ -133,33 +107,46 @@
    1.71  
    1.72  (* worker thread *)
    1.73  
    1.74 +local val active = ref 0 in
    1.75 +
    1.76 +fun change_active b = (*requires SYNCHRONIZED*)
    1.77 +  let
    1.78 +    val _ = change active (fn n => if b then n + 1 else n - 1);
    1.79 +    val n = ! active;
    1.80 +    val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active");
    1.81 +  in () end;
    1.82 +
    1.83 +end;
    1.84 +
    1.85  fun excessive_threads () = false;  (* FIXME *)
    1.86  
    1.87 -fun worker_stop () =
    1.88 -  (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ())))));
    1.89 +fun worker_stop () = SYNCHRONIZED (fn () =>
    1.90 +  (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ()))))));
    1.91  
    1.92 -fun worker_wait () =
    1.93 -  (change_active false; wait (); change_active true);
    1.94 +fun worker_wait () = SYNCHRONIZED (fn () =>
    1.95 +  (change_active false; wait (); change_active true));
    1.96  
    1.97  fun worker_loop () =
    1.98 -  (case SYNCHRONIZED (fn () => change_result tasks next_task) of
    1.99 -    SOME (id, task) =>
   1.100 -      let
   1.101 -        val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ())));
   1.102 -        val _ = task ();
   1.103 -        val _ = SYNCHRONIZED (fn () => change tasks (finished_task id));
   1.104 -        val _ = notify_all ();
   1.105 -      in if excessive_threads () then worker_stop () else worker_loop () end
   1.106 -  | NONE => (worker_wait (); worker_loop ()));
   1.107 +  if excessive_threads () then worker_stop ()
   1.108 +  else
   1.109 +    (case SYNCHRONIZED (fn () => change_result tasks next_task) of
   1.110 +      NONE => (worker_wait (); worker_loop ())
   1.111 +    | SOME (id, task) =>
   1.112 +        let
   1.113 +          val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ())));
   1.114 +          val _ = task ();
   1.115 +          val _ = SYNCHRONIZED (fn () => change tasks (finished_task id));
   1.116 +          val _ = notify_all ();
   1.117 +        in worker_loop () end);
   1.118  
   1.119 -fun worker_start () =
   1.120 +fun worker_start () = SYNCHRONIZED (fn () =>
   1.121   (change_active true;
   1.122 -  change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts))));
   1.123 +  change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts)))));
   1.124  
   1.125  
   1.126  (* scheduler *)
   1.127  
   1.128 -fun scheduler_loop () =
   1.129 +fun scheduler_loop () = (*requires SYNCHRONIZED*)
   1.130    let
   1.131      val m = Multithreading.max_threads_value ();
   1.132      val k = m - length (! workers);
   1.133 @@ -174,7 +161,8 @@
   1.134        | SOME t => Thread.isActive t);
   1.135    in
   1.136      if scheduler_active then ()
   1.137 -    else scheduler := SOME (Thread.fork (SYNCHRONIZED scheduler_loop, Multithreading.no_interrupts))
   1.138 +    else scheduler :=
   1.139 +      SOME (Thread.fork (SYNCHRONIZED o scheduler_loop, Multithreading.no_interrupts))
   1.140    end);
   1.141  
   1.142  
   1.143 @@ -187,9 +175,11 @@
   1.144      val r = ref (NONE: 'a Exn.result option);
   1.145      val task = Multithreading.with_attributes (Thread.getAttributes ())
   1.146        (fn _ => fn () => r := SOME (Exn.capture e ()));
   1.147 +
   1.148      val id = serial ();
   1.149      val _ = SYNCHRONIZED (fn () => change tasks (new_task deps id task));
   1.150      val _ = notify_all ();
   1.151 +
   1.152    in Future (id, r) end;
   1.153  
   1.154  fun future e = dependent_future [] e;