| author | wenzelm | 
| Wed, 06 Dec 2017 18:59:33 +0100 | |
| changeset 67147 | dea94b1aabc3 | 
| parent 66166 | c88d1c36c9c3 | 
| child 69823 | 93784805c6c5 | 
| permissions | -rw-r--r-- | 
| 52050 | 1 | (* Title: Pure/Concurrent/event_timer.ML | 
| 2 | Author: Makarius | |
| 3 | ||
| 4 | Initiate event after given point in time. | |
| 5 | ||
| 6 | Note: events are run as synchronized action within a dedicated thread | |
| 7 | and should finish quickly without further ado. | |
| 8 | *) | |
| 9 | ||
signature EVENT_TIMER =
sig
  (*handle for one scheduled event; admits equality*)
  eqtype request
  (*schedule an event that is run once the given point in time has been reached*)
  val request: Time.time -> (unit -> unit) -> request
  (*withdraw a request; the result tells whether it was still pending*)
  val cancel: request -> bool
  (*promise that the timer fulfills once the given point in time has been reached*)
  val future: Time.time -> unit future
  (*wait until the manager thread has acknowledged a shutdown request*)
  val shutdown: unit -> unit
end;
| 18 | ||
| 19 | structure Event_Timer: EVENT_TIMER = | |
| 20 | struct | |
| 21 | ||
| 22 | (* type request *) | |
| 23 | ||
(*source of fresh request identifiers*)
val request_counter = Counter.make ();

(*a request is identified by the counter value it was created with*)
datatype request = Request of int;

fun new_request () =
  let val id = request_counter ()
  in Request id end;
| 27 | ||
| 28 | ||
| 29 | (* type requests *) | |
| 30 | ||
(*table keyed by the point in time of an event*)
structure Requests =
  Table(
    type key = Time.time
    val ord = Time.compare);

(*each time maps to the list of (request, event) entries registered for it*)
type requests = (request * (unit -> unit)) list Requests.table;
| 52050 | 33 | |
(*prepend entry to the list of entries registered for the given time*)
fun add_request time entry (requests: requests) =
  requests |> Requests.cons_list (time, entry);
| 36 | ||
(*remove the entry that belongs to req, if any; the boolean result tells
  whether such an entry was found*)
fun del_request req (requests: requests) =
  let
    (*select the entry whose request component coincides with req*)
    fun select (entry as (r, _)) = if r = req then SOME entry else NONE;
    fun find_entry (time, entries) =
      Option.map (fn entry => (time, entry)) (get_first select entries);
  in
    (case Requests.get_first find_entry requests of
      SOME found => (true, Requests.remove_list (eq_fst op =) found requests)
    | NONE => (false, requests))
  end;
| 47 | ||
(*minimal key of the table, i.e. the time of the next pending event*)
fun next_request_time (requests: requests) =
  (case Requests.min requests of
    SOME (time, _) => SOME time
  | NONE => NONE);
| 50 | ||
(*if the earliest entry is due at time t0, produce its event together with
  the table that no longer contains it; otherwise NONE*)
fun next_request_event t0 (requests: requests) =
  let
    (*take the last entry of the list registered for this time*)
    fun take_event (time, entries) =
      let
        val (earlier, (_, event)) = split_last entries;
        val remaining =
          if null earlier then Requests.delete time requests
          else Requests.update (time, earlier) requests;
      in (event, remaining) end;
  in
    (case Requests.min requests of
      SOME (due as (time, _)) => if t0 < time then NONE else SOME (take_event due)
    | NONE => NONE)
  end;
| 63 | ||
| 64 | ||
| 65 | (* global state *) | |
| 66 | ||
| 59329 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
(*Shutdown_Req is set by shutdown, Shutdown_Ack by the manager loop*)
datatype status =
  Normal | Shutdown_Req | Shutdown_Ack;
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
changeset | 68 | |
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
(*pending requests, shutdown status, and the manager thread (if any)*)
datatype state =
  State of {
    requests: requests,
    status: status,
    manager: Thread.thread option};
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
changeset | 71 | |
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
(*build a state value from its three components*)
fun make_state (requests, status, manager) =
  State {manager = manager, status = status, requests = requests};
| 52050 | 74 | |
| 59329 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
(*initial state: no requests, no manager thread*)
val normal_state =
  make_state (Requests.empty, Normal, NONE);

(*state installed by the manager loop when it acknowledges a shutdown request*)
val shutdown_ack_state =
  make_state (Requests.empty, Shutdown_Ack, NONE);
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
changeset | 77 | |
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
(*no manager thread, no pending requests, and the given status s*)
fun is_shutdown s (State {requests, status, manager}) =
  (case manager of
    SOME _ => false
  | NONE => status = s andalso Requests.is_empty requests);
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
changeset | 80 | |
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
(*shutdown has been requested and no pending requests remain*)
fun is_shutdown_req (State {requests, status, ...}) =
  (case status of
    Shutdown_Req => Requests.is_empty requests
  | _ => false);
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
changeset | 83 | |
| 
72278d083d3a
clarified Event_Timer.shutdown: manager thread remains until final shutdown in Session.finish;
 wenzelm parents: 
