52050
|
1 |
(* Title: Pure/Concurrent/event_timer.ML
|
|
2 |
Author: Makarius
|
|
3 |
|
|
4 |
Initiate event after given point in time.
|
|
5 |
|
|
6 |
Note: events are run as synchronized action within a dedicated thread
|
|
7 |
and should finish quickly without further ado.
|
|
8 |
*)
|
|
9 |
|
|
10 |
signature EVENT_TIMER =
|
|
11 |
sig
|
|
12 |
type event = unit -> unit
|
|
13 |
eqtype request
|
|
14 |
val request: Time.time -> event -> request
|
|
15 |
val cancel: request -> bool
|
|
16 |
val shutdown: unit -> unit
|
|
17 |
end;
|
|
18 |
|
|
19 |
structure Event_Timer: EVENT_TIMER =
|
|
20 |
struct
|
|
21 |
|
|
22 |
(* type event *)
|
|
23 |
|
|
24 |
type event = unit -> unit;
|
|
25 |
|
|
26 |
|
|
27 |
(* type request *)
|
|
28 |
|
|
29 |
val request_counter = Synchronized.counter ();
|
|
30 |
datatype request = Request of int;
|
|
31 |
fun new_request () = Request (request_counter ());
|
|
32 |
|
|
33 |
|
|
34 |
(* type requests *)
|
|
35 |
|
|
36 |
structure Requests = Table(type key = Time.time val ord = Time.compare);
|
|
37 |
type requests = (request * event) list Requests.table;
|
|
38 |
|
|
39 |
fun add_request time entry (requests: requests) =
|
|
40 |
Requests.cons_list (time, entry) requests;
|
|
41 |
|
|
42 |
fun del_request req (requests: requests) =
|
|
43 |
let
|
|
44 |
val old_request =
|
|
45 |
requests |> Requests.get_first (fn (key, entries) =>
|
|
46 |
entries |> get_first (fn entry => if fst entry = req then SOME (key, entry) else NONE));
|
|
47 |
in
|
|
48 |
(case old_request of
|
|
49 |
NONE => (false, requests)
|
|
50 |
| SOME old => (true, Requests.remove_list (eq_fst op =) old requests))
|
|
51 |
end;
|
|
52 |
|
|
53 |
fun next_request_time (requests: requests) =
|
|
54 |
Option.map fst (Requests.min requests);
|
|
55 |
|
|
56 |
fun next_request_event t0 (requests: requests) =
|
|
57 |
(case Requests.min requests of
|
|
58 |
NONE => NONE
|
|
59 |
| SOME (time, entries) =>
|
|
60 |
if Time.< (t0, time) then NONE
|
|
61 |
else
|
|
62 |
let
|
|
63 |
val (rest, (_, event)) = split_last entries;
|
|
64 |
val requests' =
|
|
65 |
if null rest then Requests.delete time requests
|
|
66 |
else Requests.update (time, rest) requests;
|
|
67 |
in SOME (event, requests') end);
|
|
68 |
|
|
69 |
|
|
70 |
(* global state *)
|
|
71 |
|
|
72 |
type state = requests * Thread.thread option;
|
|
73 |
val init_state: state = (Requests.empty, NONE);
|
|
74 |
|
|
75 |
val state = Synchronized.var "Event_Timer.state" init_state;
|
|
76 |
|
|
77 |
|
|
78 |
(* manager thread *)
|
|
79 |
|
|
80 |
val manager_timeout = seconds 0.3;
|
|
81 |
|
|
82 |
fun manager_loop () =
|
|
83 |
let
|
|
84 |
val success =
|
|
85 |
Synchronized.timed_access state
|
|
86 |
(fn (requests, _) =>
|
|
87 |
(case next_request_time requests of
|
|
88 |
NONE => SOME (Time.+ (Time.now (), manager_timeout))
|
|
89 |
| some => some))
|
|
90 |
(fn (requests, manager) =>
|
|
91 |
(case next_request_event (Time.now ()) requests of
|
|
92 |
NONE => NONE
|
|
93 |
| SOME (event, requests') => (Exn.capture event (); SOME ((), (requests', manager)))));
|
|
94 |
val finished =
|
|
95 |
is_none success andalso
|
|
96 |
Synchronized.change_result state (fn (requests, manager) =>
|
|
97 |
if Requests.is_empty requests then (true, init_state)
|
|
98 |
else (false, (requests, manager)));
|
|
99 |
in if finished then () else manager_loop () end;
|
|
100 |
|
|
101 |
fun manager_check manager =
|
|
102 |
if is_some manager andalso Thread.isActive (the manager) then manager
|
|
103 |
else SOME (Simple_Thread.fork false manager_loop);
|
|
104 |
|
|
105 |
|
|
106 |
(* main operations *)
|
|
107 |
|
|
108 |
fun request time event =
|
|
109 |
Synchronized.change_result state (fn (requests, manager) =>
|
|
110 |
let
|
|
111 |
val req = new_request ();
|
|
112 |
val requests' = add_request time (req, event) requests;
|
|
113 |
in (req, (requests', manager_check manager)) end);
|
|
114 |
|
|
115 |
fun cancel req =
|
|
116 |
Synchronized.change_result state (fn (requests, manager) =>
|
|
117 |
let
|
|
118 |
val (cancelled, requests') = del_request req requests;
|
|
119 |
in (cancelled, (requests', manager)) end);
|
|
120 |
|
|
121 |
fun shutdown () =
|
|
122 |
Synchronized.guarded_access state (fn (requests, manager) =>
|
|
123 |
if not (Requests.is_empty requests)
|
|
124 |
then raise Fail "Cannot shutdown event timer: pending requests"
|
|
125 |
else if is_none manager then SOME ((), init_state)
|
|
126 |
else NONE);
|
|
127 |
|
|
128 |
end;
|
|
129 |
|