author | wenzelm |
Sun, 18 Dec 2016 13:07:13 +0100 | |
changeset 64596 | 51f8e259de50 |
parent 62923 | 3a122e1e352a |
child 66166 | c88d1c36c9c3 |
permissions | -rw-r--r-- |
52050 | 1 |
(*  Title:      Pure/Concurrent/event_timer.ML
    Author:     Makarius

Initiate event after given point in time.

Note: events are run as synchronized action within a dedicated thread
and should finish quickly without further ado.
*)
|
9 |
||
10 |
(*Interface of the event timer: schedule, cancel and await timed events.*)
signature EVENT_TIMER =
sig
  (*handle of a scheduled event; admits equality*)
  eqtype request
  (*schedule an event for the given point in time; yields its handle*)
  val request: Time.time -> (unit -> unit) -> request
  (*withdraw a scheduled event; true iff an entry for it was still pending*)
  val cancel: request -> bool
  (*future whose fulfilment is scheduled as an event for the given point in time*)
  val future: Time.time -> unit future
  (*shut down the event timer*)
  val shutdown: unit -> unit
end;
|
18 |
||
19 |
structure Event_Timer: EVENT_TIMER = |
|
20 |
struct |
|
21 |
||
22 |
(* type request *) |
|
23 |
||
52537 | 24 |
(*Requests are identified by an integer drawn from a dedicated counter.*)
val request_counter = Counter.make ();

datatype request = Request of int;

(*Produce a request wrapping the next value of request_counter.*)
fun new_request () = request_counter () |> Request;
|
27 |
||
28 |
||
29 |
(* type requests *) |
|
30 |
||
31 |
(*Tables keyed by points in time, ordered via Time.compare.*)
structure Requests =
  Table(
    type key = Time.time
    val ord = Time.compare);

(*Pending requests: for each point in time, a list of (request, event) entries.*)
type requests = (request * (unit -> unit)) list Requests.table;
52050 | 33 |
|
34 |
(*Insert entry into the list stored under time, via Requests.cons_list.*)
fun add_request time entry (requests: requests) =
  requests |> Requests.cons_list (time, entry);
|
36 |
||
37 |
(*Remove the entry that belongs to req, wherever it is stored in the table.
  Result: (true, updated table) if an entry was found, (false, unchanged table) otherwise.*)
fun del_request req (requests: requests) =
  let
    (*an entry matches when its first component is the given request*)
    fun matching key entry = if fst entry = req then SOME (key, entry) else NONE;
    fun search (key, entries) = get_first (matching key) entries;
  in
    (case Requests.get_first search requests of
      SOME found => (true, Requests.remove_list (eq_fst op =) found requests)
    | NONE => (false, requests))
  end;
|
47 |
||
48 |
(*Key of the entry given by Requests.min, or NONE for an empty table.*)
fun next_request_time (requests: requests) =
  (case Requests.min requests of
    SOME (time, _) => SOME time
  | NONE => NONE);
|
50 |
||
51 |
(*Take one event that is due at t0, if any.

  Only the slot given by Requests.min is inspected (presumably the earliest time,
  since the table is ordered by Time.compare).  If t0 is before that time the
  result is NONE; otherwise the last entry of the slot is taken and returned
  together with the table lacking that entry.*)
