src/Pure/Concurrent/future.ML
author wenzelm
Mon Sep 08 16:08:23 2008 +0200 (2008-09-08 ago)
changeset 28166 43087721a66e
parent 28163 8bf8c21296ca
child 28167 27e2ca41b58c
permissions -rw-r--r--
moved task, thread_data, group, queue to task_queue.ML;
tuned signature;
SYNCHRONIZED notify_all!
misc tuning;
wenzelm@28156
     1
(*  Title:      Pure/Concurrent/future.ML
wenzelm@28156
     2
    ID:         $Id$
wenzelm@28156
     3
    Author:     Makarius
wenzelm@28156
     4
wenzelm@28156
     5
Functional threads as future values.
wenzelm@28156
     6
*)
wenzelm@28156
     7
wenzelm@28156
     8
signature FUTURE =
sig
  (*task and group identifiers are shared with TaskQueue*)
  type task = TaskQueue.task
  type group = TaskQueue.group

  (*abstract handle on a value that is computed by a worker thread*)
  type 'a T

  (*projections*)
  val task_of: 'a T -> task
  val group_of: 'a T -> group option

  (*creation: optional group, list of tasks passed on to the queue, and the body*)
  val future: group option -> task list -> (unit -> 'a) -> 'a T
  val fork: (unit -> 'a) -> 'a T

  (*blocking access to the result*)
  val join: 'a T -> 'a

  (*interrupts, forwarded to TaskQueue*)
  val interrupt: task -> unit
  val interrupt_group: group -> unit
end;
wenzelm@28156
    21
wenzelm@28156
    22
structure Future: FUTURE =
wenzelm@28156
    23
struct
wenzelm@28156
    24
wenzelm@28156
    25
(* synchronized execution *)
wenzelm@28156
    26
wenzelm@28156
    27
local
  (*one global lock and one condition variable, both private to this block*)
  val lock = Mutex.mutex ();
  val cond = ConditionVar.conditionVar ();
in

(*Run e while holding the global lock.  Locking and unlocking happen inside
  "uninterruptible"; e itself is wrapped by restore_attributes.  The outcome of e
  is captured first, so the lock is released even if e raises, and only then is
  the outcome (value or exception) released to the caller.*)
fun SYNCHRONIZED e =
  uninterruptible (fn restore_attributes => fn () =>
    let
      val _ = Mutex.lock lock;
      val outcome = Exn.capture (restore_attributes e) ();
      val _ = Mutex.unlock lock;
    in Exn.release outcome end) ();

(*block on the condition variable, handing over the global lock*)
fun wait () = (*requires SYNCHRONIZED*)
  ConditionVar.wait (cond, lock);

(*like wait, but give up once the given time span has elapsed from now*)
fun wait_timeout timeout = (*requires SYNCHRONIZED*)
  let val deadline = Time.+ (Time.now (), timeout)
  in ConditionVar.waitUntil (cond, lock, deadline) end;

(*wake up every thread currently waiting on the condition variable*)
fun notify_all () = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast cond;

end;
wenzelm@28156
    49
wenzelm@28156
    50
wenzelm@28166
    51
(* datatype future *)
wenzelm@28156
    52
wenzelm@28166
    53
(*local abbreviations for the TaskQueue types*)
type task = TaskQueue.task;
type group = TaskQueue.group;

(*A future consists of its queue task, its optional group, and a mutable slot
  that is NONE until a result (value or exception) has been stored.*)
datatype 'a T =
  Future of {
    task: task,
    group: group option,
    result: 'a Exn.result option ref
  };
wenzelm@28156
    60
wenzelm@28166
    61
(*the queue task belonging to a future*)
fun task_of (Future {task = t, ...}) = t;
wenzelm@28166
    62
(*the optional group a future was created with*)
fun group_of (Future {group = g, ...}) = g;
wenzelm@28156
    63
wenzelm@28156
    64
wenzelm@28156
    65
(* global state *)
wenzelm@28156
    66
wenzelm@28166
    67
(*current task queue, initially empty*)
val queue = ref TaskQueue.empty;

(*the scheduler thread, if one has been forked*)
val scheduler: Thread.thread option ref = ref NONE;

(*worker threads forked so far and not yet removed by worker_next*)
val workers: Thread.thread list ref = ref [];
wenzelm@28156
    70
wenzelm@28156
    71
wenzelm@28156
    72
(* worker thread *)
wenzelm@28156
    73
wenzelm@28162
    74
local
  (*counter of workers currently marked as active*)
  val active = ref 0;
in

(*Adjust the counter by +1 (b = true) or -1 (b = false) and emit a tracing
  message showing the new count.*)
fun change_active b = (*requires SYNCHRONIZED*)
  let
    val step = if b then 1 else ~1;
    val _ = change active (fn n => n + step);
  in
    Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int (! active) ^ " active")
  end;

end;
wenzelm@28162
    81
wenzelm@28156
    82
(*placeholder: currently never reports an excess of worker threads*)
fun excessive_threads () : bool = false;  (* FIXME *)
wenzelm@28156
    83
wenzelm@28166
    84
(*Determine what the calling worker does next.  Result NONE tells the worker to
  terminate (it has then been removed from the workers list); SOME job is the
  entry handed out by TaskQueue.dequeue.  While the queue yields nothing, the
  worker marks itself inactive, waits on the condition variable, and retries.*)
fun worker_next () = (*requires SYNCHRONIZED*)
  if excessive_threads () then
    let
      val _ = change_active false;
      fun is_self thread = Thread.equal (thread, Thread.self ());
      val _ = change workers (filter_out is_self);
    in NONE end
  else
    (case change_result queue (TaskQueue.dequeue (Thread.self ())) of
      job as SOME _ => job
    | NONE =>
        let
          val _ = change_active false;
          val _ = wait ();
          val _ = change_active true;
        in worker_next () end);
wenzelm@28156
    93
wenzelm@28156
    94
(*Main loop of a worker thread: fetch the next job under the lock, run it outside
  the lock with the task recorded as thread data, then mark the task as finished
  and wake up all waiting threads.  Terminates when worker_next yields NONE.*)
fun worker_loop () =
  (case SYNCHRONIZED worker_next of
    SOME (task, run) =>
      let
        val _ = TaskQueue.set_thread_data (SOME task);
        val _ = run ();
        val _ = TaskQueue.set_thread_data NONE;
        fun finish () = (change queue (TaskQueue.finished task); notify_all ());
        val _ = SYNCHRONIZED finish;
      in worker_loop () end
  | NONE => ());
wenzelm@28156
   104
wenzelm@28162
   105
(*Fork one more worker thread and register it.  Acquires the global lock itself,
  so it must not be called from code that already runs under SYNCHRONIZED.*)
fun worker_start () = SYNCHRONIZED (fn () =>
  let
    val _ = change_active true;
    val thread = Thread.fork (worker_loop, Multithreading.no_interrupts);
  in change workers (cons thread) end);
wenzelm@28156
   108
wenzelm@28156
   109
wenzelm@28156
   110
(* scheduler *)
wenzelm@28156
   111
wenzelm@28162
   112
(*Scheduler main loop; never returns.  Each round tops up the worker pool to
  Multithreading.max_threads_value () and then sleeps on the condition variable
  for at most 300ms.

  The loop acquires the global lock itself, in two separate critical sections:
    - reading the shared workers list,
    - the timed wait, since ConditionVar.waitUntil is only meaningful while the
      associated mutex is held.
  worker_start is deliberately called outside the lock, because it enters
  SYNCHRONIZED on its own.  NOTE(review): this assumes the Poly/ML mutex is not
  re-entrant, so nesting the two would block forever -- confirm against the
  Poly/ML documentation.  Only the scheduler thread starts workers, so the
  shortfall computed in the first section stays valid as an upper bound.*)
fun scheduler_loop () = (*acquires SYNCHRONIZED itself*)
  let
    val k = SYNCHRONIZED (fn () =>
      Multithreading.max_threads_value () - length (! workers));
    val _ = if k > 0 then funpow k worker_start () else ();
    val _ = SYNCHRONIZED (fn () => wait_timeout (Time.fromMilliseconds 300));
  in scheduler_loop () end;
wenzelm@28156
   118
wenzelm@28156
   119
(*Make sure a scheduler thread exists: under the global lock, look at the
  recorded scheduler thread and fork a new one if there is none or if it is no
  longer active.*)
fun check_scheduler () = SYNCHRONIZED (fn () =>
  let
    val scheduler_active =
      (case ! scheduler of
        NONE => false
      | SOME t => Thread.isActive t);
  in
    if scheduler_active then ()
    else
      (*Fork the loop as it is.  Writing "SYNCHRONIZED o scheduler_loop" here
        would apply the never-returning loop first, so SYNCHRONIZED would never
        be reached; the composition only suggests locking that does not happen.*)
      scheduler := SOME (Thread.fork (scheduler_loop, Multithreading.no_interrupts))
  end);
wenzelm@28156
   130
wenzelm@28156
   131
wenzelm@28156
   132
(* future values *)
wenzelm@28156
   133
wenzelm@28166
   134
(*Create a future for e: ensure the scheduler is running, allocate an empty
  result slot, and enqueue a job that stores the captured outcome of e in that
  slot.  The job is wrapped with the thread attributes of the calling thread.
  All waiting threads are notified once the job is in the queue.*)
fun future group deps (e: unit -> 'a) =
  let
    val _ = check_scheduler ();

    val result = ref (NONE: 'a Exn.result option);
    fun body () = result := SOME (Exn.capture e ());
    val run = Multithreading.with_attributes (Thread.getAttributes ()) (fn _ => body);

    val task = SYNCHRONIZED (fn () =>
      let
        val t = change_result queue (TaskQueue.enqueue group deps run);
        val _ = notify_all ();
      in t end);
  in Future {task = task, group = group, result = result} end;
wenzelm@28162
   144
wenzelm@28166
   145
(*future without a group and with an empty task list*)
fun fork body = future NONE [] body;
wenzelm@28162
   146
wenzelm@28166
   147
(*Wait for the result of a future: under the global lock, poll the result slot
  and wait on the condition variable as long as it is still empty.  The stored
  outcome is released outside the lock, i.e. a captured exception is re-raised
  in the caller.*)
fun join (Future {result = slot, ...}) =
  let
    val _ = check_scheduler ();
    fun await () =
      (case ! slot of
        SOME outcome => outcome
      | NONE => (wait (); await ()));
  in Exn.release (SYNCHRONIZED await) end;
wenzelm@28156
   155
wenzelm@28166
   156
wenzelm@28166
   157
(* interrupts *)
wenzelm@28166
   158
wenzelm@28166
   159
(*forward to TaskQueue.interrupt on the current queue, under the global lock*)
fun interrupt task =
  SYNCHRONIZED (fn () =>
    let val current = ! queue
    in TaskQueue.interrupt current task end);
wenzelm@28166
   160
(*forward to TaskQueue.interrupt_group on the current queue, under the global lock*)
fun interrupt_group group =
  SYNCHRONIZED (fn () =>
    let val current = ! queue
    in TaskQueue.interrupt_group current group end);
wenzelm@28156
   161
wenzelm@28156
   162
end;