src/Pure/Concurrent/future.ML
author wenzelm
Wed Apr 06 16:33:33 2016 +0200 (2016-04-06)
changeset 62889 99c7f31615c2
parent 62826 eb94e570c1a4
child 62923 3a122e1e352a
permissions -rw-r--r--
clarified modules;
tuned signature;
wenzelm@28156
     1
(*  Title:      Pure/Concurrent/future.ML
wenzelm@28156
     2
    Author:     Makarius
wenzelm@28156
     3
wenzelm@57350
     4
Value-oriented parallel execution via futures and promises.
wenzelm@28156
     5
*)
wenzelm@28156
     6
wenzelm@28156
     7
signature FUTURE =
sig
  (*tasks and groups*)
  type task = Task_Queue.task
  type group = Task_Queue.group
  val new_group: group option -> group
  val worker_task: unit -> task option
  val worker_group: unit -> group option
  val the_worker_group: unit -> group
  val worker_subgroup: unit -> group

  (*future values*)
  type 'a future
  val task_of: 'a future -> task
  val peek: 'a future -> 'a Exn.result option
  val is_finished: 'a future -> bool

  (*status and cancellation*)
  val ML_statistics: bool Unsynchronized.ref
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
  val cancel_group: group -> unit
  val cancel: 'a future -> unit

  (*results*)
  val error_message: Position.T -> (serial * string) * string option -> unit
  val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result

  (*fork*)
  type params =
   {name: string,
    group: group option,
    deps: task list,
    pri: int,
    interrupts: bool}
  val default_params: params
  val forks: params -> (unit -> 'a) list -> 'a future list
  val fork: (unit -> 'a) -> 'a future

  (*join*)
  val join_results: 'a future list -> 'a Exn.result list
  val join_result: 'a future -> 'a Exn.result
  val joins: 'a future list -> 'a list
  val join: 'a future -> 'a
  val join_tasks: task list -> unit

  (*task context and fast-path operations*)
  val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
  val value_result: 'a Exn.result -> 'a future
  val value: 'a -> 'a future
  val cond_forks: params -> (unit -> 'a) list -> 'a future list
  val map: ('a -> 'b) -> 'a future -> 'b future

  (*promises*)
  val promise_group: group -> (unit -> unit) -> 'a future
  val promise: (unit -> unit) -> 'a future
  val fulfill_result: 'a future -> 'a Exn.result -> unit
  val fulfill: 'a future -> 'a -> unit

  (*snapshot and shutdown*)
  val group_snapshot: group -> task list
  val shutdown: unit -> unit
end;
wenzelm@28156
    47
wenzelm@28156
    48
structure Future: FUTURE =
wenzelm@28156
    49
struct
wenzelm@28156
    50
wenzelm@28177
    51
(** future values **)
wenzelm@28177
    52
wenzelm@44300
    53
(*task queue entities, available under local names*)
type task = Task_Queue.task;
type group = Task_Queue.group;
fun new_group parent = Task_Queue.new_group parent;
wenzelm@44300
    56
wenzelm@44300
    57
wenzelm@28167
    58
(* identifiers *)
wenzelm@28167
    59
wenzelm@32058
    60
local
  (*Thread_Data slot for the task that is executed by the current thread*)
  val current_task: task Thread_Data.var = Thread_Data.var ();
in
  (*task of the current thread, if any*)
  fun worker_task () = Thread_Data.get current_task;

  (*apply f to x with the given task installed in the slot*)
  fun setmp_worker_task task f x = Thread_Data.setmp current_task (SOME task) f x;
end;
wenzelm@28167
    66
wenzelm@41683
    67
(*group of the task of the current thread, if any*)
fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());
wenzelm@52603
    68
wenzelm@52603
    69
(*like worker_group, but raises Fail when the current thread has no task*)
fun the_worker_group () =
  (case worker_group () of
    NONE => raise Fail "Missing worker thread context"
  | SOME grp => grp);
wenzelm@52603
    73
wenzelm@44300
    74
(*fresh group whose parent is the current worker group, if any*)
fun worker_subgroup () = worker_group () |> new_group;
wenzelm@34277
    75
wenzelm@41679
    76
(*evaluate e via Task_Queue.joining for the current task; plain evaluation
  when the current thread has no task*)
fun worker_joining e =
  (case worker_task () of
    SOME task => Task_Queue.joining task e
  | NONE => e ());
wenzelm@41679
    80
wenzelm@41680
    81
(*evaluate e via Task_Queue.waiting for the current task and the given deps;
  plain evaluation when the current thread has no task*)
fun worker_waiting deps e =
  (case worker_task () of
    SOME task => Task_Queue.waiting task deps e
  | NONE => e ());
wenzelm@41670
    85
wenzelm@28167
    86
wenzelm@28167
    87
(* datatype future *)
wenzelm@28167
    88
wenzelm@35016
    89
(*single-assignment cell for the outcome of an evaluation*)
type 'a result = 'a Exn.result Single_Assignment.var;

(*a future consists of its task, its result cell, and a flag that
  distinguishes promises from regular futures*)
datatype 'a future =
  Future of {promised: bool, task: task, result: 'a result};
wenzelm@28167
    95
wenzelm@28167
    96
(*component selectors*)
fun task_of (Future {task = t, ...}) = t;
fun result_of (Future {result = r, ...}) = r;
wenzelm@28167
    98
wenzelm@35016
    99
(*current content of the result cell, without waiting*)
fun peek x = x |> result_of |> Single_Assignment.peek;

(*true iff the result cell has been assigned*)
fun is_finished x =
  (case peek x of
    NONE => false
  | SOME _ => true);
wenzelm@28320
   101
wenzelm@62663
   102
(*toplevel pretty printer: finished futures show their value, others a placeholder*)
val _ =
  ML_system_pp (fn depth => fn pretty => fn x =>
    (case peek x of
      SOME (Exn.Res y) => pretty (y, depth)
    | SOME (Exn.Exn _) => PolyML_Pretty.PrettyString "<failed>"
    | NONE => PolyML_Pretty.PrettyString "<future>"));
wenzelm@62663
   108
wenzelm@28167
   109
wenzelm@28177
   110
wenzelm@28177
   111
(** scheduling **)
wenzelm@28177
   112
wenzelm@28177
   113
(* synchronization *)
wenzelm@28156
   114
wenzelm@32219
   115
(*condition variables; all of them are used together with the lock of SYNCHRONIZED below*)
val scheduler_event = ConditionVar.conditionVar ();  (*scheduler wake-up and progress*)
val work_available = ConditionVar.conditionVar ();   (*signalled towards sleeping workers*)
val work_finished = ConditionVar.conditionVar ();    (*broadcast whenever a task finishes*)
wenzelm@32219
   118
wenzelm@28156
   119
local
  (*the single mutex that guards the global scheduler state*)
  val global_lock = Mutex.mutex ();
in

(*critical section on the global lock, with a name passed to Multithreading.synchronized*)
fun SYNCHRONIZED name = Multithreading.synchronized name global_lock;

(*wait on cond without timeout*)
fun wait cond = (*requires SYNCHRONIZED*)
  Multithreading.sync_wait NONE NONE cond global_lock;

(*wait on cond, with a deadline of the given timeout after the current time*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let val deadline = Time.now () + timeout
  in Multithreading.sync_wait NONE (SOME deadline) cond global_lock end;

fun signal cond = (*requires SYNCHRONIZED*)
  ConditionVar.signal cond;

fun broadcast cond = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast cond;

end;
wenzelm@28156
   138
wenzelm@28156
   139
wenzelm@33410
   140
(* global state *)
wenzelm@33410
   141
wenzelm@33410
   142
(*task queue and scheduler thread*)
val queue = Unsynchronized.ref Task_Queue.empty;
val next = Unsynchronized.ref 0;  (*counter for worker names*)
val scheduler: Thread.thread option Unsynchronized.ref = Unsynchronized.ref NONE;
val canceled: group list Unsynchronized.ref = Unsynchronized.ref [];  (*groups awaiting cancellation*)
val do_shutdown = Unsynchronized.ref false;
val max_workers = Unsynchronized.ref 0;
val max_active = Unsynchronized.ref 0;

(*timing of scheduler rounds*)
val status_ticks = Unsynchronized.ref 0;
val last_round = Unsynchronized.ref Time.zeroTime;
val next_round = seconds 0.05;
wenzelm@50280
   153
wenzelm@33410
   154
(*worker threads, each with its mutable state*)
datatype worker_state = Working | Waiting | Sleeping;
val workers: (Thread.thread * worker_state Unsynchronized.ref) list Unsynchronized.ref =
  Unsynchronized.ref [];

(*number of registered workers that are currently in the given state*)
fun count_workers state = (*requires SYNCHRONIZED*)
  length (filter (fn (_, state_ref) => ! state_ref = state) (! workers));
wenzelm@33410
   159
wenzelm@33410
   160
wenzelm@50280
   161
wenzelm@50280
   162
(* status *)
wenzelm@50280
   163
wenzelm@50280
   164
(*flag that enables status reports*)
val ML_statistics = Unsynchronized.ref false;

(*emit a protocol message with queue and worker counts, followed by the
  properties of ML_Statistics.get; nothing happens unless ML_statistics is set*)
fun report_status () = (*requires SYNCHRONIZED*)
  if not (! ML_statistics) then ()
  else
    let
      val {ready, pending, running, passive, urgent} = Task_Queue.status (! queue);
      val total = length (! workers);
      val active = count_workers Working;
      val waiting = count_workers Waiting;

      fun int_entry (name, i) = (name, Markup.print_int i);
      val stats =
        ("now", Markup.print_real (Time.toReal (Time.now ()))) ::
        map int_entry
         [("tasks_ready", ready),
          ("tasks_pending", pending),
          ("tasks_running", running),
          ("tasks_passive", passive),
          ("tasks_urgent", urgent),
          ("workers_total", total),
          ("workers_active", active),
          ("workers_waiting", waiting)] @
        ML_Statistics.get ();
    in Output.try_protocol_message (Markup.ML_statistics :: stats) [] end;
wenzelm@50280
   186
wenzelm@50280
   187
wenzelm@44110
   188
(* cancellation primitives *)
wenzelm@32099
   189
wenzelm@34279
   190
(*cancel the group in the queue and interrupt the threads returned by
  Task_Queue.cancel, except for the current thread; the result is that
  list of threads*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  let
    val threads = Task_Queue.cancel (! queue) group;
    fun interrupt_other thread =
      if Standard_Thread.is_self thread then ()
      else Standard_Thread.interrupt_unsynchronized thread;
  in List.app interrupt_other threads; threads end;
wenzelm@44341
   197
wenzelm@44341
   198
(*cancel everything in the queue: interrupt all threads returned by
  Task_Queue.cancel_all and return the corresponding groups*)
fun cancel_all () = (*requires SYNCHRONIZED*)
  let val (groups, threads) = Task_Queue.cancel_all (! queue)
  in List.app Standard_Thread.interrupt_unsynchronized threads; groups end;
wenzelm@34279
   203
wenzelm@34279
   204
(*record the group in the canceled list (the scheduler retries cancel_now
  on these groups) and wake up waiters on scheduler_event*)
fun cancel_later group = (*requires SYNCHRONIZED*)
  let val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group)
  in broadcast scheduler_event end;
wenzelm@29341
   207
wenzelm@44301
   208
(*apply f to x with interrupts enabled: private interrupts inside a worker
  task, public interrupts otherwise; Multithreading.interrupted is invoked
  after f x has returned normally*)
fun interruptible_task f x =
  let
    val atts =
      (case worker_task () of
        SOME _ => Multithreading.private_interrupts
      | NONE => Multithreading.public_interrupts);
    val result = Multithreading.with_attributes atts (fn _ => f x);
    val _ = Multithreading.interrupted ();
  in result end;
wenzelm@44301
   215
wenzelm@44301
   216
wenzelm@44110
   217
(* worker threads *)
wenzelm@44110
   218
wenzelm@44110
   219
(*execute all jobs of a task within its task context, then finish the task
  in the queue and notify other threads*)
fun worker_exec (task, jobs) =
  let
    val group = Task_Queue.group_of_task task;
    val valid = not (Task_Queue.is_canceled group);

    (*each job is applied to the validity flag; all jobs are run, and the
      overall status is the conjunction of their results*)
    fun run_jobs () = fold (fn job => fn ok => job valid andalso ok) jobs true;
    val ok = Task_Queue.running task (fn () => setmp_worker_task task run_jobs ());

    val _ =
      if ! Multithreading.trace < 2 then ()
      else
        Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) [];

    fun finish () =
      let
        val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
        val interrupted = Exn.is_interrupt_exn (Exn.capture Multithreading.interrupted ());
        (*failure or pending interrupt: cancel the group now, and again
          later if cancel_now still reports threads*)
        val _ =
          if ok andalso not interrupted then ()
          else if null (cancel_now group) then ()
          else cancel_later group;
        val _ = broadcast work_finished;
      in if maximal then () else signal work_available end;
  in SYNCHRONIZED "finish" finish end;
wenzelm@28167
   243
wenzelm@59465
   244
(*wait on cond; a registered worker thread temporarily takes the given
  state while waiting*)
fun worker_wait worker_state cond = (*requires SYNCHRONIZED*)
  (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
    NONE => wait cond
  | SOME state_ref => Unsynchronized.setmp state_ref worker_state wait cond);
wenzelm@28162
   248
wenzelm@33415
   249
(*next piece of work for the current worker thread: NONE means that the
  worker has been removed from the pool (excess of workers)*)
fun worker_next () = (*requires SYNCHRONIZED*)
  if length (! workers) <= ! max_workers then
    let
      val urgent_only = count_workers Working > ! max_active;
      val dequeued =
        Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ()) urgent_only);
    in
      (case dequeued of
        SOME _ => (signal work_available; dequeued)
      | NONE => (worker_wait Sleeping work_available; worker_next ()))
    end
  else
   (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
    signal work_available;
    NONE);
wenzelm@28156
   260
wenzelm@28167
   261
(*main loop of a worker thread: execute work until worker_next yields NONE*)
fun worker_loop name =
  (case SYNCHRONIZED name worker_next of
    SOME work =>
      let val _ = worker_exec work
      in worker_loop name end
  | NONE => ());
wenzelm@28156
   265
wenzelm@33407
   266
(*fork a new worker thread and register it in state Working; the option
  "threads_stack_limit" is multiplied by 1024^3 to get the stack limit,
  non-positive values mean no limit; a Fail exception is merely traced*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val limit =
      Real.floor (Options.default_real "threads_stack_limit" * 1024.0 * 1024.0 * 1024.0);
    val stack_limit = if limit > 0 then SOME limit else NONE;
    val thread =
      Standard_Thread.fork {name = "worker", stack_limit = stack_limit, interrupts = false}
        (fn () => worker_loop name);
    val entry = (thread, Unsynchronized.ref Working);
  in Unsynchronized.change workers (cons entry) end
  handle Fail msg => Multithreading.tracing 0 (fn () => "SCHEDULER: " ^ msg);
wenzelm@59330
   276
wenzelm@28156
   277
wenzelm@28156
   278
(* scheduler *)
wenzelm@28156
   279
wenzelm@28206
   280
(*one round of the scheduler; the result tells whether the scheduler loop
  should continue*)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    val now = Time.now ();
    val tick = ! last_round + next_round <= now;
    val _ = if tick then last_round := now else ();


    (* runtime status *)

    (*report on every 2nd tick when tracing, on every 10th tick otherwise*)
    val _ =
      if tick then
       (Unsynchronized.change status_ticks (fn i => i + 1);
        if ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
        then report_status () else ())
      else ();

    (*dispose inactive worker threads*)
    fun is_alive (thread, _) = Thread.isActive thread;
    val _ =
      if tick andalso exists (not o is_alive) (! workers) then
        let
          val (alive, dead) = List.partition is_alive (! workers);
          val _ = workers := alive;
        in
          Multithreading.tracing 0 (fn () =>
            "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
        end
      else ();


    (* worker pool adjustments *)

    val old_max_active = ! max_active;
    val old_max_workers = ! max_workers;

    val m =
      if ! do_shutdown andalso Task_Queue.all_passive (! queue) then 0
      else Multithreading.max_threads_value ();
    val _ = (max_active := m; max_workers := 2 * m);

    fun start_worker () =
      ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)));
    val missing = ! max_workers - length (! workers);
    val _ = if missing > 0 then funpow missing start_worker () else ();

    val _ =
      if ! max_active <> old_max_active orelse ! max_workers <> old_max_workers
      then signal work_available else ();


    (* canceled groups *)

    (*retry cancellation; groups stay in the list as long as cancel_now reports threads*)
    val _ =
      if null (! canceled) then ()
      else
       (Multithreading.tracing 1 (fn () =>
          string_of_int (length (! canceled)) ^ " canceled groups");
        Unsynchronized.change canceled (filter_out (null o cancel_now));
        signal work_available);


    (* delay loop *)

    val _ = Exn.release (wait_timeout next_round scheduler_event);


    (* shutdown *)

    val continue = not (! do_shutdown andalso null (! workers));
    val _ = if continue then () else (report_status (); scheduler := NONE);

    val _ = broadcast scheduler_event;
  in continue end
  handle exn =>
    if not (Exn.is_interrupt exn) then Exn.reraise exn
    else
     (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
      List.app cancel_later (cancel_all ());
      signal work_available; true);
