author | wenzelm |
Wed, 20 Feb 2019 21:20:30 +0100 | |
changeset 69825 | 183c345b063e |
parent 69824 | 29c13d8813cb |
child 69826 | 1bea05713dde |
permissions | -rw-r--r-- |
52050 | 1 |
(*  Title:      Pure/Concurrent/event_timer.ML
    Author:     Makarius

Initiate event after given point in time.

Note: events are run as synchronized action within a dedicated thread
and should finish quickly without further ado.
*)
|
9 |
||
10 |
signature EVENT_TIMER =
sig
  (*handle for an event registered with the timer*)
  type request
  (*equality of requests, decided by their internal id only*)
  val eq_request: request * request -> bool
  (*register an event under the given point in time; the result may be
    passed to cancel later on*)
  val request: Time.time -> (unit -> unit) -> request
  (*remove a registered request; true iff it was still present*)
  val cancel: request -> bool
  (*promise named "event_timer" that is fulfilled by a timer event
    registered for the given time; its abort operation cancels that request*)
  val future: Time.time -> unit future
  (*request shutdown and wait for its acknowledgment by the manager thread;
    raises Fail on a concurrent shutdown attempt*)
  val shutdown: unit -> unit
end;
|
19 |
||
20 |
structure Event_Timer: EVENT_TIMER =
struct

(* type request *)

(*A request consists of an identifying number, the point in time it is
  registered for, and the event to run.*)
datatype request =
  Request of {id: int, time: Time.time, event: unit -> unit};

(*source of request ids -- presumably yields a fresh number per call; see Counter.make*)
val new_id = Counter.make ();

fun new_request time event =
  Request {
    id = new_id (),
    time = time,
    event = event};

(*requests are compared by id only: time and event are ignored*)
fun eq_request (Request {id = id1, ...}, Request {id = id2, ...}) = id1 = id2;

fun request_time (Request {time, ...}) = time;
fun request_event (Request {event, ...}) = event;


(* type requests *)

(*pending requests: table from point in time to the list of requests
  registered for exactly that time*)
structure Requests = Table(type key = Time.time val ord = Time.compare);
type requests = request list Requests.table;

(*register req under key req_time*)
fun add req_time req (requests: requests) =
  Requests.cons_list (req_time, req) requests;

