--- a/src/Pure/Concurrent/event_timer.ML Sat Jan 10 13:31:37 2015 +0100
+++ b/src/Pure/Concurrent/event_timer.ML Sat Jan 10 16:35:21 2015 +0100
@@ -12,8 +12,8 @@
eqtype request
val request: Time.time -> (unit -> unit) -> request
val cancel: request -> bool
+ val future: Time.time -> unit future
val shutdown: unit -> unit
- val future: Time.time -> unit future
end;
structure Event_Timer: EVENT_TIMER =
@@ -92,37 +92,26 @@
(fn st as State {requests, status, manager} =>
(case next_request_event (Time.now ()) requests of
SOME (event, requests') =>
- (Exn.capture event (); SOME (true, make_state (requests', status, manager)))
+ let
+ val _ = Exn.capture event ();
+ val state' = make_state (requests', status, manager);
+ in SOME (true, state') end
| NONE =>
- if is_shutdown_req st then SOME (false, shutdown_ack_state) else NONE)) = SOME false
- then () else manager_loop ();
+ if is_shutdown_req st
+ then SOME (false, shutdown_ack_state)
+ else NONE)) <> SOME false
+ then manager_loop () else ();
fun manager_check manager =
if is_some manager andalso Thread.isActive (the manager) then manager
else SOME (Simple_Thread.fork false manager_loop);
-
-(* main operations *)
-
-fun request time event =
- Synchronized.change_result state (fn State {requests, status, manager} =>
- let
- val req = new_request ();
- val requests' = add_request time (req, event) requests;
- in (req, make_state (requests', status, manager_check manager)) end);
-
-fun cancel req =
- Synchronized.change_result state (fn State {requests, status, manager} =>
- let
- val (canceled, requests') = del_request req requests;
- in (canceled, make_state (requests', status, manager_check manager)) end);
-
fun shutdown () =
uninterruptible (fn restore_attributes => fn () =>
if Synchronized.change_result state (fn st as State {requests, status, manager} =>
- if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then
+ if is_shutdown Normal st then (false, st)
+ else if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then
raise Fail "Concurrent attempt to shutdown event timer"
- else if is_shutdown Normal st then (false, st)
else (true, make_state (requests, Shutdown_Req, manager_check manager)))
then
restore_attributes (fn () =>
@@ -136,7 +125,22 @@
else ()) ();
-(* future *)
+(* main operations *)
+
+fun request time event =
+ Synchronized.change_result state (fn State {requests, status, manager} =>
+ let
+ val req = new_request ();
+ val requests' = add_request time (req, event) requests;
+ val manager' = manager_check manager;
+ in (req, make_state (requests', status, manager')) end);
+
+fun cancel req =
+ Synchronized.change_result state (fn State {requests, status, manager} =>
+ let
+ val (canceled, requests') = del_request req requests;
+ val manager' = manager_check manager;
+ in (canceled, make_state (requests', status, manager')) end);
val future = uninterruptible (fn _ => fn time =>
let
--- a/src/Pure/Concurrent/future.ML Sat Jan 10 13:31:37 2015 +0100
+++ b/src/Pure/Concurrent/future.ML Sat Jan 10 16:35:21 2015 +0100
@@ -140,7 +140,6 @@
val do_shutdown = Unsynchronized.ref false;
val max_workers = Unsynchronized.ref 0;
val max_active = Unsynchronized.ref 0;
-val worker_trend = Unsynchronized.ref 0;
val status_ticks = Unsynchronized.ref 0;
val last_round = Unsynchronized.ref Time.zeroTime;
@@ -265,7 +264,7 @@
fun worker_start name = (*requires SYNCHRONIZED*)
Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
Unsynchronized.ref Working))
- handle Fail msg => Multithreading.tracing 0 (fn () => msg);
+ handle Fail msg => Multithreading.tracing 0 (fn () => "SCHEDULER: " ^ msg);
@@ -287,7 +286,7 @@
then report_status () else ();
val _ =
- if forall (Thread.isActive o #1) (! workers) then ()
+ if not tick orelse forall (Thread.isActive o #1) (! workers) then ()
else
let
val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
@@ -303,24 +302,11 @@
val max_active0 = ! max_active;
val max_workers0 = ! max_workers;
- val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
+ val m =
+ if ! do_shutdown andalso Task_Queue.all_passive (! queue) then 0
+ else Multithreading.max_threads_value ();
val _ = max_active := m;
-
- val mm =
- if ! do_shutdown then 0
- else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
- val _ =
- if tick andalso mm > ! max_workers then
- Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
- else if tick andalso mm < ! max_workers then
- Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
- else ();
- val _ =
- if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
- max_workers := mm
- else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
- max_workers := Int.min (mm, 2 * m)
- else ();
+ val _ = max_workers := 2 * m;
val missing = ! max_workers - length (! workers);
val _ =
@@ -352,7 +338,6 @@
(* shutdown *)
- val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
val continue = not (! do_shutdown andalso null (! workers));
val _ = if continue then () else (report_status (); scheduler := NONE);
@@ -673,7 +658,8 @@
else
SYNCHRONIZED "shutdown" (fn () =>
while scheduler_active () do
- (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
+ (do_shutdown := true;
+ Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
wait scheduler_event));
--- a/src/Pure/System/system_channel.scala Sat Jan 10 13:31:37 2015 +0100
+++ b/src/Pure/System/system_channel.scala Sat Jan 10 16:35:21 2015 +0100
@@ -15,8 +15,8 @@
object System_Channel
{
- def apply(): System_Channel =
- if (Platform.is_windows) new Socket_Channel else new Fifo_Channel
+ def apply(): System_Channel = new Socket_Channel
+ // FIXME if (Platform.is_windows) new Socket_Channel else new Fifo_Channel
}
abstract class System_Channel