src/Pure/Concurrent/future.ML
changeset 28156 5205f7979b4f
child 28162 55772e4e95e0
equal deleted inserted replaced
28155:27b3005de862 28156:5205f7979b4f
       
     1 (*  Title:      Pure/Concurrent/future.ML
       
     2     ID:         $Id$
       
     3     Author:     Makarius
       
     4 
       
     5 Functional threads as future values.
       
     6 *)
       
     7 
       
     8 signature FUTURE =
       
     9 sig
       
    10   type 'a T
       
    11   eqtype id
       
    12   val id_of: 'a T -> id
       
    13   val interrupt: id -> unit
       
    14   val dependent_future: id list -> (unit -> 'a) -> 'a T
       
    15   val future: (unit -> 'a) -> 'a T
       
    16   val await: 'a T -> 'a
       
    17 end;
       
    18 
       
    19 structure Future: FUTURE =
       
    20 struct
       
    21 
       
    22 (* synchronized execution *)
       
    23 
       
    24 local
       
    25   val thread = ref (NONE: Thread.thread option);
       
    26   val lock = Mutex.mutex ();
       
    27   val cond = ConditionVar.conditionVar ();
       
    28 in
       
    29 
       
    30 fun self_synchronized () =
       
    31   (case ! thread of
       
    32     NONE => false
       
    33   | SOME t => Thread.equal (t, Thread.self ()));
       
    34 
       
    35 fun SYNCHRONIZED e =
       
    36   if self_synchronized () then e ()
       
    37   else
       
    38     uninterruptible (fn restore_attributes => fn () =>
       
    39       let
       
    40         val _ = Mutex.lock lock;
       
    41         val _ = thread := SOME (Thread.self ());
       
    42         val result = Exn.capture (restore_attributes e) ();
       
    43         val _ = thread := NONE;
       
    44         val _ = Mutex.unlock lock;
       
    45       in Exn.release result end) ();
       
    46 
       
    47 fun wait () = ConditionVar.wait (cond, lock);
       
    48 fun wait_timeout timeout = ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
       
    49 
       
    50 fun notify_all () = ConditionVar.broadcast cond;
       
    51 
       
    52 end;
       
    53 
       
    54 
       
    55 (* typed futures, unytped ids *)
       
    56 
       
    57 datatype 'a T = Future of serial * 'a Exn.result option ref;
       
    58 
       
    59 datatype id = Id of serial;
       
    60 fun id_of (Future (id, _)) = Id id;
       
    61 
       
    62 local val tag = Universal.tag () : serial Universal.tag in
       
    63   fun get_id () = Thread.getLocal tag;
       
    64   fun put_id id = Thread.setLocal (tag, id);
       
    65 end;
       
    66 
       
    67 
       
    68 (* ordered queue of tasks *)
       
    69 
       
    70 datatype task =
       
    71   Task of (unit -> unit) |
       
    72   Running of Thread.thread;
       
    73 
       
    74 datatype queue = Queue of task IntGraph.T * (serial * (unit -> unit)) Queue.T;
       
    75 
       
    76 val empty_queue = Queue (IntGraph.empty, Queue.empty);
       
    77 
       
    78 fun check_cache (queue as Queue (tasks, cache)) =
       
    79   if not (Queue.is_empty cache) then queue
       
    80   else
       
    81     let
       
    82       val cache' = fold (fn id =>
       
    83         (case IntGraph.get_node tasks id of
       
    84           Task task => Queue.enqueue (id, task)
       
    85         | Running _ => I)) (IntGraph.minimals tasks) Queue.empty;
       
    86     in Queue (tasks, cache') end;
       
    87 
       
    88 val next_task = check_cache #> (fn queue as Queue (tasks, cache) =>
       
    89   if Queue.is_empty cache then (NONE, queue)
       
    90   else
       
    91     let val (task, cache') = Queue.dequeue cache
       
    92     in (SOME task, Queue (tasks, cache')) end);
       
    93 
       
    94 fun get_task (Queue (tasks, _)) id = IntGraph.get_node tasks id;
       
    95 
       
    96 fun new_task deps id task (Queue (tasks, _)) =
       
    97   let
       
    98     fun add_dep (Id dep) G = IntGraph.add_edge_acyclic (dep, id) G
       
    99       handle IntGraph.UNDEF _ => G;  (*dep already finished*)
       
   100     val tasks' = tasks |> IntGraph.new_node (id, Task task) |> fold add_dep deps;
       
   101   in Queue (tasks', Queue.empty) end;
       
   102 
       
   103 fun running_task id thread (Queue (tasks, cache)) =
       
   104   Queue (IntGraph.map_node id (K (Running thread)) tasks, cache);
       
   105 
       
   106 fun finished_task id (Queue (tasks, _)) =
       
   107   Queue (IntGraph.del_nodes [id] tasks, Queue.empty);
       
   108 
       
   109 
       
   110 (* global state *)
       
   111 
       
   112 local val active = ref 0 in
       
   113 
       
   114 fun change_active b = SYNCHRONIZED (fn () =>
       
   115   let
       
   116     val _ = change active (fn n => if b then n + 1 else n - 1);
       
   117     val n = ! active;
       
   118     val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active tasks");
       
   119   in () end);
       
   120 
       
   121 end;
       
   122 
       
   123 val tasks = ref empty_queue;
       
   124 val scheduler = ref (NONE: Thread.thread option);
       
   125 val workers = ref ([]: Thread.thread list);
       
   126 
       
   127 
       
   128 fun interrupt (Id id) = SYNCHRONIZED (fn () =>
       
   129   (case try (get_task (! tasks)) id of
       
   130     SOME (Running thread) => Thread.interrupt thread
       
   131   | _ => ()));
       
   132 
       
   133 
       
   134 (* worker thread *)
       
   135 
       
   136 fun excessive_threads () = false;  (* FIXME *)
       
   137 
       
   138 fun worker_stop () =
       
   139   (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ())))));
       
   140 
       
   141 fun worker_wait () =
       
   142   (change_active false; wait (); change_active true);
       
   143 
       
   144 fun worker_loop () =
       
   145   (case SYNCHRONIZED (fn () => change_result tasks next_task) of
       
   146     SOME (id, task) =>
       
   147       let
       
   148         val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ())));
       
   149         val _ = task ();
       
   150         val _ = SYNCHRONIZED (fn () => change tasks (finished_task id));
       
   151         val _ = notify_all ();
       
   152       in if excessive_threads () then worker_stop () else worker_loop () end
       
   153   | NONE => (worker_wait (); worker_loop ()));
       
   154 
       
   155 fun worker_start () =
       
   156  (change_active true;
       
   157   change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts))));
       
   158 
       
   159 
       
   160 (* scheduler *)
       
   161 
       
   162 fun scheduler_loop () =
       
   163   let
       
   164     val m = Multithreading.max_threads_value ();
       
   165     val k = m - length (! workers);
       
   166     val _ = if k > 0 then funpow k worker_start () else ();
       
   167   in wait_timeout (Time.fromSeconds 1); scheduler_loop () end;
       
   168 
       
   169 fun check_scheduler () = SYNCHRONIZED (fn () =>
       
   170   let
       
   171     val scheduler_active =
       
   172       (case ! scheduler of
       
   173         NONE => false
       
   174       | SOME t => Thread.isActive t);
       
   175   in
       
   176     if scheduler_active then ()
       
   177     else scheduler := SOME (Thread.fork (SYNCHRONIZED scheduler_loop, Multithreading.no_interrupts))
       
   178   end);
       
   179 
       
   180 
       
   181 (* future values *)
       
   182 
       
   183 fun dependent_future deps (e: unit -> 'a) =
       
   184   let
       
   185     val _ = check_scheduler ();
       
   186 
       
   187     val r = ref (NONE: 'a Exn.result option);
       
   188     val task = Multithreading.with_attributes (Thread.getAttributes ())
       
   189       (fn _ => fn () => r := SOME (Exn.capture e ()));
       
   190     val id = serial ();
       
   191     val _ = SYNCHRONIZED (fn () => change tasks (new_task deps id task));
       
   192     val _ = notify_all ();
       
   193   in Future (id, r) end;
       
   194 
       
   195 fun future e = dependent_future [] e;
       
   196 
       
   197 fun await (Future (_, r)) =
       
   198   let
       
   199     val _ = check_scheduler ();
       
   200 
       
   201     fun loop () =
       
   202       (case SYNCHRONIZED (fn () => ! r) of
       
   203         NONE => (wait (); loop ())
       
   204       | SOME res => Exn.release res);
       
   205   in loop () end;
       
   206 
       
   207 end;