(*Remove req from the table.  The result is (canceled, requests'), where
  canceled tells whether a request with the same id was found at all.*)
fun del req (requests: requests) =
  let
    (*search all table entries for the first request that is equal to req,
      together with the key it is stored under*)
    val old =
      requests |> Requests.get_first (fn (key, reqs) =>
        reqs |> get_first (fn req' =>
          if eq_request (req', req) then SOME (key, req') else NONE));
  in
    (case old of
      NONE => (false, requests)
    | SOME entry => (true, Requests.remove_list eq_request entry requests))
  end;

(*key of the minimal table entry, if any*)
fun next_time (requests: requests) =
  Option.map #1 (Requests.min requests);

(*Take one event from the minimal table entry, provided that its time is
  not after now.  The result is the event function together with the table
  without that request; NONE if the table is empty or nothing is due yet.*)
fun next_event now (requests: requests) =
  (case Requests.min requests of
    NONE => NONE
  | SOME (req_time, reqs) =>
      if now < req_time then NONE
      else
        let
          (*the last list element is taken -- presumably the oldest request
            for this time, assuming Requests.cons_list adds at the front*)
          val (reqs', req) = split_last reqs;
          (*drop the table entry altogether when its list becomes empty*)
          val requests' =
            if null reqs'
            then Requests.delete req_time requests
            else Requests.update (req_time, reqs') requests;
        in SOME (request_event req, requests') end);


(* global state *)

(*Shutdown_Req is set by shutdown; Shutdown_Ack is set by the manager loop
  in response to it.*)
datatype status = Normal | Shutdown_Req | Shutdown_Ack;

datatype state =
  State of {requests: requests, status: status, manager: Thread.thread option};

fun make_state (requests, status, manager) =
  State {requests = requests, status = status, manager = manager};

val normal_state = make_state (Requests.empty, Normal, NONE);
val shutdown_ack_state = make_state (Requests.empty, Shutdown_Ack, NONE);

(*state without pending requests and without manager thread, in status s*)
fun is_shutdown s (State {requests, status, manager}) =
  Requests.is_empty requests andalso status = s andalso is_none manager;

(*shutdown was requested and no pending requests are left*)
fun is_shutdown_req (State {requests, status, ...}) =
  Requests.is_empty requests andalso status = Shutdown_Req;

val state = Synchronized.var "Event_Timer.state" normal_state;


(* manager thread *)

(*Main loop of the manager thread.  Each round performs one timed access to
  the state (NOTE: the time limit is taken from next_time; see
  Synchronized.timed_access for its exact waiting discipline):

    - if an event is due, it is run within the access, with its result or
      exception discarded via Exn.capture, and the request is removed from
      the state: result true;
    - otherwise, if shutdown was requested and no requests are pending, the
      state becomes shutdown_ack_state: result false;
    - otherwise there is no result for this round.

  The loop terminates only after result false, i.e. after acknowledging the
  shutdown request.*)
fun manager_loop () =
  if Synchronized.timed_access state
    (fn State {requests, ...} => next_time requests)
    (fn st as State {requests, status, manager} =>
      (case next_event (Time.now ()) requests of
        SOME (event, requests') =>
          let
            val _ = Exn.capture event ();
            val state' = make_state (requests', status, manager);
          in SOME (true, state') end
      | NONE =>
          if is_shutdown_req st
          then SOME (false, shutdown_ack_state)
          else NONE)) <> SOME false
  then manager_loop () else ();

(*keep the given manager thread if it is still active, otherwise fork a new
  one that runs manager_loop*)
fun manager_check manager =
  if is_some manager andalso Thread.isActive (the manager) then manager
  else
    SOME (Standard_Thread.fork {name = "event_timer", stack_limit = NONE, interrupts = false}
      manager_loop);

(*Shutdown protocol:

    - nothing to do if the state has no requests, status Normal, and no
      manager thread;
    - Fail if the state is already acknowledged as shutdown, or shutdown is
      requested without pending requests;
    - otherwise set status Shutdown_Req (with manager thread ensured by
      manager_check) and wait, under restored thread attributes, until the
      state is the acknowledged shutdown state, which is then replaced by
      normal_state.

  If waiting is interrupted, the status is reset to Normal, with requests and
  manager unchanged.  Any other exception raised while waiting is discarded.*)
fun shutdown () =
  Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
    if Synchronized.change_result state (fn st as State {requests, manager, ...} =>
      if is_shutdown Normal st then (false, st)
      else if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then
        raise Fail "Concurrent attempt to shutdown event timer"
      else (true, make_state (requests, Shutdown_Req, manager_check manager)))
    then
      restore_attributes (fn () =>
        Synchronized.guarded_access state
          (fn st => if is_shutdown Shutdown_Ack st then SOME ((), normal_state) else NONE)) ()
      handle exn =>
        if Exn.is_interrupt exn then
          Synchronized.change state (fn State {requests, manager, ...} =>
            make_state (requests, Normal, manager))
        else ()
    else ()) ();


(* main operations *)

(*register a new request for the given time and event, and make sure that a
  manager thread is present*)
fun request time event =
  Synchronized.change_result state (fn State {requests, status, manager} =>
    let
      val req = new_request time event;
      val requests' = add time req requests;
      val manager' = manager_check manager;
    in (req, make_state (requests', status, manager')) end);

(*remove the request from the state; the result tells whether it was still
  present; manager_check is applied here as well*)
fun cancel req =
  Synchronized.change_result state (fn State {requests, status, manager} =>
    let
      val (canceled, requests') = del req requests;
      val manager' = manager_check manager;
    in (canceled, make_state (requests', status, manager')) end);

(*Promise that is fulfilled by a timer event registered for the given time.
  The abort operation of the promise cancels that request; it refers to the
  request via a single-assignment variable, because the request is only
  created after the promise.*)
val future = Thread_Attributes.uninterruptible (fn _ => fn time =>
  let
    val req: request Single_Assignment.var = Single_Assignment.var "req";
    fun abort () = ignore (cancel (Single_Assignment.await req));
    val promise: unit future = Future.promise_name "event_timer" abort;
    val _ = Single_Assignment.assign req (request time (Future.fulfill promise));
  in promise end);

end;