src/Pure/Concurrent/future.ML
author wenzelm
Sun Sep 07 22:19:42 2008 +0200 (2008-09-07 ago)
changeset 28156 5205f7979b4f
child 28162 55772e4e95e0
permissions -rw-r--r--
Functional threads as future values.
wenzelm@28156
     1
(*  Title:      Pure/Concurrent/future.ML
wenzelm@28156
     2
    ID:         $Id$
wenzelm@28156
     3
    Author:     Makarius
wenzelm@28156
     4
wenzelm@28156
     5
Functional threads as future values.
wenzelm@28156
     6
*)
wenzelm@28156
     7
wenzelm@28156
     8
signature FUTURE =
wenzelm@28156
     9
sig
wenzelm@28156
    10
  type 'a T
wenzelm@28156
    11
  eqtype id
wenzelm@28156
    12
  val id_of: 'a T -> id
wenzelm@28156
    13
  val interrupt: id -> unit
wenzelm@28156
    14
  val dependent_future: id list -> (unit -> 'a) -> 'a T
wenzelm@28156
    15
  val future: (unit -> 'a) -> 'a T
wenzelm@28156
    16
  val await: 'a T -> 'a
wenzelm@28156
    17
end;
wenzelm@28156
    18
wenzelm@28156
    19
structure Future: FUTURE =
wenzelm@28156
    20
struct
wenzelm@28156
    21
wenzelm@28156
    22
(* synchronized execution *)
wenzelm@28156
    23
wenzelm@28156
    24
local
wenzelm@28156
    25
  val thread = ref (NONE: Thread.thread option);
wenzelm@28156
    26
  val lock = Mutex.mutex ();
wenzelm@28156
    27
  val cond = ConditionVar.conditionVar ();
wenzelm@28156
    28
in
wenzelm@28156
    29
wenzelm@28156
    30
fun self_synchronized () =
wenzelm@28156
    31
  (case ! thread of
wenzelm@28156
    32
    NONE => false
wenzelm@28156
    33
  | SOME t => Thread.equal (t, Thread.self ()));
wenzelm@28156
    34
wenzelm@28156
    35
fun SYNCHRONIZED e =
wenzelm@28156
    36
  if self_synchronized () then e ()
wenzelm@28156
    37
  else
wenzelm@28156
    38
    uninterruptible (fn restore_attributes => fn () =>
wenzelm@28156
    39
      let
wenzelm@28156
    40
        val _ = Mutex.lock lock;
wenzelm@28156
    41
        val _ = thread := SOME (Thread.self ());
wenzelm@28156
    42
        val result = Exn.capture (restore_attributes e) ();
wenzelm@28156
    43
        val _ = thread := NONE;
wenzelm@28156
    44
        val _ = Mutex.unlock lock;
wenzelm@28156
    45
      in Exn.release result end) ();
wenzelm@28156
    46
wenzelm@28156
    47
fun wait () = ConditionVar.wait (cond, lock);
wenzelm@28156
    48
fun wait_timeout timeout = ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
wenzelm@28156
    49
wenzelm@28156
    50
fun notify_all () = ConditionVar.broadcast cond;
wenzelm@28156
    51
wenzelm@28156
    52
end;
wenzelm@28156
    53
wenzelm@28156
    54
wenzelm@28156
    55
(* typed futures, unytped ids *)
wenzelm@28156
    56
wenzelm@28156
    57
datatype 'a T = Future of serial * 'a Exn.result option ref;
wenzelm@28156
    58
wenzelm@28156
    59
datatype id = Id of serial;
wenzelm@28156
    60
fun id_of (Future (id, _)) = Id id;
wenzelm@28156
    61
wenzelm@28156
    62
local val tag = Universal.tag () : serial Universal.tag in
wenzelm@28156
    63
  fun get_id () = Thread.getLocal tag;
wenzelm@28156
    64
  fun put_id id = Thread.setLocal (tag, id);
wenzelm@28156
    65
end;
wenzelm@28156
    66
wenzelm@28156
    67
wenzelm@28156
    68
(* ordered queue of tasks *)
wenzelm@28156
    69
wenzelm@28156
    70
datatype task =
wenzelm@28156
    71
  Task of (unit -> unit) |
wenzelm@28156
    72
  Running of Thread.thread;
wenzelm@28156
    73
wenzelm@28156
    74
datatype queue = Queue of task IntGraph.T * (serial * (unit -> unit)) Queue.T;
wenzelm@28156
    75
wenzelm@28156
    76
val empty_queue = Queue (IntGraph.empty, Queue.empty);
wenzelm@28156
    77
wenzelm@28156
    78
fun check_cache (queue as Queue (tasks, cache)) =
wenzelm@28156
    79
  if not (Queue.is_empty cache) then queue
wenzelm@28156
    80
  else
wenzelm@28156
    81
    let
wenzelm@28156
    82
      val cache' = fold (fn id =>
wenzelm@28156
    83
        (case IntGraph.get_node tasks id of
wenzelm@28156
    84
          Task task => Queue.enqueue (id, task)
wenzelm@28156
    85
        | Running _ => I)) (IntGraph.minimals tasks) Queue.empty;
wenzelm@28156
    86
    in Queue (tasks, cache') end;
wenzelm@28156
    87
wenzelm@28156
    88
val next_task = check_cache #> (fn queue as Queue (tasks, cache) =>
wenzelm@28156
    89
  if Queue.is_empty cache then (NONE, queue)
wenzelm@28156
    90
  else
wenzelm@28156
    91
    let val (task, cache') = Queue.dequeue cache
wenzelm@28156
    92
    in (SOME task, Queue (tasks, cache')) end);
wenzelm@28156
    93
wenzelm@28156
    94
fun get_task (Queue (tasks, _)) id = IntGraph.get_node tasks id;
wenzelm@28156
    95
wenzelm@28156
    96
fun new_task deps id task (Queue (tasks, _)) =
wenzelm@28156
    97
  let
wenzelm@28156
    98
    fun add_dep (Id dep) G = IntGraph.add_edge_acyclic (dep, id) G
wenzelm@28156
    99
      handle IntGraph.UNDEF _ => G;  (*dep already finished*)
wenzelm@28156
   100
    val tasks' = tasks |> IntGraph.new_node (id, Task task) |> fold add_dep deps;
wenzelm@28156
   101
  in Queue (tasks', Queue.empty) end;
wenzelm@28156
   102
wenzelm@28156
   103
fun running_task id thread (Queue (tasks, cache)) =
wenzelm@28156
   104
  Queue (IntGraph.map_node id (K (Running thread)) tasks, cache);
wenzelm@28156
   105
wenzelm@28156
   106
fun finished_task id (Queue (tasks, _)) =
wenzelm@28156
   107
  Queue (IntGraph.del_nodes [id] tasks, Queue.empty);
wenzelm@28156
   108
wenzelm@28156
   109
wenzelm@28156
   110
(* global state *)
wenzelm@28156
   111
wenzelm@28156
   112
local val active = ref 0 in
wenzelm@28156
   113
wenzelm@28156
   114
fun change_active b = SYNCHRONIZED (fn () =>
wenzelm@28156
   115
  let
wenzelm@28156
   116
    val _ = change active (fn n => if b then n + 1 else n - 1);
wenzelm@28156
   117
    val n = ! active;
wenzelm@28156
   118
    val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active tasks");
wenzelm@28156
   119
  in () end);
wenzelm@28156
   120
wenzelm@28156
   121
end;
wenzelm@28156
   122
wenzelm@28156
   123
val tasks = ref empty_queue;
wenzelm@28156
   124
val scheduler = ref (NONE: Thread.thread option);
wenzelm@28156
   125
val workers = ref ([]: Thread.thread list);
wenzelm@28156
   126
wenzelm@28156
   127
wenzelm@28156
   128
fun interrupt (Id id) = SYNCHRONIZED (fn () =>
wenzelm@28156
   129
  (case try (get_task (! tasks)) id of
wenzelm@28156
   130
    SOME (Running thread) => Thread.interrupt thread
wenzelm@28156
   131
  | _ => ()));
wenzelm@28156
   132
wenzelm@28156
   133
wenzelm@28156
   134
(* worker thread *)
wenzelm@28156
   135
wenzelm@28156
   136
fun excessive_threads () = false;  (* FIXME *)
wenzelm@28156
   137
wenzelm@28156
   138
fun worker_stop () =
wenzelm@28156
   139
  (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ())))));
wenzelm@28156
   140
wenzelm@28156
   141
fun worker_wait () =
wenzelm@28156
   142
  (change_active false; wait (); change_active true);
wenzelm@28156
   143
wenzelm@28156
   144
fun worker_loop () =
wenzelm@28156
   145
  (case SYNCHRONIZED (fn () => change_result tasks next_task) of
wenzelm@28156
   146
    SOME (id, task) =>
wenzelm@28156
   147
      let
wenzelm@28156
   148
        val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ())));