56768diff
(*global synchronized variable that holds the state of the event timer*)
val state =
  Synchronized.var "Event_Timer.state" normal_state;
| 52050 | 85 | |
| 86 | ||
| 87 | (* manager thread *) | |
| 88 | ||
(*body of the manager thread: run due events one by one within the
  synchronized state; terminate after acknowledging a shutdown request*)
fun manager_loop () =
  let
    (*time limit for the access: the time of the next pending event*)
    fun deadline (State {requests, ...}) = next_request_time requests;

    (*result SOME true: an event was run; SOME false: shutdown acknowledged;
      NONE: nothing to do yet*)
    fun step (st as State {requests, status, manager}) =
      (case next_request_event (Time.now ()) requests of
        NONE =>
          if is_shutdown_req st
          then SOME (false, shutdown_ack_state)
          else NONE
      | SOME (event, requests') =>
          let
            (*the outcome of the event is captured and discarded*)
            val _ = Exn.capture event ();
          in SOME (true, make_state (requests', status, manager)) end);
  in
    (case Synchronized.timed_access state deadline step of
      SOME false => ()
    | _ => manager_loop ())
  end;
| 52050 | 104 | |
(*keep the given manager thread if it is still active, otherwise fork a new
  one that runs manager_loop*)
fun manager_check manager =
  let
    fun fork_manager () =
      Standard_Thread.fork {name = "event_timer", stack_limit = NONE, interrupts = false}
        manager_loop;
    val active =
      (case manager of
        SOME thread => Thread.isActive thread
      | NONE => false);
  in if active then manager else SOME (fork_manager ()) end;
| 52050 | 110 | |
(*shutdown handshake with the manager thread: set Shutdown_Req, wait for
  Shutdown_Ack, then return to normal_state; nothing happens if the timer is
  already in its quiescent normal state*)
fun shutdown () =
  let
    (*result true: a shutdown request was issued and needs to be awaited*)
    fun begin_shutdown (st as State {requests, manager, ...}) =
      if is_shutdown Normal st then (false, st)
      else if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then
        raise Fail "Concurrent attempt to shutdown event timer"
      else (true, make_state (requests, Shutdown_Req, manager_check manager));

    (*blocks until the manager loop has acknowledged the request*)
    fun await_ack st =
      if is_shutdown Shutdown_Ack st then SOME ((), normal_state) else NONE;

    (*withdraw the shutdown request, keeping requests and manager*)
    fun revert (State {requests, manager, ...}) =
      make_state (requests, Normal, manager);
  in
    Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
      if Synchronized.change_result state begin_shutdown then
        (restore_attributes (fn () => Synchronized.guarded_access state await_ack) ()
          handle exn =>
            (*an interrupt while waiting resets the status to Normal;
              other exceptions are ignored here*)
            if Exn.is_interrupt exn then Synchronized.change state revert
            else ())
      else ()) ()
  end;
| 52050 | 128 | |
| 52798 
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
 wenzelm parents: 
52589diff
changeset | 129 | |
| 59339 | 130 | (* main operations *) | 
| 131 | ||
(*register event for the given time and make sure that a manager thread is
  present; the result identifies the new request*)
fun request time event =
  let
    fun register (State {requests, status, manager}) =
      let
        val id = new_request ();
        val requests' = add_request time (id, event) requests;
        val manager' = manager_check manager;
      in (id, make_state (requests', status, manager')) end;
  in Synchronized.change_result state register end;
| 139 | ||
(*remove a pending request; the result tells whether it was still present*)
fun cancel req =
  let
    fun unregister (State {requests, status, manager}) =
      let
        val (found, requests') = del_request req requests;
        val manager' = manager_check manager;
      in (found, make_state (requests', status, manager')) end;
  in Synchronized.change_result state unregister end;
| 52798 
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
 wenzelm parents: 
52589diff
changeset | 146 | |
(*promise that is fulfilled by a timer request for the given time; the abort
  operation given to the promise cancels that request*)
val future = Thread_Attributes.uninterruptible (fn _ => fn time =>
  let
    (*the request is only known after the promise exists, hence the
      single-assignment variable*)
    val pending: request Single_Assignment.var = Single_Assignment.var "request";
    fun withdraw () =
      let val _ = cancel (Single_Assignment.await pending)
      in () end;
    val result: unit future = Future.promise_name "event_timer" withdraw;
    val fulfill = Future.fulfill result;
    val _ = Single_Assignment.assign pending (request time fulfill);
  in result end);
| 
9d3c9862d1dd
recovered delay for Document.start_execution (see also 627fb639a2d9), which potentially improves throughput when many consecutive edits arrive;
 wenzelm parents: 
52589diff
changeset | 154 | |
| 52050 | 155 | end; | 
| 156 |