fun next_request_event t0 (requests: requests) =
  let
    fun pop (time, entries) =
      let
        val (rest, (_, event)) = split_last entries;
        (*drop the slot altogether once its last entry is gone*)
        val requests' =
          if null rest then Requests.delete time requests
          else Requests.update (time, rest) requests;
      in (event, requests') end;
  in
    (case Requests.min requests of
      SOME (due as (time, _)) => if t0 < time then NONE else SOME (pop due)
    | NONE => NONE)
  end;
|
63 |
||
64 |
||
65 |
(* global state *) |
|
66 |
||
59329
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
67 |
(*Lifecycle of the timer: regular operation, shutdown requested, shutdown acknowledged.*)
datatype status = Normal | Shutdown_Req | Shutdown_Ack;

(*Timer state: pending requests, lifecycle status, and the manager thread (if any).*)
datatype state =
  State of
   {requests: requests,
    status: status,
    manager: Thread.thread option};
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
71 |
|
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
72 |
(*Build a State record from its three components, given as a triple.*)
fun make_state (requests, status, manager) =
  State {manager = manager, status = status, requests = requests};
52050 | 74 |
|
59329
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
75 |
(*State with status Normal, no pending requests and no manager thread.*)
val normal_state =
  State {requests = Requests.empty, status = Normal, manager = NONE};

(*State with status Shutdown_Ack, no pending requests and no manager thread.*)
val shutdown_ack_state =
  State {requests = Requests.empty, status = Shutdown_Ack, manager = NONE};
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
77 |
|
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
78 |
(*True iff the state has no manager thread, carries status s, and has no pending requests.*)
fun is_shutdown s (State {requests, status, manager}) =
  (case manager of
    SOME _ => false
  | NONE => status = s andalso Requests.is_empty requests);
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
80 |
|
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
81 |
(*True iff the status is Shutdown_Req and no requests are pending; the manager is not inspected.*)
fun is_shutdown_req (State {requests, status, ...}) =
  (case status of
    Shutdown_Req => Requests.is_empty requests
  | _ => false);
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
83 |
|
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
wenzelm
parents:
56768
diff
changeset
|
84 |
(*The single synchronized variable holding the timer state, initially normal_state.*)
val state: state Synchronized.var =
  Synchronized.var "Event_Timer.state" normal_state;
52050 | 85 |
|
86 |
||
87 |
(* manager thread *) |
|
88 |
||
89 |
(*Body of the manager thread.

  Each round performs one Synchronized.timed_access on the global state:
    - a due event is run (inside the access function) and removed from the table,
      yielding SOME true;
    - without a due event, a pending shutdown request is answered by installing
      shutdown_ack_state, yielding SOME false;
    - otherwise the access function yields NONE.
  The loop ends only when the access yields SOME false.*)
fun manager_loop () =
  let
    (*time limit handed to timed_access: time of the next pending request, if any*)
    fun deadline (State {requests, ...}) = next_request_time requests;

    fun step (st as State {requests, status, manager}) =
      (case next_request_event (Time.now ()) requests of
        SOME (event, requests') =>
          let
            (*the outcome of the event is captured and discarded*)
            val _ = Exn.capture event ();
          in SOME (true, make_state (requests', status, manager)) end
      | NONE =>
          if is_shutdown_req st then SOME (false, shutdown_ack_state) else NONE);

    val proceed = Synchronized.timed_access state deadline step <> SOME false;
  in if proceed then manager_loop () else () end;
|
52050 | 104 |
|
105 |
(*Keep the given manager thread if it is present and active; otherwise fork a
  thread named "event_timer" that runs manager_loop, and return that instead.*)
fun manager_check manager =
  let
    fun fork_manager () =
      SOME (Standard_Thread.fork {name = "event_timer", stack_limit = NONE, interrupts = false}
        manager_loop);
  in
    (case manager of
      SOME thread => if Thread.isActive thread then manager else fork_manager ()
    | NONE => fork_manager ())
  end;
52050 | 110 |
|
111 |
(*Shut down the event timer.

  Nothing happens if the state is already fully idle (status Normal, no requests,
  no manager).  Fail is raised if another shutdown is apparently in progress.
  Otherwise the status becomes Shutdown_Req (making sure a manager thread exists),
  and the caller waits until the state is an acknowledged shutdown, which is then
  replaced by normal_state.  If that wait is interrupted, the status is set back
  to Normal; other exceptions raised during the wait are ignored.*)
fun shutdown () =
  let
    (*decide whether a shutdown needs to be initiated: (initiated?, new state)*)
    fun initiate (st as State {requests, manager, ...}) =
      if is_shutdown Normal st then (false, st)
      else if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then
        raise Fail "Concurrent attempt to shutdown event timer"
      else (true, make_state (requests, Shutdown_Req, manager_check manager));

    (*guard for the wait: succeeds once the shutdown has been acknowledged*)
    fun acknowledged st =
      if is_shutdown Shutdown_Ack st then SOME ((), normal_state) else NONE;

    (*return to Normal status, keeping requests and manager*)
    fun revert (State {requests, manager, ...}) =
      make_state (requests, Normal, manager);
  in
    Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
      if Synchronized.change_result state initiate then
        (*only the wait runs under restore_attributes (presumably the caller's
          original interrupt attributes -- confirm against Thread_Attributes)*)
        (restore_attributes (fn () => Synchronized.guarded_access state acknowledged) ()
          handle exn =>
            (if Exn.is_interrupt exn then Synchronized.change state revert else ()))
      else ()) ()
  end;
52050 | 128 |
|
52798
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
129 |
|
59339 | 130 |
(* main operations *) |
131 |
||
132 |
(*Schedule event for the given time: a fresh request is added to the pending
  table under that time, a manager thread is ensured, and the request is returned.*)
fun request time event =
  let
    fun register (State {requests, status, manager}) =
      let val req = new_request ()
      in (req, make_state (add_request time (req, event) requests, status, manager_check manager))
      end;
  in Synchronized.change_result state register end;
|
139 |
||
140 |
(*Remove the entry of req from the pending table; the result tells whether an
  entry was found.  A manager thread is ensured in either case.*)
fun cancel req =
  let
    fun unregister (State {requests, status, manager}) =
      let
        val (canceled, remaining) = del_request req requests;
        val manager' = manager_check manager;
      in (canceled, make_state (remaining, status, manager')) end;
  in Synchronized.change_result state unregister end;
|
52798
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
146 |
|
62923 | 147 |
(*Promise-based future for a point in time: the scheduled event fulfils the
  promise, and the abort function given to the promise cancels the request.
  The request handle is passed to the abort function through a single-assignment
  variable, because the promise has to exist before the request is made.*)
val future = Thread_Attributes.uninterruptible (fn _ => fn time =>
  let
    val req: request Single_Assignment.var = Single_Assignment.var "request";
    val promise: unit future =
      Future.promise (fn () => ignore (cancel (Single_Assignment.await req)));
    val _ = Single_Assignment.assign req (request time (Future.fulfill promise));
  in promise end);
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
154 |
|
52050 | 155 |
end; |
156 |