# HG changeset patch # User wenzelm # Date 1420904121 -3600 # Node ID fd9102b419f525a3bfc64ef93f303ddd6922f59b # Parent a95b6f608a738aae2567b5b2fa7faa75a4ac84a5# Parent a74eb8e0907aa656a872e6658e31a9e76c64a7d1 merged diff -r a95b6f608a73 -r fd9102b419f5 src/Pure/Concurrent/event_timer.ML --- a/src/Pure/Concurrent/event_timer.ML Sat Jan 10 13:31:37 2015 +0100 +++ b/src/Pure/Concurrent/event_timer.ML Sat Jan 10 16:35:21 2015 +0100 @@ -12,8 +12,8 @@ eqtype request val request: Time.time -> (unit -> unit) -> request val cancel: request -> bool + val future: Time.time -> unit future val shutdown: unit -> unit - val future: Time.time -> unit future end; structure Event_Timer: EVENT_TIMER = @@ -92,37 +92,26 @@ (fn st as State {requests, status, manager} => (case next_request_event (Time.now ()) requests of SOME (event, requests') => - (Exn.capture event (); SOME (true, make_state (requests', status, manager))) + let + val _ = Exn.capture event (); + val state' = make_state (requests', status, manager); + in SOME (true, state') end | NONE => - if is_shutdown_req st then SOME (false, shutdown_ack_state) else NONE)) = SOME false - then () else manager_loop (); + if is_shutdown_req st + then SOME (false, shutdown_ack_state) + else NONE)) <> SOME false + then manager_loop () else (); fun manager_check manager = if is_some manager andalso Thread.isActive (the manager) then manager else SOME (Simple_Thread.fork false manager_loop); - -(* main operations *) - -fun request time event = - Synchronized.change_result state (fn State {requests, status, manager} => - let - val req = new_request (); - val requests' = add_request time (req, event) requests; - in (req, make_state (requests', status, manager_check manager)) end); - -fun cancel req = - Synchronized.change_result state (fn State {requests, status, manager} => - let - val (canceled, requests') = del_request req requests; - in (canceled, make_state (requests', status, manager_check manager)) end); - fun shutdown () = uninterruptible (fn restore_attributes => fn () => if Synchronized.change_result state (fn st as State {requests, status, manager} => - if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then + if is_shutdown Normal st then (false, st) + else if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then raise Fail "Concurrent attempt to shutdown event timer" - else if is_shutdown Normal st then (false, st) else (true, make_state (requests, Shutdown_Req, manager_check manager))) then restore_attributes (fn () => @@ -136,7 +125,22 @@ else ()) (); -(* future *) +(* main operations *) + +fun request time event = + Synchronized.change_result state (fn State {requests, status, manager} => + let + val req = new_request (); + val requests' = add_request time (req, event) requests; + val manager' = manager_check manager; + in (req, make_state (requests', status, manager')) end); + +fun cancel req = + Synchronized.change_result state (fn State {requests, status, manager} => + let + val (canceled, requests') = del_request req requests; + val manager' = manager_check manager; + in (canceled, make_state (requests', status, manager')) end); val future = uninterruptible (fn _ => fn time => let diff -r a95b6f608a73 -r fd9102b419f5 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Sat Jan 10 13:31:37 2015 +0100 +++ b/src/Pure/Concurrent/future.ML Sat Jan 10 16:35:21 2015 +0100 @@ -140,7 +140,6 @@ val do_shutdown = Unsynchronized.ref false; val max_workers = Unsynchronized.ref 0; val max_active = Unsynchronized.ref 0; -val worker_trend = Unsynchronized.ref 0; val status_ticks = Unsynchronized.ref 0; val last_round = Unsynchronized.ref Time.zeroTime; @@ -265,7 +264,7 @@ fun worker_start name = (*requires SYNCHRONIZED*) Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), Unsynchronized.ref Working)) - handle Fail msg => Multithreading.tracing 0 (fn () => msg); + handle Fail msg => Multithreading.tracing 0 (fn () => "SCHEDULER: " ^ msg); @@ -287,7 +286,7 @@ then report_status () else (); val _ = - if forall (Thread.isActive o #1) (! workers) then () + if not tick orelse forall (Thread.isActive o #1) (! workers) then () else let val (alive, dead) = List.partition (Thread.isActive o #1) (! workers); @@ -303,24 +302,11 @@ val max_active0 = ! max_active; val max_workers0 = ! max_workers; - val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); + val m = + if ! do_shutdown andalso Task_Queue.all_passive (! queue) then 0 + else Multithreading.max_threads_value (); val _ = max_active := m; - - val mm = - if ! do_shutdown then 0 - else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m); - val _ = - if tick andalso mm > ! max_workers then - Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1) - else if tick andalso mm < ! max_workers then - Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1) - else (); - val _ = - if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then - max_workers := mm - else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then - max_workers := Int.min (mm, 2 * m) - else (); + val _ = max_workers := 2 * m; val missing = ! max_workers - length (! workers); val _ = @@ -352,7 +338,6 @@ (* shutdown *) - val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else (); val continue = not (! do_shutdown andalso null (! workers)); val _ = if continue then () else (report_status (); scheduler := NONE); @@ -673,7 +658,8 @@ else SYNCHRONIZED "shutdown" (fn () => while scheduler_active () do - (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait"); + (do_shutdown := true; + Multithreading.tracing 1 (fn () => "SHUTDOWN: wait"); wait scheduler_event)); diff -r a95b6f608a73 -r fd9102b419f5 src/Pure/System/system_channel.scala --- a/src/Pure/System/system_channel.scala Sat Jan 10 13:31:37 2015 +0100 +++ b/src/Pure/System/system_channel.scala Sat Jan 10 16:35:21 2015 +0100 @@ -15,8 +15,8 @@ object System_Channel { - def apply(): System_Channel = - if (Platform.is_windows) new Socket_Channel else new Fifo_Channel + def apply(): System_Channel = new Socket_Channel + // FIXME if (Platform.is_windows) new Socket_Channel else new Fifo_Channel } abstract class System_Channel