wenzelm@32295
   359
wenzelm@28206
   360
(*main loop of the scheduler thread: repeat scheduler_next until it yields
  false, then reset last_round*)
fun scheduler_loop () =
  let
    fun round () =
      Multithreading.with_attributes
        (Multithreading.sync_interrupts Multithreading.public_interrupts)
        (fn _ => SYNCHRONIZED "scheduler" scheduler_next);
    fun loop () = if round () then loop () else ();
  in loop (); last_round := Time.zeroTime end;
wenzelm@28156
   366
wenzelm@28203
   367
(*true iff a scheduler thread is registered and still active*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  (case ! scheduler of
    SOME thread => Thread.isActive thread
  | NONE => false);
wenzelm@28203
   369
wenzelm@32228
   370
(*reset the shutdown request and make sure that a scheduler thread is running*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
    fun start () =
      Standard_Thread.fork {name = "scheduler", stack_limit = NONE, interrupts = false}
        scheduler_loop;
  in if scheduler_active () then () else scheduler := SOME (start ()) end;
wenzelm@28156
   377
wenzelm@44301
   378
wenzelm@44301
   379
wenzelm@44301
   380
(** futures **)
wenzelm@44301
   381
wenzelm@44301
   382
(* cancel *)
wenzelm@44301
   383
wenzelm@49906
   384
(*cancel the group now, and once more later if cancel_now still reports
  threads; then notify workers and make sure the scheduler is running*)
fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
 (if null (cancel_now group) then () else cancel_later group;
  signal work_available;
  scheduler_check ());
wenzelm@49906
   390
wenzelm@49906
   391
(*synchronized version of cancel_group_unsynchronized*)
fun cancel_group group =
  let fun cancel_it () = cancel_group_unsynchronized group
  in SYNCHRONIZED "cancel_group" cancel_it end;
wenzelm@44299
   393
wenzelm@44301
   394
(*cancel the group of the task of a future*)
fun cancel x = task_of x |> Task_Queue.group_of_task |> cancel_group;
wenzelm@29366
   395
wenzelm@28156
   396
wenzelm@50914
   397
(* results *)
wenzelm@50914
   398
wenzelm@56333
   399
(*output an error message in the thread context of pos; the message is
  suppressed only if both the id of pos and exec_id are present and differ*)
fun error_message pos ((serial, msg), exec_id) =
  let
    fun emit () =
      let val id = Position.get_id pos in
        if is_some id andalso is_some exec_id andalso id <> exec_id then ()
        else Output.error_message' (serial, msg)
      end;
  in Position.setmp_thread_data pos emit () end;
wenzelm@44110
   405
wenzelm@50914
   406
(*pass the exception of a failed result through Par_Exn.identify, with the
  id of pos as exec_id property if available*)
fun identify_result pos res =
  let
    fun identify exn =
      let
        val exec_id =
          (case Position.get_id pos of
            SOME id => [(Markup.exec_idN, id)]
          | NONE => []);
      in Par_Exn.identify exec_id exn end;
  in Exn.map_exn identify res end;
wenzelm@50914
   413
wenzelm@50914
   414
(*assign res to the result cell; the return value is true iff the cell
  holds a regular result afterwards; an exception in the cell is passed to
  Task_Queue.cancel_group for the group*)
fun assign_result group result res =
  let
    (*when assignment fails and the cell already holds an interrupt, that
      interrupt is raised instead of the Fail exception*)
    val _ = Single_Assignment.assign result res
      handle exn as Fail _ =>
        let
          val exn' =
            (case Single_Assignment.peek result of
              SOME (Exn.Exn e) => if Exn.is_interrupt e then e else exn
            | _ => exn);
        in Exn.reraise exn' end;
  in
    (case the (Single_Assignment.peek result) of
      Exn.Res _ => true
    | Exn.Exn exn =>
        (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false))
  end;
wenzelm@44110
   427
wenzelm@50914
   428
wenzelm@50914
   429
(* future jobs *)
wenzelm@50914
   430
wenzelm@54649
   431
(*fresh result cell together with a job that evaluates e and assigns its
  outcome; position and generic context of the current thread are captured
  here and restored around the evaluation; a job applied to false assigns
  an interrupt instead of evaluating e*)
fun future_job group atts (e: unit -> 'a) =
  let
    val result = Single_Assignment.var "future" : 'a result;
    val pos = Position.thread_data ();
    val context = Context.get_generic_context ();

    fun eval () =
      Multithreading.with_attributes atts (fn _ =>
        Position.setmp_thread_data pos (Context.setmp_generic_context context e) ());

    fun job ok =
      let val res = if ok then Exn.capture eval () else Exn.interrupt_exn
      in assign_result group result (identify_result pos res) end;
  in (result, job) end;
wenzelm@44110
   446
wenzelm@44110
   447
wenzelm@29366
   448
(* fork *)
wenzelm@29366
   449
wenzelm@44427
   450
(*parameters for forks*)
type params =
 {name: string,
  group: group option,
  deps: task list,
  pri: int,
  interrupts: bool};

val default_params: params =
 {name = "",
  group = NONE,
  deps = [],
  pri = 0,
  interrupts = true};
wenzelm@44113
   452
wenzelm@44427
   453
(*enqueue one task per expression and return the corresponding futures;
  without an explicit group, a fresh subgroup of the worker group is used*)
fun forks ({name, group, deps, pri, interrupts}: params) es =
  (case es of
    [] => []
  | _ =>
      let
        val grp =
          (case group of
            SOME grp => grp
          | NONE => worker_subgroup ());
        val atts =
          if interrupts
          then Multithreading.private_interrupts
          else Multithreading.no_interrupts;

        fun enqueue e q =
          let
            val (result, job) = future_job grp atts e;
            val (task, q') = Task_Queue.enqueue name grp deps pri job q;
          in (Future {promised = false, task = task, result = result}, q') end;

        fun enqueue_all () =
          let
            val (futures, queue') = fold_map enqueue es (! queue);
            val _ = queue := queue';
            (*notify workers if none of the deps is known to the queue*)
            val _ =
              if exists (Task_Queue.known_task queue') deps then ()
              else signal work_available;
            val _ = scheduler_check ();
          in futures end;
      in SYNCHRONIZED "enqueue" enqueue_all end);
wenzelm@28162
   481
wenzelm@50983
   482
(*fork a single expression with name "fork" and otherwise default parameters*)
fun fork e =
  singleton (forks {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true}) e;
wenzelm@28186
   484
wenzelm@28186
   485
wenzelm@29366
   486
(* join *)
wenzelm@29366
   487
wenzelm@32099
   488
(*result of a future without waiting; an interrupt result is replaced by
  the exceptions of the group status, if there are any*)
fun get_result x =
  (case peek x of
    SOME res =>
      if not (Exn.is_interrupt_exn res) then res
      else
        (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
          [] => res
        | exns => Exn.Exn (Par_Exn.make exns))
  | NONE => Exn.Exn (Fail "Unfinished future"));
wenzelm@28186
   497
wenzelm@49935
   498
local

(*next piece of work among the deps; when nothing can be dequeued but deps
  remain, wait for work_finished and try again*)
fun join_next atts deps = (*requires SYNCHRONIZED*)
  (case deps of
    [] => NONE
  | _ =>
      (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
        (SOME work, deps') => SOME (work, deps')
      | (NONE, []) => NONE
      | (NONE, deps') =>
          let
            fun wait_finished () =
              Multithreading.with_attributes atts (fn _ =>
                Exn.release (worker_wait Waiting work_finished));
            val _ = worker_waiting deps' wait_finished;
          in join_next atts deps' end));

(*execute dequeued work until join_next yields NONE*)
fun join_loop atts deps =
  (case SYNCHRONIZED "join" (fn () => join_next atts deps) of
    SOME (work, deps') =>
      let val _ = worker_joining (fn () => worker_exec work)
      in join_loop atts deps' end
  | NONE => ());

in

(*results of all given futures: a thread with worker task runs the join
  loop with interrupts disabled, other threads wait on the result cells*)
fun join_results xs =
  let
    fun join_as_worker () =
      Multithreading.with_attributes Multithreading.no_interrupts
        (fn orig_atts => join_loop orig_atts (map task_of xs));
    fun await_all () = List.app (ignore o Single_Assignment.await o result_of) xs;
    val _ =
      if forall is_finished xs then ()
      else
        (case worker_task () of
          SOME _ => join_as_worker ()
        | NONE => await_all ());
  in map get_result xs end;

end;
wenzelm@29551
   530
wenzelm@28647
   531
(*join variants: single result, all values, single value*)
fun join_result x = singleton join_results x;
fun joins xs = join_results xs |> Par_Exn.release_all;
fun join x = join_result x |> Exn.release;
wenzelm@28156
   534
wenzelm@54369
   535
(*join a fresh uninterruptible future that depends on the given tasks;
  no-op for the empty list*)
fun join_tasks [] = ()
  | join_tasks tasks =
      let
        val params =
         {name = "join_tasks", group = SOME (new_group NONE),
          deps = tasks, pri = 0, interrupts = false};
      in join (singleton (forks params) I) end;
wenzelm@54369
   542
wenzelm@29366
   543
wenzelm@54649
   544
(* task context for running thread *)
wenzelm@54649
   545
wenzelm@54649
   546
(*apply f to x on the current thread, as a task that is enrolled in the
  queue under the given name and group*)
fun task_context name group f x =
  Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts =>
    let
      val (result, job) = future_job group orig_atts (fn () => f x);
      fun enroll () =
        Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group);
      val task = SYNCHRONIZED "enroll" enroll;
      val _ = worker_exec (task, [job]);
    in
      (case Single_Assignment.peek result of
        SOME res => Exn.release res
      | NONE => raise Fail "Missing task context result")
    end);
wenzelm@54649
   559
wenzelm@54649
   560
wenzelm@51325
   561
(* fast-path operations -- bypass task queue if possible *)
wenzelm@34277
   562
wenzelm@44294
   563
(*finished future for a given result, attached to the dummy task instead of
  an entry of the task queue*)
fun value_result (res: 'a Exn.result) =
  let
    val dummy = Task_Queue.dummy_task;
    val cell = Single_Assignment.var "value" : 'a result;
    val _ =
      assign_result (Task_Queue.group_of_task dummy) cell
        (identify_result (Position.thread_data ()) res);
  in Future {promised = false, task = dummy, result = cell} end;
wenzelm@29366
   570
wenzelm@44294
   571
(*finished future for a regular value*)
fun value x = Exn.Res x |> value_result;
wenzelm@44294
   572
wenzelm@44330
   573
(*like forks, but expressions are evaluated immediately on the current
  thread when multithreading is not enabled*)
fun cond_forks args es =
  let fun eval_now e = value_result (Exn.interruptible_capture e ()) in
    if not (Multithreading.enabled ()) then map eval_now es
    else forks args es
  end;
wenzelm@44330
   576
wenzelm@29384
   577
(*apply f to the value of a future: directly for a finished future,
  otherwise as an extra job of its task if Task_Queue.extend succeeds, and
  as a separate task that depends on it as last resort*)
fun map_future f x =
  if is_finished x then value_result (Exn.interruptible_capture (fn y => f (join y)) x)
  else
    let
      val task = task_of x;
      val group = Task_Queue.group_of_task task;
      fun body () = f (join x);
      val (result, job) = future_job group Multithreading.private_interrupts body;

      fun try_extend () =
        (case Task_Queue.extend task job (! queue) of
          NONE => false
        | SOME queue' => (queue := queue'; true));
    in
      if SYNCHRONIZED "extend" try_extend
      then Future {promised = false, task = task, result = result}
      else
        singleton
          (cond_forks
            {name = "map_future", group = SOME group, deps = [task],
              pri = Task_Queue.pri_of_task task, interrupts = true})
          (fn () => f (join x))
    end;
wenzelm@28979
   598
wenzelm@28191
   599
wenzelm@34277
   600
(* promised futures -- fulfilled by external means *)
wenzelm@34277
   601
wenzelm@44298
   602
(*promise within the given group: its passive job assigns an interrupt to
  the result cell and then invokes abort*)
fun promise_group group abort : 'a future =
  let
    val result = Single_Assignment.var "promise" : 'a result;

    (*Fail from the assignment counts as success; an interrupt is turned
      into a Fail exception*)
    fun assign () = assign_result group result Exn.interrupt_exn
      handle Fail _ => true
        | exn =>
            if not (Exn.is_interrupt exn) then Exn.reraise exn
            else raise Fail "Concurrent attempt to fulfill promise";

    (*abort is invoked whether or not assign has raised an exception*)
    fun job () =
      Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
        let
          val res = Exn.capture assign ();
          val _ = abort ();
        in Exn.release res end);

    fun enqueue () =
      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job);
    val task = SYNCHRONIZED "enqueue_passive" enqueue;
  in Future {promised = true, task = task, result = result} end;
wenzelm@34277
   617
wenzelm@44298
   618
(*promise within a fresh subgroup of the worker group*)
fun promise abort =
  let val group = worker_subgroup ()
  in promise_group group abort end;
wenzelm@34277
   619
wenzelm@41683
   620
(*fulfill a promise with the given result, and wait until its result cell
  is assigned; raises Fail for futures that are not promises*)
fun fulfill_result (Future {promised, task, result}) res =
  if promised then
    let
      val group = Task_Queue.group_of_task task;
      val pos = Position.thread_data ();
      fun job ok =
        assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);

      fun dequeue () =
        SYNCHRONIZED "fulfill_result" (fn () =>
          Unsynchronized.change_result queue
            (Task_Queue.dequeue_passive (Thread.self ()) task));
      val _ =
        Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
          (case dequeue () of
            NONE => ignore (job (not (Task_Queue.is_canceled group)))
          | SOME false => ()
          | SOME true => worker_exec (task, [job])));

      val _ =
        (case Single_Assignment.peek result of
          SOME _ => ()
        | NONE => worker_waiting [task] (fn () => ignore (Single_Assignment.await result)));
    in () end
  else raise Fail "Not a promised future";
wenzelm@34277
   645
wenzelm@43761
   646
(*fulfill a promise with a regular value*)
fun fulfill x value = fulfill_result x (Exn.Res value);
wenzelm@34277
   647
wenzelm@34277
   648
wenzelm@54369
   649
(* group snapshot *)
wenzelm@54369
   650
wenzelm@54369
   651
(*tasks of the group according to the current state of the queue*)
fun group_snapshot group =
  let fun snapshot () = Task_Queue.group_tasks (! queue) group
  in SYNCHRONIZED "group_snapshot" snapshot end;
wenzelm@54369
   654
wenzelm@54369
   655
wenzelm@32228
   656
(* shutdown *)
wenzelm@29366
   657
wenzelm@28203
   658
(*request shutdown and wait until the scheduler is no longer active; must
  not be invoked from a thread that runs a worker task*)
fun shutdown () =
  (case worker_task () of
    SOME _ => raise Fail "Cannot shutdown while running as worker thread"
  | NONE =>
      let
        (*the request is repeated in each round, since scheduler_check resets it*)
        fun await_scheduler () =
          if scheduler_active () then
           (do_shutdown := true;
            Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
            wait scheduler_event;
            await_scheduler ())
          else ();
      in SYNCHRONIZED "shutdown" await_scheduler end);
wenzelm@28203
   667
wenzelm@29366
   668
wenzelm@29366
   669
(*final declarations of this structure!*)
wenzelm@29366
   670
(*this hides the list operation "map" for any code after this point of the structure*)
fun map f x = map_future f x;
wenzelm@29366
   671
wenzelm@28156
   672
end;
wenzelm@28972
   673
wenzelm@28972
   674
(*unqualified toplevel abbreviation for the type of futures*)
type 'a future = 'a Future.future;