62 in SOME (event, requests') end); |
62 in SOME (event, requests') end); |
63 |
63 |
64 |
64 |
65 (* global state *) |
65 (* global state *) |
66 |
66 |
67 type state = requests * Thread.thread option; |
67 datatype status = Normal | Shutdown_Req | Shutdown_Ack; |
68 val init_state: state = (Requests.empty, NONE); |
|
69 |
68 |
70 val state = Synchronized.var "Event_Timer.state" init_state; |
69 datatype state = |
|
70 State of {requests: requests, status: status, manager: Thread.thread option}; |
|
71 |
|
72 fun make_state (requests, status, manager) = |
|
73 State {requests = requests, status = status, manager = manager}; |
|
74 |
|
75 val normal_state = make_state (Requests.empty, Normal, NONE); |
|
76 val shutdown_ack_state = make_state (Requests.empty, Shutdown_Ack, NONE); |
|
77 |
|
78 fun is_shutdown s (State {requests, status, manager}) = |
|
79 Requests.is_empty requests andalso status = s andalso is_none manager; |
|
80 |
|
81 fun is_shutdown_req (State {requests, status, ...}) = |
|
82 Requests.is_empty requests andalso status = Shutdown_Req; |
|
83 |
|
84 val state = Synchronized.var "Event_Timer.state" normal_state; |
71 |
85 |
72 |
86 |
73 (* manager thread *) |
87 (* manager thread *) |
74 |
88 |
75 val manager_timeout = seconds 0.3; |
|
76 |
|
77 fun manager_loop () = |
89 fun manager_loop () = |
78 let |
90 if Synchronized.timed_access state |
79 val success = |
91 (fn State {requests, ...} => next_request_time requests) |
80 Synchronized.timed_access state |
92 (fn st as State {requests, status, manager} => |
81 (fn (requests, _) => |
93 (case next_request_event (Time.now ()) requests of |
82 (case next_request_time requests of |
94 SOME (event, requests') => |
83 NONE => SOME (Time.+ (Time.now (), manager_timeout)) |
95 (Exn.capture event (); SOME (true, make_state (requests', status, manager))) |
84 | some => some)) |
96 | NONE => |
85 (fn (requests, manager) => |
97 if is_shutdown_req st then SOME (false, shutdown_ack_state) else NONE)) = SOME false |
86 (case next_request_event (Time.now ()) requests of |
98 then () else manager_loop (); |
87 NONE => NONE |
|
88 | SOME (event, requests') => (Exn.capture event (); SOME ((), (requests', manager))))); |
|
89 val finished = |
|
90 is_none success andalso |
|
91 Synchronized.change_result state (fn (requests, manager) => |
|
92 if Requests.is_empty requests then (true, init_state) |
|
93 else (false, (requests, manager))); |
|
94 in if finished then () else manager_loop () end; |
|
95 |
99 |
96 fun manager_check manager = |
100 fun manager_check manager = |
97 if is_some manager andalso Thread.isActive (the manager) then manager |
101 if is_some manager andalso Thread.isActive (the manager) then manager |
98 else SOME (Simple_Thread.fork false manager_loop); |
102 else SOME (Simple_Thread.fork false manager_loop); |
99 |
103 |
100 |
104 |
101 (* main operations *) |
105 (* main operations *) |
102 |
106 |
103 fun request time event = |
107 fun request time event = |
104 Synchronized.change_result state (fn (requests, manager) => |
108 Synchronized.change_result state (fn State {requests, status, manager} => |
105 let |
109 let |
106 val req = new_request (); |
110 val req = new_request (); |
107 val requests' = add_request time (req, event) requests; |
111 val requests' = add_request time (req, event) requests; |
108 in (req, (requests', manager_check manager)) end); |
112 in (req, make_state (requests', status, manager_check manager)) end); |
109 |
113 |
110 fun cancel req = |
114 fun cancel req = |
111 Synchronized.change_result state (fn (requests, manager) => |
115 Synchronized.change_result state (fn State {requests, status, manager} => |
112 let |
116 let |
113 val (canceled, requests') = del_request req requests; |
117 val (canceled, requests') = del_request req requests; |
114 in (canceled, (requests', manager)) end); |
118 in (canceled, make_state (requests', status, manager_check manager)) end); |
115 |
119 |
116 fun shutdown () = |
120 fun shutdown () = |
117 Synchronized.guarded_access state (fn (requests, manager) => |
121 uninterruptible (fn restore_attributes => fn () => |
118 if not (Requests.is_empty requests) |
122 if Synchronized.change_result state (fn st as State {requests, status, manager} => |
119 then raise Fail "Cannot shutdown event timer: pending requests" |
123 if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then |
120 else if is_none manager then SOME ((), init_state) |
124 raise Fail "Concurrent attempt to shutdown event timer" |
121 else NONE); |
125 else if is_shutdown Normal st then (false, st) |
|
126 else (true, make_state (requests, Shutdown_Req, manager_check manager))) |
|
127 then |
|
128 restore_attributes (fn () => |
|
129 Synchronized.guarded_access state |
|
130 (fn st => if is_shutdown Shutdown_Ack st then SOME ((), normal_state) else NONE)) () |
|
131 handle exn => |
|
132 if Exn.is_interrupt exn then |
|
133 Synchronized.change state (fn State {requests, manager, ...} => |
|
134 make_state (requests, Normal, manager)) |
|
135 else () |
|
136 else ()) (); |
122 |
137 |
123 |
138 |
124 (* future *) |
139 (* future *) |
125 |
140 |
126 val future = uninterruptible (fn _ => fn time => |
141 val future = uninterruptible (fn _ => fn time => |