src/Pure/Concurrent/event_timer.ML
author wenzelm
Sat Apr 02 23:29:05 2016 +0200 (2016-04-02 ago)
changeset 62826 eb94e570c1a4
parent 62239 6ee95b93fbed
child 62891 7a11ea5c9626
permissions -rw-r--r--
prefer infix operations;
     1 (*  Title:      Pure/Concurrent/event_timer.ML
     2     Author:     Makarius
     3 
     4 Initiate event after given point in time.
     5 
     6 Note: events are run as synchronized action within a dedicated thread
     7 and should finish quickly without further ado.
     8 *)
     9 
    10 signature EVENT_TIMER =
    11 sig
    12   eqtype request
    13   val request: Time.time -> (unit -> unit) -> request
    14   val cancel: request -> bool
    15   val future: Time.time -> unit future
    16   val shutdown: unit -> unit
    17 end;
    18 
    19 structure Event_Timer: EVENT_TIMER =
    20 struct
    21 
    22 (* type request *)
    23 
    24 val request_counter = Counter.make ();
    25 datatype request = Request of int;
    26 fun new_request () = Request (request_counter ());
    27 
    28 
    29 (* type requests *)
    30 
    31 structure Requests = Table(type key = Time.time val ord = Time.compare);
    32 type requests = (request * (unit -> unit)) list Requests.table;
    33 
    34 fun add_request time entry (requests: requests) =
    35   Requests.cons_list (time, entry) requests;
    36 
    37 fun del_request req (requests: requests) =
    38   let
    39     val old_request =
    40       requests |> Requests.get_first (fn (key, entries) =>
    41         entries |> get_first (fn entry => if fst entry = req then SOME (key, entry) else NONE));
    42   in
    43     (case old_request of
    44       NONE => (false, requests)
    45     | SOME old => (true, Requests.remove_list (eq_fst op =) old requests))
    46   end;
    47 
    48 fun next_request_time (requests: requests) =
    49   Option.map fst (Requests.min requests);
    50 
    51 fun next_request_event t0 (requests: requests) =
    52   (case Requests.min requests of
    53     NONE => NONE
    54   | SOME (time, entries) =>
    55       if t0 < time then NONE
    56       else
    57         let
    58           val (rest, (_, event)) = split_last entries;
    59           val requests' =
    60             if null rest then Requests.delete time requests
    61             else Requests.update (time, rest) requests;
    62         in SOME (event, requests') end);
    63 
    64 
    65 (* global state *)
    66 
    67 datatype status = Normal | Shutdown_Req | Shutdown_Ack;
    68 
    69 datatype state =
    70   State of {requests: requests, status: status, manager: Thread.thread option};
    71 
    72 fun make_state (requests, status, manager) =
    73   State {requests = requests, status = status, manager = manager};
    74 
    75 val normal_state = make_state (Requests.empty, Normal, NONE);
    76 val shutdown_ack_state = make_state (Requests.empty, Shutdown_Ack, NONE);
    77 
    78 fun is_shutdown s (State {requests, status, manager}) =
    79   Requests.is_empty requests andalso status = s andalso is_none manager;
    80 
    81 fun is_shutdown_req (State {requests, status, ...}) =
    82   Requests.is_empty requests andalso status = Shutdown_Req;
    83 
    84 val state = Synchronized.var "Event_Timer.state" normal_state;
    85 
    86 
    87 (* manager thread *)
    88 
    89 fun manager_loop () =
    90   if Synchronized.timed_access state
    91     (fn State {requests, ...} => next_request_time requests)
    92     (fn st as State {requests, status, manager} =>
    93       (case next_request_event (Time.now ()) requests of
    94         SOME (event, requests') =>
    95           let
    96             val _ = Exn.capture event ();
    97             val state' = make_state (requests', status, manager);
    98           in SOME (true, state') end
    99       | NONE =>
   100           if is_shutdown_req st
   101           then SOME (false, shutdown_ack_state)
   102           else NONE)) <> SOME false
   103   then manager_loop () else ();
   104 
   105 fun manager_check manager =
   106   if is_some manager andalso Thread.isActive (the manager) then manager
   107   else
   108     SOME (Standard_Thread.fork {name = "event_timer", stack_limit = NONE, interrupts = false}
   109       manager_loop);
   110 
   111 fun shutdown () =
   112   uninterruptible (fn restore_attributes => fn () =>
   113     if Synchronized.change_result state (fn st as State {requests, manager, ...} =>
   114       if is_shutdown Normal st then (false, st)
   115       else if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then
   116         raise Fail "Concurrent attempt to shutdown event timer"
   117       else (true, make_state (requests, Shutdown_Req, manager_check manager)))
   118     then
   119       restore_attributes (fn () =>
   120         Synchronized.guarded_access state
   121           (fn st => if is_shutdown Shutdown_Ack st then SOME ((), normal_state) else NONE)) ()
   122       handle exn =>
   123         if Exn.is_interrupt exn then
   124           Synchronized.change state (fn State {requests, manager, ...} =>
   125             make_state (requests, Normal, manager))
   126         else ()
   127     else ()) ();
   128 
   129 
   130 (* main operations *)
   131 
   132 fun request time event =
   133   Synchronized.change_result state (fn State {requests, status, manager} =>
   134     let
   135       val req = new_request ();
   136       val requests' = add_request time (req, event) requests;
   137       val manager' = manager_check manager;
   138     in (req, make_state (requests', status, manager')) end);
   139 
   140 fun cancel req =
   141   Synchronized.change_result state (fn State {requests, status, manager} =>
   142     let
   143       val (canceled, requests') = del_request req requests;
   144       val manager' = manager_check manager;
   145     in (canceled, make_state (requests', status, manager')) end);
   146 
   147 val future = uninterruptible (fn _ => fn time =>
   148   let
   149     val req: request Single_Assignment.var = Single_Assignment.var "request";
   150     fun abort () = ignore (cancel (Single_Assignment.await req));
   151     val promise: unit future = Future.promise abort;
   152     val _ = Single_Assignment.assign req (request time (Future.fulfill promise));
   153   in promise end);
   154 
   155 end;
   156