wenzelm@28156
   149
        val _ = task ();
wenzelm@28156
   150
        val _ = SYNCHRONIZED (fn () => change tasks (finished_task id));
wenzelm@28156
   151
        val _ = notify_all ();
wenzelm@28156
   152
      in if excessive_threads () then worker_stop () else worker_loop () end
wenzelm@28156
   153
  | NONE => (worker_wait (); worker_loop ()));
wenzelm@28156
   154
wenzelm@28156
   155
fun worker_start () =
wenzelm@28156
   156
 (change_active true;
wenzelm@28156
   157
  change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts))));
wenzelm@28156
   158
wenzelm@28156
   159
wenzelm@28156
   160
(* scheduler *)
wenzelm@28156
   161
wenzelm@28156
   162
fun scheduler_loop () =
wenzelm@28156
   163
  let
wenzelm@28156
   164
    val m = Multithreading.max_threads_value ();
wenzelm@28156
   165
    val k = m - length (! workers);
wenzelm@28156
   166
    val _ = if k > 0 then funpow k worker_start () else ();
wenzelm@28156
   167
  in wait_timeout (Time.fromSeconds 1); scheduler_loop () end;
wenzelm@28156
   168
wenzelm@28156
   169
fun check_scheduler () = SYNCHRONIZED (fn () =>
wenzelm@28156
   170
  let
wenzelm@28156
   171
    val scheduler_active =
wenzelm@28156
   172
      (case ! scheduler of
wenzelm@28156
   173
        NONE => false
wenzelm@28156
   174
      | SOME t => Thread.isActive t);
wenzelm@28156
   175
  in
wenzelm@28156
   176
    if scheduler_active then ()
wenzelm@28156
   177
    else scheduler := SOME (Thread.fork (SYNCHRONIZED scheduler_loop, Multithreading.no_interrupts))
wenzelm@28156
   178
  end);
