author | hoelzl |
Wed, 02 Apr 2014 18:35:01 +0200 | |
changeset 56369 | 2704ca85be98 |
parent 52798 | 9d3c9862d1dd |
child 56768 | 06388a5cfb7c |
permissions | -rw-r--r-- |
52050 | 1 |
(* Title: Pure/Concurrent/event_timer.ML |
2 |
Author: Makarius |
|
3 |
||
4 |
Initiate event after given point in time. |
|
5 |
||
6 |
Note: events are run as synchronized action within a dedicated thread |
|
7 |
and should finish quickly without further ado. |
|
8 |
*) |
|
9 |
||
10 |
signature EVENT_TIMER = |
|
11 |
sig |
|
12 |
type event = unit -> unit |
|
13 |
eqtype request |
|
14 |
val request: Time.time -> event -> request |
|
15 |
val cancel: request -> bool |
|
16 |
val shutdown: unit -> unit |
|
52798
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
17 |
val future: Time.time -> unit future |
52050 | 18 |
end; |
19 |
||
20 |
structure Event_Timer: EVENT_TIMER = |
|
21 |
struct |
|
22 |
||
23 |
(* type event *) |
|
24 |
||
25 |
type event = unit -> unit; |
|
26 |
||
27 |
||
28 |
(* type request *) |
|
29 |
||
52537 | 30 |
val request_counter = Counter.make (); |
52050 | 31 |
datatype request = Request of int; |
32 |
fun new_request () = Request (request_counter ()); |
|
33 |
||
34 |
||
35 |
(* type requests *) |
|
36 |
||
37 |
structure Requests = Table(type key = Time.time val ord = Time.compare); |
|
38 |
type requests = (request * event) list Requests.table; |
|
39 |
||
40 |
fun add_request time entry (requests: requests) = |
|
41 |
Requests.cons_list (time, entry) requests; |
|
42 |
||
43 |
fun del_request req (requests: requests) = |
|
44 |
let |
|
45 |
val old_request = |
|
46 |
requests |> Requests.get_first (fn (key, entries) => |
|
47 |
entries |> get_first (fn entry => if fst entry = req then SOME (key, entry) else NONE)); |
|
48 |
in |
|
49 |
(case old_request of |
|
50 |
NONE => (false, requests) |
|
51 |
| SOME old => (true, Requests.remove_list (eq_fst op =) old requests)) |
|
52 |
end; |
|
53 |
||
54 |
fun next_request_time (requests: requests) = |
|
55 |
Option.map fst (Requests.min requests); |
|
56 |
||
57 |
fun next_request_event t0 (requests: requests) = |
|
58 |
(case Requests.min requests of |
|
59 |
NONE => NONE |
|
60 |
| SOME (time, entries) => |
|
61 |
if Time.< (t0, time) then NONE |
|
62 |
else |
|
63 |
let |
|
64 |
val (rest, (_, event)) = split_last entries; |
|
65 |
val requests' = |
|
66 |
if null rest then Requests.delete time requests |
|
67 |
else Requests.update (time, rest) requests; |
|
68 |
in SOME (event, requests') end); |
|
69 |
||
70 |
||
71 |
(* global state *) |
|
72 |
||
73 |
type state = requests * Thread.thread option; |
|
74 |
val init_state: state = (Requests.empty, NONE); |
|
75 |
||
76 |
val state = Synchronized.var "Event_Timer.state" init_state; |
|
77 |
||
78 |
||
79 |
(* manager thread *) |
|
80 |
||
81 |
val manager_timeout = seconds 0.3; |
|
82 |
||
83 |
fun manager_loop () = |
|
84 |
let |
|
85 |
val success = |
|
86 |
Synchronized.timed_access state |
|
87 |
(fn (requests, _) => |
|
88 |
(case next_request_time requests of |
|
89 |
NONE => SOME (Time.+ (Time.now (), manager_timeout)) |
|
90 |
| some => some)) |
|
91 |
(fn (requests, manager) => |
|
92 |
(case next_request_event (Time.now ()) requests of |
|
93 |
NONE => NONE |
|
94 |
| SOME (event, requests') => (Exn.capture event (); SOME ((), (requests', manager))))); |
|
95 |
val finished = |
|
96 |
is_none success andalso |
|
97 |
Synchronized.change_result state (fn (requests, manager) => |
|
98 |
if Requests.is_empty requests then (true, init_state) |
|
99 |
else (false, (requests, manager))); |
|
100 |
in if finished then () else manager_loop () end; |
|
101 |
||
102 |
fun manager_check manager = |
|
103 |
if is_some manager andalso Thread.isActive (the manager) then manager |
|
104 |
else SOME (Simple_Thread.fork false manager_loop); |
|
105 |
||
106 |
||
107 |
(* main operations *) |
|
108 |
||
109 |
fun request time event = |
|
110 |
Synchronized.change_result state (fn (requests, manager) => |
|
111 |
let |
|
112 |
val req = new_request (); |
|
113 |
val requests' = add_request time (req, event) requests; |
|
114 |
in (req, (requests', manager_check manager)) end); |
|
115 |
||
116 |
fun cancel req = |
|
117 |
Synchronized.change_result state (fn (requests, manager) => |
|
118 |
let |
|
52589 | 119 |
val (canceled, requests') = del_request req requests; |
120 |
in (canceled, (requests', manager)) end); |
|
52050 | 121 |
|
122 |
fun shutdown () = |
|
123 |
Synchronized.guarded_access state (fn (requests, manager) => |
|
124 |
if not (Requests.is_empty requests) |
|
125 |
then raise Fail "Cannot shutdown event timer: pending requests" |
|
126 |
else if is_none manager then SOME ((), init_state) |
|
127 |
else NONE); |
|
128 |
||
52798
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
129 |
|
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
130 |
(* future *) |
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
131 |
|
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
132 |
val future = uninterruptible (fn _ => fn time => |
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
133 |
let |
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
134 |
val req: request Single_Assignment.var = Single_Assignment.var "request"; |
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
135 |
fun abort () = ignore (cancel (Single_Assignment.await req)); |
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
136 |
val promise: unit future = Future.promise abort; |
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
137 |
val _ = Single_Assignment.assign req (request time (Future.fulfill promise)); |
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
138 |
in promise end); |
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
wenzelm
parents:
52589
diff
changeset
|
139 |
|
52050 | 140 |
end; |
141 |