# HG changeset patch # User wenzelm # Date 1248782673 -7200 # Node ID d4f7934e95551d5e29cf6628cda69c825a898800 # Parent 0c1cb95a434d8f1ca0dec5370d5cf3bf2f37b2f8 misc tuning; diff -r 0c1cb95a434d -r d4f7934e9555 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Jul 28 08:49:03 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Tue Jul 28 14:04:33 2009 +0200 @@ -1,7 +1,8 @@ (* Title: Pure/Concurrent/future.ML Author: Makarius -Future values. +Future values, see also +http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf Notes: @@ -144,18 +145,6 @@ end; -(* worker activity *) - -fun count_active ws = - fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0; - -fun change_active active = (*requires SYNCHRONIZED*) - change workers (AList.update Thread.equal (Thread.self (), active)); - -fun overloaded () = - count_active (! workers) > Multithreading.max_threads_value (); - - (* execute future jobs *) fun future_job group (e: unit -> 'a) = @@ -186,7 +175,7 @@ val valid = not (Task_Queue.is_canceled group); val ok = setmp_thread_data (name, task, group) (fn () => fold (fn job => fn ok => job valid andalso ok) jobs true) (); - val _ = SYNCHRONIZED "execute" (fn () => + val _ = SYNCHRONIZED "finish" (fn () => let val maximal = change_result queue (Task_Queue.finish task); val _ = @@ -199,6 +188,15 @@ in () end; +(* worker activity *) + +fun count_active () = (*requires SYNCHRONIZED*) + fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0; + +fun change_active active = (*requires SYNCHRONIZED*) + change workers (AList.update Thread.equal (Thread.self (), active)); + + (* worker threads *) fun worker_wait cond = (*requires SYNCHRONIZED*) @@ -212,7 +210,8 @@ change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); broadcast scheduler_event; NONE) - else if overloaded () then (worker_wait scheduler_event; worker_next ()) + else if count_active () > Multithreading.max_threads_value () then + (worker_wait scheduler_event; worker_next ()) else (case change_result queue Task_Queue.dequeue of NONE => (worker_wait work_available; worker_next ()) @@ -243,7 +242,7 @@ let val {ready, pending, running} = Task_Queue.status (! queue); val total = length (! workers); - val active = count_active (! workers); + val active = count_active (); in "SCHEDULE: " ^ string_of_int ready ^ " ready, " ^ @@ -319,7 +318,7 @@ SOME group => group | NONE => Task_Queue.new_group (worker_group ())); val (result, job) = future_job group e; - val task = SYNCHRONIZED "future" (fn () => + val task = SYNCHRONIZED "enqueue" (fn () => let val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job); val _ = if minimal then signal work_available else (); @@ -366,14 +365,13 @@ fun join_results xs = if forall is_finished xs then map get_result xs + else if Multithreading.self_critical () then + error "Cannot join future values within critical section" else uninterruptible (fn _ => fn () => - let - val _ = Multithreading.self_critical () andalso - error "Cannot join future values within critical section"; - val _ = - if is_worker () then join_work (map task_of xs) - else List.app join_wait xs; - in map get_result xs end) (); + (if is_worker () + then join_work (map task_of xs) + else List.app join_wait xs; + map get_result xs)) (); end; @@ -389,7 +387,7 @@ val group = Task_Queue.new_group (SOME (group_of x)); val (result, job) = future_job group (fn () => f (join x)); - val extended = SYNCHRONIZED "map_future" (fn () => + val extended = SYNCHRONIZED "extend" (fn () => (case Task_Queue.extend task job (! queue) of SOME queue' => (queue := queue'; true) | NONE => false));