tuned check_cache;
authorwenzelm
Mon, 08 Sep 2008 00:10:41 +0200
changeset 28162 55772e4e95e0
parent 28161 7718587e510e
child 28163 8bf8c21296ca
tuned check_cache; removed broken self_synchronized, which cannot be used in conjunction with condition variables; more precise use of SYNCHRONIZED vs. wait; tuned worker_loop;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Sun Sep 07 22:20:15 2008 +0200
+++ b/src/Pure/Concurrent/future.ML	Mon Sep 08 00:10:41 2008 +0200
@@ -22,27 +22,16 @@
 (* synchronized execution *)
 
 local
-  val thread = ref (NONE: Thread.thread option);
   val lock = Mutex.mutex ();
   val cond = ConditionVar.conditionVar ();
 in
 
-fun self_synchronized () =
-  (case ! thread of
-    NONE => false
-  | SOME t => Thread.equal (t, Thread.self ()));
-
-fun SYNCHRONIZED e =
-  if self_synchronized () then e ()
-  else
-    uninterruptible (fn restore_attributes => fn () =>
-      let
-        val _ = Mutex.lock lock;
-        val _ = thread := SOME (Thread.self ());
-        val result = Exn.capture (restore_attributes e) ();
-        val _ = thread := NONE;
-        val _ = Mutex.unlock lock;
-      in Exn.release result end) ();
+fun SYNCHRONIZED e = uninterruptible (fn restore_attributes => fn () =>
+  let
+    val _ = Mutex.lock lock;
+    val result = Exn.capture (restore_attributes e) ();
+    val _ = Mutex.unlock lock;
+  in Exn.release result end) ();
 
 fun wait () = ConditionVar.wait (cond, lock);
 fun wait_timeout timeout = ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
@@ -78,12 +67,8 @@
 fun check_cache (queue as Queue (tasks, cache)) =
   if not (Queue.is_empty cache) then queue
   else
-    let
-      val cache' = fold (fn id =>
-        (case IntGraph.get_node tasks id of
-          Task task => Queue.enqueue (id, task)
-        | Running _ => I)) (IntGraph.minimals tasks) Queue.empty;
-    in Queue (tasks, cache') end;
+    let fun ready (id, (Task task, ([], _))) = Queue.enqueue (id, task) | ready _ = I
+    in Queue (tasks, IntGraph.fold ready tasks Queue.empty) end;
 
 val next_task = check_cache #> (fn queue as Queue (tasks, cache) =>
   if Queue.is_empty cache then (NONE, queue)
@@ -109,17 +94,6 @@
 
 (* global state *)
 
-local val active = ref 0 in
-
-fun change_active b = SYNCHRONIZED (fn () =>
-  let
-    val _ = change active (fn n => if b then n + 1 else n - 1);
-    val n = ! active;
-    val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active tasks");
-  in () end);
-
-end;
-
 val tasks = ref empty_queue;
 val scheduler = ref (NONE: Thread.thread option);
 val workers = ref ([]: Thread.thread list);
@@ -133,33 +107,46 @@
 
 (* worker thread *)
 
+local val active = ref 0 in
+
+fun change_active b = (*requires SYNCHRONIZED*)
+  let
+    val _ = change active (fn n => if b then n + 1 else n - 1);
+    val n = ! active;
+    val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active");
+  in () end;
+
+end;
+
 fun excessive_threads () = false;  (* FIXME *)
 
-fun worker_stop () =
-  (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ())))));
+fun worker_stop () = SYNCHRONIZED (fn () =>
+  (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ()))))));
 
-fun worker_wait () =
-  (change_active false; wait (); change_active true);
+fun worker_wait () = SYNCHRONIZED (fn () =>
+  (change_active false; wait (); change_active true));
 
 fun worker_loop () =
-  (case SYNCHRONIZED (fn () => change_result tasks next_task) of
-    SOME (id, task) =>
-      let
-        val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ())));
-        val _ = task ();
-        val _ = SYNCHRONIZED (fn () => change tasks (finished_task id));
-        val _ = notify_all ();
-      in if excessive_threads () then worker_stop () else worker_loop () end
-  | NONE => (worker_wait (); worker_loop ()));
+  if excessive_threads () then worker_stop ()
+  else
+    (case SYNCHRONIZED (fn () => change_result tasks next_task) of
+      NONE => (worker_wait (); worker_loop ())
+    | SOME (id, task) =>
+        let
+          val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ())));
+          val _ = task ();
+          val _ = SYNCHRONIZED (fn () => change tasks (finished_task id));
+          val _ = notify_all ();
+        in worker_loop () end);
 
-fun worker_start () =
+fun worker_start () = SYNCHRONIZED (fn () =>
  (change_active true;
-  change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts))));
+  change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts)))));
 
 
 (* scheduler *)
 
-fun scheduler_loop () =
+fun scheduler_loop () = (*requires SYNCHRONIZED*)
   let
     val m = Multithreading.max_threads_value ();
     val k = m - length (! workers);
@@ -174,7 +161,8 @@
       | SOME t => Thread.isActive t);
   in
     if scheduler_active then ()
-    else scheduler := SOME (Thread.fork (SYNCHRONIZED scheduler_loop, Multithreading.no_interrupts))
+    else scheduler :=
+      SOME (Thread.fork (SYNCHRONIZED o scheduler_loop, Multithreading.no_interrupts))
   end);
 
 
@@ -187,9 +175,11 @@
     val r = ref (NONE: 'a Exn.result option);
     val task = Multithreading.with_attributes (Thread.getAttributes ())
       (fn _ => fn () => r := SOME (Exn.capture e ()));
+
     val id = serial ();
     val _ = SYNCHRONIZED (fn () => change tasks (new_task deps id task));
     val _ = notify_all ();
+
   in Future (id, r) end;
 
 fun future e = dependent_future [] e;