src/Pure/Concurrent/future.ML
author wenzelm
Wed Apr 11 12:15:56 2012 +0200 (2012-04-11)
changeset 47421 9624408d8827
parent 47404 e6e5750f1311
child 47423 8a179a0493e3
permissions -rw-r--r--
always signal after cancel_group: passive tasks may have become active;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes the immediate peers to be
    30     interrupted eventually (i.e. none by default).  Interrupted tasks
    31     that lack regular result information, will pick up parallel
    32     exceptions from the cumulative group context (as Par_Exn).
    33 
    34   * Future task groups may be canceled: present and future group
    35     members will be interrupted eventually.
    36 
    37   * Promised "passive" futures are fulfilled by external means.  There
    38     is no associated evaluation task, but other futures can depend on
    39     them via regular join operations.
    40 *)
    41 
    42 signature FUTURE =
    43 sig
    44   type task = Task_Queue.task
    45   type group = Task_Queue.group
    46   val new_group: group option -> group
    47   val worker_task: unit -> task option
    48   val worker_group: unit -> group option
    49   val worker_subgroup: unit -> group
    50   type 'a future
    51   val task_of: 'a future -> task
    52   val peek: 'a future -> 'a Exn.result option
    53   val is_finished: 'a future -> bool
    54   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    55   val cancel_group: group -> unit
    56   val cancel: 'a future -> unit
    57   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
    58   val default_params: params
    59   val forks: params -> (unit -> 'a) list -> 'a future list
    60   val fork_pri: int -> (unit -> 'a) -> 'a future
    61   val fork: (unit -> 'a) -> 'a future
    62   val join_results: 'a future list -> 'a Exn.result list
    63   val join_result: 'a future -> 'a Exn.result
    64   val joins: 'a future list -> 'a list
    65   val join: 'a future -> 'a
    66   val join_tasks: task list -> unit
    67   val value_result: 'a Exn.result -> 'a future
    68   val value: 'a -> 'a future
    69   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    70   val map: ('a -> 'b) -> 'a future -> 'b future
    71   val promise_group: group -> (unit -> unit) -> 'a future
    72   val promise: (unit -> unit) -> 'a future
    73   val fulfill_result: 'a future -> 'a Exn.result -> unit
    74   val fulfill: 'a future -> 'a -> unit
    75   val shutdown: unit -> unit
    76   val status: (unit -> 'a) -> 'a
    77   val group_tasks: group -> task list
    78   val queue_status: unit -> {ready: int, pending: int, running: int, passive: int}
    79 end;
    80 
    81 structure Future: FUTURE =
    82 struct
    83 
    84 (** future values **)
    85 
    86 type task = Task_Queue.task;
    87 type group = Task_Queue.group;
    88 val new_group = Task_Queue.new_group;
    89 
    90 
    91 (* identifiers *)
    92 
    93 local
    94   val tag = Universal.tag () : task option Universal.tag;
    95 in
    96   fun worker_task () = the_default NONE (Thread.getLocal tag);
    97   fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
    98 end;
    99 
   100 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
   101 fun worker_subgroup () = new_group (worker_group ());
   102 
   103 fun worker_joining e =
   104   (case worker_task () of
   105     NONE => e ()
   106   | SOME task => Task_Queue.joining task e);
   107 
   108 fun worker_waiting deps e =
   109   (case worker_task () of
   110     NONE => e ()
   111   | SOME task => Task_Queue.waiting task deps e);
   112 
   113 
   114 (* datatype future *)
   115 
   116 type 'a result = 'a Exn.result Single_Assignment.var;
   117 
   118 datatype 'a future = Future of
   119  {promised: bool,
   120   task: task,
   121   result: 'a result};
   122 
   123 fun task_of (Future {task, ...}) = task;
   124 fun result_of (Future {result, ...}) = result;
   125 
   126 fun peek x = Single_Assignment.peek (result_of x);
   127 fun is_finished x = is_some (peek x);
   128 
   129 
   130 
   131 (** scheduling **)
   132 
   133 (* synchronization *)
   134 
   135 val scheduler_event = ConditionVar.conditionVar ();
   136 val work_available = ConditionVar.conditionVar ();
   137 val work_finished = ConditionVar.conditionVar ();
   138 
   139 local
   140   val lock = Mutex.mutex ();
   141 in
   142 
   143 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   144 
   145 fun wait cond = (*requires SYNCHRONIZED*)
   146   Multithreading.sync_wait NONE NONE cond lock;
   147 
   148 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   149   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   150 
   151 fun signal cond = (*requires SYNCHRONIZED*)
   152   ConditionVar.signal cond;
   153 
   154 fun broadcast cond = (*requires SYNCHRONIZED*)
   155   ConditionVar.broadcast cond;
   156 
   157 fun broadcast_work () = (*requires SYNCHRONIZED*)
   158  (ConditionVar.broadcast work_available;
   159   ConditionVar.broadcast work_finished);
   160 
   161 end;
   162 
   163 
   164 (* global state *)
   165 
   166 val queue = Unsynchronized.ref Task_Queue.empty;
   167 val next = Unsynchronized.ref 0;
   168 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   169 val canceled = Unsynchronized.ref ([]: group list);
   170 val do_shutdown = Unsynchronized.ref false;
   171 val max_workers = Unsynchronized.ref 0;
   172 val max_active = Unsynchronized.ref 0;
   173 val worker_trend = Unsynchronized.ref 0;
   174 
   175 datatype worker_state = Working | Waiting | Sleeping;
   176 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   177 
   178 fun count_workers state = (*requires SYNCHRONIZED*)
   179   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   180 
   181 
   182 (* cancellation primitives *)
   183 
   184 fun cancel_now group = (*requires SYNCHRONIZED*)
   185   let
   186     val running = Task_Queue.cancel (! queue) group;
   187     val _ = List.app Simple_Thread.interrupt_unsynchronized running;
   188   in running end;
   189 
   190 fun cancel_all () = (*requires SYNCHRONIZED*)
   191   let
   192     val (groups, threads) = Task_Queue.cancel_all (! queue);
   193     val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
   194   in groups end;
   195 
   196 fun cancel_later group = (*requires SYNCHRONIZED*)
   197  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   198   broadcast scheduler_event);
   199 
   200 fun interruptible_task f x =
   201   (if Multithreading.available then
   202     Multithreading.with_attributes
   203       (if is_some (worker_task ())
   204        then Multithreading.private_interrupts
   205        else Multithreading.public_interrupts)
   206       (fn _ => f x)
   207    else interruptible f x)
   208   before Multithreading.interrupted ();
   209 
   210 
   211 (* worker threads *)
   212 
   213 fun worker_exec (task, jobs) =
   214   let
   215     val group = Task_Queue.group_of_task task;
   216     val valid = not (Task_Queue.is_canceled group);
   217     val ok =
   218       Task_Queue.running task (fn () =>
   219         setmp_worker_task task (fn () =>
   220           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   221     val _ = Multithreading.tracing 2 (fn () =>
   222       let
   223         val s = Task_Queue.str_of_task_groups task;
   224         fun micros time = string_of_int (Time.toNanoseconds time div 1000);
   225         val (run, wait, deps) = Task_Queue.timing_of_task task;
   226       in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
   227     val _ = SYNCHRONIZED "finish" (fn () =>
   228       let
   229         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   230         val test = Exn.capture Multithreading.interrupted ();
   231         val _ =
   232           if ok andalso not (Exn.is_interrupt_exn test) then ()
   233           else if null (cancel_now group) then ()
   234           else cancel_later group;
   235         val _ = broadcast work_finished;
   236         val _ = if maximal then () else signal work_available;
   237       in () end);
   238   in () end;
   239 
   240 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   241   let
   242     val state =
   243       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   244         SOME state => state
   245       | NONE => raise Fail "Unregistered worker thread");
   246     val _ = state := (if active then Waiting else Sleeping);
   247     val _ = wait cond;
   248     val _ = state := Working;
   249   in () end;
   250 
   251 fun worker_next () = (*requires SYNCHRONIZED*)
   252   if length (! workers) > ! max_workers then
   253     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   254      signal work_available;
   255      NONE)
   256   else if count_workers Working > ! max_active then
   257     (worker_wait false work_available; worker_next ())
   258   else
   259     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   260       NONE => (worker_wait false work_available; worker_next ())
   261     | some => (signal work_available; some));
   262 
   263 fun worker_loop name =
   264   (case SYNCHRONIZED name (fn () => worker_next ()) of
   265     NONE => ()
   266   | SOME work => (worker_exec work; worker_loop name));
   267 
   268 fun worker_start name = (*requires SYNCHRONIZED*)
   269   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   270     Unsynchronized.ref Working));
   271 
   272 
   273 (* scheduler *)
   274 
   275 val status_ticks = Unsynchronized.ref 0;
   276 
   277 val last_round = Unsynchronized.ref Time.zeroTime;
   278 val next_round = seconds 0.05;
   279 
   280 fun scheduler_next () = (*requires SYNCHRONIZED*)
   281   let
   282     val now = Time.now ();
   283     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   284     val _ = if tick then last_round := now else ();
   285 
   286 
   287     (* queue and worker status *)
   288 
   289     val _ =
   290       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   291     val _ =
   292       if tick andalso ! status_ticks = 0 then
   293         Multithreading.tracing 1 (fn () =>
   294           let
   295             val {ready, pending, running, passive} = Task_Queue.status (! queue);
   296             val total = length (! workers);
   297             val active = count_workers Working;
   298             val waiting = count_workers Waiting;
   299           in
   300             "SCHEDULE " ^ Time.toString now ^ ": " ^
   301               string_of_int ready ^ " ready, " ^
   302               string_of_int pending ^ " pending, " ^
   303               string_of_int running ^ " running, " ^
   304               string_of_int passive ^ " passive; " ^
   305               string_of_int total ^ " workers, " ^
   306               string_of_int active ^ " active, " ^
   307               string_of_int waiting ^ " waiting "
   308           end)
   309       else ();
   310 
   311     val _ =
   312       if forall (Thread.isActive o #1) (! workers) then ()
   313       else
   314         let
   315           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   316           val _ = workers := alive;
   317         in
   318           Multithreading.tracing 0 (fn () =>
   319             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   320         end;
   321 
   322 
   323     (* worker pool adjustments *)
   324 
   325     val max_active0 = ! max_active;
   326     val max_workers0 = ! max_workers;
   327 
   328     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   329     val _ = max_active := m;
   330 
   331     val mm =
   332       if ! do_shutdown then 0
   333       else if m = 9999 then 1
   334       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
   335     val _ =
   336       if tick andalso mm > ! max_workers then
   337         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   338       else if tick andalso mm < ! max_workers then
   339         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   340       else ();
   341     val _ =
   342       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   343         max_workers := mm
   344       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   345         max_workers := Int.min (mm, 2 * m)
   346       else ();
   347 
   348     val missing = ! max_workers - length (! workers);
   349     val _ =
   350       if missing > 0 then
   351         funpow missing (fn () =>
   352           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   353       else ();
   354 
   355     val _ =
   356       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   357       else signal work_available;
   358 
   359 
   360     (* canceled groups *)
   361 
   362     val _ =
   363       if null (! canceled) then ()
   364       else
   365        (Multithreading.tracing 1 (fn () =>
   366           string_of_int (length (! canceled)) ^ " canceled groups");
   367         Unsynchronized.change canceled (filter_out (null o cancel_now));
   368         broadcast_work ());
   369 
   370 
   371     (* delay loop *)
   372 
   373     val _ = Exn.release (wait_timeout next_round scheduler_event);
   374 
   375 
   376     (* shutdown *)
   377 
   378     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   379     val continue = not (! do_shutdown andalso null (! workers));
   380     val _ = if continue then () else scheduler := NONE;
   381 
   382     val _ = broadcast scheduler_event;
   383   in continue end
   384   handle exn =>
   385     if Exn.is_interrupt exn then
   386      (Multithreading.tracing 1 (fn () => "Interrupt");
   387       List.app cancel_later (cancel_all ());
   388       broadcast_work (); true)
   389     else reraise exn;
   390 
   391 fun scheduler_loop () =
   392  (while
   393     Multithreading.with_attributes
   394       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   395       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   396   do (); last_round := Time.zeroTime);
   397 
   398 fun scheduler_active () = (*requires SYNCHRONIZED*)
   399   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   400 
   401 fun scheduler_check () = (*requires SYNCHRONIZED*)
   402  (do_shutdown := false;
   403   if scheduler_active () then ()
   404   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   405 
   406 
   407 
   408 (** futures **)
   409 
   410 (* cancel *)
   411 
   412 fun cancel_group group = SYNCHRONIZED "cancel_group" (fn () =>
   413   let
   414     val _ = if null (cancel_now group) then () else cancel_later group;
   415     val _ = signal work_available;
   416     val _ = scheduler_check ();
   417   in () end);
   418 
   419 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
   420 
   421 
   422 (* future jobs *)
   423 
   424 fun assign_result group result raw_res =
   425   let
   426     val res =
   427       (case raw_res of
   428         Exn.Exn exn => Exn.Exn (#2 (Par_Exn.serial exn))
   429       | _ => raw_res);
   430     val _ = Single_Assignment.assign result res
   431       handle exn as Fail _ =>
   432         (case Single_Assignment.peek result of
   433           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   434         | _ => reraise exn);
   435     val ok =
   436       (case the (Single_Assignment.peek result) of
   437         Exn.Exn exn =>
   438           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   439       | Exn.Res _ => true);
   440   in ok end;
   441 
   442 fun future_job group interrupts (e: unit -> 'a) =
   443   let
   444     val result = Single_Assignment.var "future" : 'a result;
   445     val pos = Position.thread_data ();
   446     fun job ok =
   447       let
   448         val res =
   449           if ok then
   450             Exn.capture (fn () =>
   451               Multithreading.with_attributes
   452                 (if interrupts
   453                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
   454                 (fn _ => Position.setmp_thread_data pos e ())) ()
   455           else Exn.interrupt_exn;
   456       in assign_result group result res end;
   457   in (result, job) end;
   458 
   459 
   460 (* fork *)
   461 
   462 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
   463 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
   464 
   465 fun forks ({name, group, deps, pri, interrupts}: params) es =
   466   if null es then []
   467   else
   468     let
   469       val grp =
   470         (case group of
   471           NONE => worker_subgroup ()
   472         | SOME grp => grp);
   473       fun enqueue e queue =
   474         let
   475           val (result, job) = future_job grp interrupts e;
   476           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   477           val future = Future {promised = false, task = task, result = result};
   478         in (future, queue') end;
   479     in
   480       SYNCHRONIZED "enqueue" (fn () =>
   481         let
   482           val (futures, queue') = fold_map enqueue es (! queue);
   483           val _ = queue := queue';
   484           val minimal = forall (not o Task_Queue.known_task queue') deps;
   485           val _ = if minimal then signal work_available else ();
   486           val _ = scheduler_check ();
   487         in futures end)
   488     end;
   489 
   490 fun fork_pri pri e =
   491   (singleton o forks) {name = "fork", group = NONE, deps = [], pri = pri, interrupts = true} e;
   492 
   493 fun fork e = fork_pri 0 e;
   494 
   495 
   496 (* join *)
   497 
   498 local
   499 
   500 fun get_result x =
   501   (case peek x of
   502     NONE => Exn.Exn (Fail "Unfinished future")
   503   | SOME res =>
   504       if Exn.is_interrupt_exn res then
   505         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   506           NONE => res
   507         | SOME exn => Exn.Exn exn)
   508       else res);
   509 
   510 fun join_next deps = (*requires SYNCHRONIZED*)
   511   if null deps then NONE
   512   else
   513     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   514       (NONE, []) => NONE
   515     | (NONE, deps') =>
   516         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   517     | (SOME work, deps') => SOME (work, deps'));
   518 
   519 fun execute_work NONE = ()
   520   | execute_work (SOME (work, deps')) =
   521       (worker_joining (fn () => worker_exec work); join_work deps')
   522 and join_work deps =
   523   Multithreading.with_attributes Multithreading.no_interrupts
   524     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   525 
   526 in
   527 
   528 fun join_results xs =
   529   let
   530     val _ =
   531       if forall is_finished xs then ()
   532       else if Multithreading.self_critical () then
   533         error "Cannot join future values within critical section"
   534       else if is_some (worker_task ()) then join_work (map task_of xs)
   535       else List.app (ignore o Single_Assignment.await o result_of) xs;
   536   in map get_result xs end;
   537 
   538 end;
   539 
   540 fun join_result x = singleton join_results x;
   541 fun joins xs = Par_Exn.release_all (join_results xs);
   542 fun join x = Exn.release (join_result x);
   543 
   544 fun join_tasks [] = ()
   545   | join_tasks tasks =
   546       (singleton o forks)
   547         {name = "join_tasks", group = SOME (new_group NONE),
   548           deps = tasks, pri = 0, interrupts = false} I
   549       |> join;
   550 
   551 
   552 (* fast-path versions -- bypassing task queue *)
   553 
   554 fun value_result (res: 'a Exn.result) =
   555   let
   556     val task = Task_Queue.dummy_task;
   557     val group = Task_Queue.group_of_task task;
   558     val result = Single_Assignment.var "value" : 'a result;
   559     val _ = assign_result group result res;
   560   in Future {promised = false, task = task, result = result} end;
   561 
   562 fun value x = value_result (Exn.Res x);
   563 
   564 fun cond_forks args es =
   565   if Multithreading.enabled () then forks args es
   566   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
   567 
   568 fun map_future f x =
   569   let
   570     val task = task_of x;
   571     val group = new_group (SOME (Task_Queue.group_of_task task));
   572     val (result, job) = future_job group true (fn () => f (join x));
   573 
   574     val extended = SYNCHRONIZED "extend" (fn () =>
   575       (case Task_Queue.extend task job (! queue) of
   576         SOME queue' => (queue := queue'; true)
   577       | NONE => false));
   578   in
   579     if extended then Future {promised = false, task = task, result = result}
   580     else
   581       (singleton o cond_forks)
   582         {name = "map_future", group = SOME group, deps = [task],
   583           pri = Task_Queue.pri_of_task task, interrupts = true}
   584         (fn () => f (join x))
   585   end;
   586 
   587 
   588 (* promised futures -- fulfilled by external means *)
   589 
   590 fun promise_group group abort : 'a future =
   591   let
   592     val result = Single_Assignment.var "promise" : 'a result;
   593     fun assign () = assign_result group result Exn.interrupt_exn
   594       handle Fail _ => true
   595         | exn =>
   596             if Exn.is_interrupt exn
   597             then raise Fail "Concurrent attempt to fulfill promise"
   598             else reraise exn;
   599     fun job () =
   600       Multithreading.with_attributes Multithreading.no_interrupts
   601         (fn _ => assign () before abort ());
   602     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   603       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   604   in Future {promised = true, task = task, result = result} end;
   605 
   606 fun promise abort = promise_group (worker_subgroup ()) abort;
   607 
   608 fun fulfill_result (Future {promised, task, result}) res =
   609   if not promised then raise Fail "Not a promised future"
   610   else
   611     let
   612       val group = Task_Queue.group_of_task task;
   613       fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
   614       val _ =
   615         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   616           let
   617             val still_passive =
   618               SYNCHRONIZED "fulfill_result" (fn () =>
   619                 Unsynchronized.change_result queue
   620                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   621           in if still_passive then worker_exec (task, [job]) else () end);
   622       val _ =
   623         if is_some (Single_Assignment.peek result) then ()
   624         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   625     in () end;
   626 
   627 fun fulfill x res = fulfill_result x (Exn.Res res);
   628 
   629 
   630 (* shutdown *)
   631 
   632 fun shutdown () =
   633   if Multithreading.available then
   634     SYNCHRONIZED "shutdown" (fn () =>
   635      while scheduler_active () do
   636       (wait scheduler_event; broadcast_work ()))
   637   else ();
   638 
   639 
   640 (* status markup *)
   641 
   642 fun status e =
   643   let
   644     val task_props =
   645       (case worker_task () of
   646         NONE => I
   647       | SOME task => Markup.properties [(Isabelle_Markup.taskN, Task_Queue.str_of_task task)]);
   648     val _ = Output.status (Markup.markup_only (task_props Isabelle_Markup.forked));
   649     val x = e ();  (*sic -- report "joined" only for success*)
   650     val _ = Output.status (Markup.markup_only (task_props Isabelle_Markup.joined));
   651   in x end;
   652 
   653 
   654 (* queue status *)
   655 
   656 fun group_tasks group = Task_Queue.group_tasks (! queue) group;
   657 
   658 fun queue_status () = Task_Queue.status (! queue);
   659 
   660 
   661 (*final declarations of this structure!*)
   662 val map = map_future;
   663 
   664 end;
   665 
   666 type 'a future = 'a Future.future;
   667