wenzelm@28156
   179
wenzelm@28156
   180
wenzelm@28156
   181
(* future values *)
wenzelm@28156
   182
wenzelm@28156
   183
fun dependent_future deps (e: unit -> 'a) =
wenzelm@28156
   184
  let
wenzelm@28156
   185
    val _ = check_scheduler ();
wenzelm@28156
   186
wenzelm@28156
   187
    val r = ref (NONE: 'a Exn.result option);
wenzelm@28156
   188
    val task = Multithreading.with_attributes (Thread.getAttributes ())
wenzelm@28156
   189
      (fn _ => fn () => r := SOME (Exn.capture e ()));
wenzelm@28156
   190
    val id = serial ();
wenzelm@28156
   191
    val _ = SYNCHRONIZED (fn () => change tasks (new_task deps id task));
wenzelm@28156
   192
    val _ = notify_all ();
wenzelm@28156
   193
  in Future (id, r) end;
wenzelm@28156
   194
wenzelm@28156
   195
fun future e = dependent_future [] e;
wenzelm@28156
   196
wenzelm@28156
   197
fun await (Future (_, r)) =
wenzelm@28156
   198
  let
wenzelm@28156
   199
    val _ = check_scheduler ();
wenzelm@28156
   200
wenzelm@28156
   201
    fun loop () =
wenzelm@28156
   202
      (case SYNCHRONIZED (fn () => ! r) of
wenzelm@28156
   203
        NONE => (wait (); loop ())
wenzelm@28156
   204
      | SOME res => Exn.release res);
wenzelm@28156
   205
  in loop () end;
wenzelm@28156
   206
wenzelm@28156
   207
end;