src/Pure/Concurrent/future.ML
author wenzelm
Sat Apr 02 23:29:05 2016 +0200 (2016-04-02 ago)
changeset 62826 eb94e570c1a4
parent 62823 751bcf0473a7
child 62889 99c7f31615c2
permissions -rw-r--r--
prefer infix operations;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallel execution via futures and promises.
     5 *)
     6 
     7 signature FUTURE =
     8 sig
     9   type task = Task_Queue.task
    10   type group = Task_Queue.group
    11   val new_group: group option -> group
    12   val worker_task: unit -> task option
    13   val worker_group: unit -> group option
    14   val the_worker_group: unit -> group
    15   val worker_subgroup: unit -> group
    16   type 'a future
    17   val task_of: 'a future -> task
    18   val peek: 'a future -> 'a Exn.result option
    19   val is_finished: 'a future -> bool
    20   val ML_statistics: bool Unsynchronized.ref
    21   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    22   val cancel_group: group -> unit
    23   val cancel: 'a future -> unit
    24   val error_message: Position.T -> (serial * string) * string option -> unit
    25   val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
    26   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
    27   val default_params: params
    28   val forks: params -> (unit -> 'a) list -> 'a future list
    29   val fork: (unit -> 'a) -> 'a future
    30   val join_results: 'a future list -> 'a Exn.result list
    31   val join_result: 'a future -> 'a Exn.result
    32   val joins: 'a future list -> 'a list
    33   val join: 'a future -> 'a
    34   val join_tasks: task list -> unit
    35   val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
    36   val value_result: 'a Exn.result -> 'a future
    37   val value: 'a -> 'a future
    38   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    39   val map: ('a -> 'b) -> 'a future -> 'b future
    40   val promise_group: group -> (unit -> unit) -> 'a future
    41   val promise: (unit -> unit) -> 'a future
    42   val fulfill_result: 'a future -> 'a Exn.result -> unit
    43   val fulfill: 'a future -> 'a -> unit
    44   val group_snapshot: group -> task list
    45   val shutdown: unit -> unit
    46 end;
    47 
    48 structure Future: FUTURE =
    49 struct
    50 
    51 (** future values **)
    52 
    53 type task = Task_Queue.task;
    54 type group = Task_Queue.group;
    55 val new_group = Task_Queue.new_group;
    56 
    57 
    58 (* identifiers *)
    59 
    60 local
    61   val tag = Universal.tag () : task option Universal.tag;
    62 in
    63   fun worker_task () = the_default NONE (Thread.getLocal tag);
    64   fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
    65 end;
    66 
    67 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
    68 
    69 fun the_worker_group () =
    70   (case worker_group () of
    71     SOME group => group
    72   | NONE => raise Fail "Missing worker thread context");
    73 
    74 fun worker_subgroup () = new_group (worker_group ());
    75 
    76 fun worker_joining e =
    77   (case worker_task () of
    78     NONE => e ()
    79   | SOME task => Task_Queue.joining task e);
    80 
    81 fun worker_waiting deps e =
    82   (case worker_task () of
    83     NONE => e ()
    84   | SOME task => Task_Queue.waiting task deps e);
    85 
    86 
    87 (* datatype future *)
    88 
    89 type 'a result = 'a Exn.result Single_Assignment.var;
    90 
    91 datatype 'a future = Future of
    92  {promised: bool,
    93   task: task,
    94   result: 'a result};
    95 
    96 fun task_of (Future {task, ...}) = task;
    97 fun result_of (Future {result, ...}) = result;
    98 
    99 fun peek x = Single_Assignment.peek (result_of x);
   100 fun is_finished x = is_some (peek x);
   101 
   102 val _ =
   103   ML_system_pp (fn depth => fn pretty => fn x =>
   104     (case peek x of
   105       NONE => PolyML_Pretty.PrettyString "<future>"
   106     | SOME (Exn.Exn _) => PolyML_Pretty.PrettyString "<failed>"
   107     | SOME (Exn.Res y) => pretty (y, depth)));
   108 
   109 
   110 
   111 (** scheduling **)
   112 
   113 (* synchronization *)
   114 
   115 val scheduler_event = ConditionVar.conditionVar ();
   116 val work_available = ConditionVar.conditionVar ();
   117 val work_finished = ConditionVar.conditionVar ();
   118 
   119 local
   120   val lock = Mutex.mutex ();
   121 in
   122 
   123 fun SYNCHRONIZED name = Multithreading.synchronized name lock;
   124 
   125 fun wait cond = (*requires SYNCHRONIZED*)
   126   Multithreading.sync_wait NONE NONE cond lock;
   127 
   128 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   129   Multithreading.sync_wait NONE (SOME (Time.now () + timeout)) cond lock;
   130 
   131 fun signal cond = (*requires SYNCHRONIZED*)
   132   ConditionVar.signal cond;
   133 
   134 fun broadcast cond = (*requires SYNCHRONIZED*)
   135   ConditionVar.broadcast cond;
   136 
   137 end;
   138 
   139 
   140 (* global state *)
   141 
   142 val queue = Unsynchronized.ref Task_Queue.empty;
   143 val next = Unsynchronized.ref 0;
   144 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   145 val canceled = Unsynchronized.ref ([]: group list);
   146 val do_shutdown = Unsynchronized.ref false;
   147 val max_workers = Unsynchronized.ref 0;
   148 val max_active = Unsynchronized.ref 0;
   149 
   150 val status_ticks = Unsynchronized.ref 0;
   151 val last_round = Unsynchronized.ref Time.zeroTime;
   152 val next_round = seconds 0.05;
   153 
   154 datatype worker_state = Working | Waiting | Sleeping;
   155 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   156 
   157 fun count_workers state = (*requires SYNCHRONIZED*)
   158   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   159 
   160 
   161 
   162 (* status *)
   163 
   164 val ML_statistics = Unsynchronized.ref false;
   165 
   166 fun report_status () = (*requires SYNCHRONIZED*)
   167   if ! ML_statistics then
   168     let
   169       val {ready, pending, running, passive, urgent} = Task_Queue.status (! queue);
   170       val total = length (! workers);
   171       val active = count_workers Working;
   172       val waiting = count_workers Waiting;
   173       val stats =
   174        [("now", Markup.print_real (Time.toReal (Time.now ()))),
   175         ("tasks_ready", Markup.print_int ready),
   176         ("tasks_pending", Markup.print_int pending),
   177         ("tasks_running", Markup.print_int running),
   178         ("tasks_passive", Markup.print_int passive),
   179         ("tasks_urgent", Markup.print_int urgent),
   180         ("workers_total", Markup.print_int total),
   181         ("workers_active", Markup.print_int active),
   182         ("workers_waiting", Markup.print_int waiting)] @
   183         ML_Statistics.get ();
   184     in Output.try_protocol_message (Markup.ML_statistics :: stats) [] end
   185   else ();
   186 
   187 
   188 (* cancellation primitives *)
   189 
   190 fun cancel_now group = (*requires SYNCHRONIZED*)
   191   let
   192     val running = Task_Queue.cancel (! queue) group;
   193     val _ = running |> List.app (fn thread =>
   194       if Standard_Thread.is_self thread then ()
   195       else Standard_Thread.interrupt_unsynchronized thread);
   196   in running end;
   197 
   198 fun cancel_all () = (*requires SYNCHRONIZED*)
   199   let
   200     val (groups, threads) = Task_Queue.cancel_all (! queue);
   201     val _ = List.app Standard_Thread.interrupt_unsynchronized threads;
   202   in groups end;
   203 
   204 fun cancel_later group = (*requires SYNCHRONIZED*)
   205  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   206   broadcast scheduler_event);
   207 
   208 fun interruptible_task f x =
   209   Multithreading.with_attributes
   210     (if is_some (worker_task ())
   211      then Multithreading.private_interrupts
   212      else Multithreading.public_interrupts)
   213     (fn _ => f x)
   214   before Multithreading.interrupted ();
   215 
   216 
   217 (* worker threads *)
   218 
   219 fun worker_exec (task, jobs) =
   220   let
   221     val group = Task_Queue.group_of_task task;
   222     val valid = not (Task_Queue.is_canceled group);
   223     val ok =
   224       Task_Queue.running task (fn () =>
   225         setmp_worker_task task (fn () =>
   226           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   227     val _ =
   228       if ! Multithreading.trace >= 2 then
   229         Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) []
   230       else ();
   231     val _ = SYNCHRONIZED "finish" (fn () =>
   232       let
   233         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   234         val test = Exn.capture Multithreading.interrupted ();
   235         val _ =
   236           if ok andalso not (Exn.is_interrupt_exn test) then ()
   237           else if null (cancel_now group) then ()
   238           else cancel_later group;
   239         val _ = broadcast work_finished;
   240         val _ = if maximal then () else signal work_available;
   241       in () end);
   242   in () end;
   243 
   244 fun worker_wait worker_state cond = (*requires SYNCHRONIZED*)
   245   (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   246     SOME state => Unsynchronized.setmp state worker_state wait cond
   247   | NONE => wait cond);
   248 
   249 fun worker_next () = (*requires SYNCHRONIZED*)
   250   if length (! workers) > ! max_workers then
   251     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   252      signal work_available;
   253      NONE)
   254   else
   255     let val urgent_only = count_workers Working > ! max_active in
   256       (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ()) urgent_only) of
   257         NONE => (worker_wait Sleeping work_available; worker_next ())
   258       | some => (signal work_available; some))
   259     end;
   260 
   261 fun worker_loop name =
   262   (case SYNCHRONIZED name (fn () => worker_next ()) of
   263     NONE => ()
   264   | SOME work => (worker_exec work; worker_loop name));
   265 
   266 fun worker_start name = (*requires SYNCHRONIZED*)
   267   let
   268     val threads_stack_limit =
   269       Real.floor (Options.default_real "threads_stack_limit" * 1024.0 * 1024.0 * 1024.0);
   270     val stack_limit = if threads_stack_limit <= 0 then NONE else SOME threads_stack_limit;
   271     val worker =
   272       Standard_Thread.fork {name = "worker", stack_limit = stack_limit, interrupts = false}
   273         (fn () => worker_loop name);
   274   in Unsynchronized.change workers (cons (worker, Unsynchronized.ref Working)) end
   275   handle Fail msg => Multithreading.tracing 0 (fn () => "SCHEDULER: " ^ msg);
   276 
   277 
   278 (* scheduler *)
   279 
   280 fun scheduler_next () = (*requires SYNCHRONIZED*)
   281   let
   282     val now = Time.now ();
   283     val tick = ! last_round + next_round <= now;
   284     val _ = if tick then last_round := now else ();
   285 
   286 
   287     (* runtime status *)
   288 
   289     val _ =
   290       if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
   291     val _ =
   292       if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
   293       then report_status () else ();
   294 
   295     val _ =
   296       if not tick orelse forall (Thread.isActive o #1) (! workers) then ()
   297       else
   298         let
   299           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   300           val _ = workers := alive;
   301         in
   302           Multithreading.tracing 0 (fn () =>
   303             "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   304         end;
   305 
   306 
   307     (* worker pool adjustments *)
   308 
   309     val max_active0 = ! max_active;
   310     val max_workers0 = ! max_workers;
   311 
   312     val m =
   313       if ! do_shutdown andalso Task_Queue.all_passive (! queue) then 0
   314       else Multithreading.max_threads_value ();
   315     val _ = max_active := m;
   316     val _ = max_workers := 2 * m;
   317 
   318     val missing = ! max_workers - length (! workers);
   319     val _ =
   320       if missing > 0 then
   321         funpow missing (fn () =>
   322           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   323       else ();
   324 
   325     val _ =
   326       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   327       else signal work_available;
   328 
   329 
   330     (* canceled groups *)
   331 
   332     val _ =
   333       if null (! canceled) then ()
   334       else
   335        (Multithreading.tracing 1 (fn () =>
   336           string_of_int (length (! canceled)) ^ " canceled groups");
   337         Unsynchronized.change canceled (filter_out (null o cancel_now));
   338         signal work_available);
   339 
   340 
   341     (* delay loop *)
   342 
   343     val _ = Exn.release (wait_timeout next_round scheduler_event);
   344 
   345 
   346     (* shutdown *)
   347 
   348     val continue = not (! do_shutdown andalso null (! workers));
   349     val _ = if continue then () else (report_status (); scheduler := NONE);
   350 
   351     val _ = broadcast scheduler_event;
   352   in continue end
   353   handle exn =>
   354     if Exn.is_interrupt exn then
   355      (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
   356       List.app cancel_later (cancel_all ());
   357       signal work_available; true)
   358     else Exn.reraise exn;
   359 
   360 fun scheduler_loop () =
   361  (while
   362     Multithreading.with_attributes
   363       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   364       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   365   do (); last_round := Time.zeroTime);
   366 
   367 fun scheduler_active () = (*requires SYNCHRONIZED*)
   368   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   369 
   370 fun scheduler_check () = (*requires SYNCHRONIZED*)
   371  (do_shutdown := false;
   372   if scheduler_active () then ()
   373   else
   374     scheduler :=
   375       SOME (Standard_Thread.fork {name = "scheduler", stack_limit = NONE, interrupts = false}
   376         scheduler_loop));
   377 
   378 
   379 
   380 (** futures **)
   381 
   382 (* cancel *)
   383 
   384 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
   385   let
   386     val _ = if null (cancel_now group) then () else cancel_later group;
   387     val _ = signal work_available;
   388     val _ = scheduler_check ();
   389   in () end;
   390 
   391 fun cancel_group group =
   392   SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
   393 
   394 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
   395 
   396 
   397 (* results *)
   398 
   399 fun error_message pos ((serial, msg), exec_id) =
   400   Position.setmp_thread_data pos (fn () =>
   401     let val id = Position.get_id pos in
   402       if is_none id orelse is_none exec_id orelse id = exec_id
   403       then Output.error_message' (serial, msg) else ()
   404     end) ();
   405 
   406 fun identify_result pos res =
   407   res |> Exn.map_exn (fn exn =>
   408     let val exec_id =
   409       (case Position.get_id pos of
   410         NONE => []
   411       | SOME id => [(Markup.exec_idN, id)])
   412     in Par_Exn.identify exec_id exn end);
   413 
   414 fun assign_result group result res =
   415   let
   416     val _ = Single_Assignment.assign result res
   417       handle exn as Fail _ =>
   418         (case Single_Assignment.peek result of
   419           SOME (Exn.Exn e) => Exn.reraise (if Exn.is_interrupt e then e else exn)
   420         | _ => Exn.reraise exn);
   421     val ok =
   422       (case the (Single_Assignment.peek result) of
   423         Exn.Exn exn =>
   424           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   425       | Exn.Res _ => true);
   426   in ok end;
   427 
   428 
   429 (* future jobs *)
   430 
   431 fun future_job group atts (e: unit -> 'a) =
   432   let
   433     val result = Single_Assignment.var "future" : 'a result;
   434     val pos = Position.thread_data ();
   435     val context = Context.thread_data ();
   436     fun job ok =
   437       let
   438         val res =
   439           if ok then
   440             Exn.capture (fn () =>
   441               Multithreading.with_attributes atts (fn _ =>
   442                 (Position.setmp_thread_data pos o Context.setmp_thread_data context) e ())) ()
   443           else Exn.interrupt_exn;
   444       in assign_result group result (identify_result pos res) end;
   445   in (result, job) end;
   446 
   447 
   448 (* fork *)
   449 
   450 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
   451 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
   452 
   453 fun forks ({name, group, deps, pri, interrupts}: params) es =
   454   if null es then []
   455   else
   456     let
   457       val grp =
   458         (case group of
   459           NONE => worker_subgroup ()
   460         | SOME grp => grp);
   461       fun enqueue e queue =
   462         let
   463           val atts =
   464             if interrupts
   465             then Multithreading.private_interrupts
   466             else Multithreading.no_interrupts;
   467           val (result, job) = future_job grp atts e;
   468           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   469           val future = Future {promised = false, task = task, result = result};
   470         in (future, queue') end;
   471     in
   472       SYNCHRONIZED "enqueue" (fn () =>
   473         let
   474           val (futures, queue') = fold_map enqueue es (! queue);
   475           val _ = queue := queue';
   476           val minimal = forall (not o Task_Queue.known_task queue') deps;
   477           val _ = if minimal then signal work_available else ();
   478           val _ = scheduler_check ();
   479         in futures end)
   480     end;
   481 
   482 fun fork e =
   483   (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
   484 
   485 
   486 (* join *)
   487 
   488 fun get_result x =
   489   (case peek x of
   490     NONE => Exn.Exn (Fail "Unfinished future")
   491   | SOME res =>
   492       if Exn.is_interrupt_exn res then
   493         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   494           [] => res
   495         | exns => Exn.Exn (Par_Exn.make exns))
   496       else res);
   497 
   498 local
   499 
   500 fun join_next atts deps = (*requires SYNCHRONIZED*)
   501   if null deps then NONE
   502   else
   503     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   504       (NONE, []) => NONE
   505     | (NONE, deps') =>
   506        (worker_waiting deps' (fn () =>
   507           Multithreading.with_attributes atts (fn _ =>
   508             Exn.release (worker_wait Waiting work_finished)));
   509         join_next atts deps')
   510     | (SOME work, deps') => SOME (work, deps'));
   511 
   512 fun join_loop atts deps =
   513   (case SYNCHRONIZED "join" (fn () => join_next atts deps) of
   514     NONE => ()
   515   | SOME (work, deps') => (worker_joining (fn () => worker_exec work); join_loop atts deps'));
   516 
   517 in
   518 
   519 fun join_results xs =
   520   let
   521     val _ =
   522       if forall is_finished xs then ()
   523       else if is_some (worker_task ()) then
   524         Multithreading.with_attributes Multithreading.no_interrupts
   525           (fn orig_atts => join_loop orig_atts (map task_of xs))
   526       else List.app (ignore o Single_Assignment.await o result_of) xs;
   527   in map get_result xs end;
   528 
   529 end;
   530 
   531 fun join_result x = singleton join_results x;
   532 fun joins xs = Par_Exn.release_all (join_results xs);
   533 fun join x = Exn.release (join_result x);
   534 
   535 fun join_tasks tasks =
   536   if null tasks then ()
   537   else
   538     (singleton o forks)
   539       {name = "join_tasks", group = SOME (new_group NONE),
   540         deps = tasks, pri = 0, interrupts = false} I
   541     |> join;
   542 
   543 
   544 (* task context for running thread *)
   545 
   546 fun task_context name group f x =
   547   Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts =>
   548     let
   549       val (result, job) = future_job group orig_atts (fn () => f x);
   550       val task =
   551         SYNCHRONIZED "enroll" (fn () =>
   552           Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group));
   553       val _ = worker_exec (task, [job]);
   554     in
   555       (case Single_Assignment.peek result of
   556         NONE => raise Fail "Missing task context result"
   557       | SOME res => Exn.release res)
   558     end);
   559 
   560 
   561 (* fast-path operations -- bypass task queue if possible *)
   562 
   563 fun value_result (res: 'a Exn.result) =
   564   let
   565     val task = Task_Queue.dummy_task;
   566     val group = Task_Queue.group_of_task task;
   567     val result = Single_Assignment.var "value" : 'a result;
   568     val _ = assign_result group result (identify_result (Position.thread_data ()) res);
   569   in Future {promised = false, task = task, result = result} end;
   570 
   571 fun value x = value_result (Exn.Res x);
   572 
   573 fun cond_forks args es =
   574   if Multithreading.enabled () then forks args es
   575   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
   576 
   577 fun map_future f x =
   578   if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
   579   else
   580     let
   581       val task = task_of x;
   582       val group = Task_Queue.group_of_task task;
   583       val (result, job) =
   584         future_job group Multithreading.private_interrupts (fn () => f (join x));
   585 
   586       val extended = SYNCHRONIZED "extend" (fn () =>
   587         (case Task_Queue.extend task job (! queue) of
   588           SOME queue' => (queue := queue'; true)
   589         | NONE => false));
   590     in
   591       if extended then Future {promised = false, task = task, result = result}
   592       else
   593         (singleton o cond_forks)
   594           {name = "map_future", group = SOME group, deps = [task],
   595             pri = Task_Queue.pri_of_task task, interrupts = true}
   596           (fn () => f (join x))
   597     end;
   598 
   599 
   600 (* promised futures -- fulfilled by external means *)
   601 
   602 fun promise_group group abort : 'a future =
   603   let
   604     val result = Single_Assignment.var "promise" : 'a result;
   605     fun assign () = assign_result group result Exn.interrupt_exn
   606       handle Fail _ => true
   607         | exn =>
   608             if Exn.is_interrupt exn
   609             then raise Fail "Concurrent attempt to fulfill promise"
   610             else Exn.reraise exn;
   611     fun job () =
   612       Multithreading.with_attributes Multithreading.no_interrupts
   613         (fn _ => Exn.release (Exn.capture assign () before abort ()));
   614     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   615       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   616   in Future {promised = true, task = task, result = result} end;
   617 
   618 fun promise abort = promise_group (worker_subgroup ()) abort;
   619 
   620 fun fulfill_result (Future {promised, task, result}) res =
   621   if not promised then raise Fail "Not a promised future"
   622   else
   623     let
   624       val group = Task_Queue.group_of_task task;
   625       val pos = Position.thread_data ();
   626       fun job ok =
   627         assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
   628       val _ =
   629         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   630           let
   631             val passive_job =
   632               SYNCHRONIZED "fulfill_result" (fn () =>
   633                 Unsynchronized.change_result queue
   634                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   635           in
   636             (case passive_job of
   637               SOME true => worker_exec (task, [job])
   638             | SOME false => ()
   639             | NONE => ignore (job (not (Task_Queue.is_canceled group))))
   640           end);
   641       val _ =
   642         if is_some (Single_Assignment.peek result) then ()
   643         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   644     in () end;
   645 
   646 fun fulfill x res = fulfill_result x (Exn.Res res);
   647 
   648 
   649 (* group snapshot *)
   650 
   651 fun group_snapshot group =
   652   SYNCHRONIZED "group_snapshot" (fn () =>
   653     Task_Queue.group_tasks (! queue) group);
   654 
   655 
   656 (* shutdown *)
   657 
   658 fun shutdown () =
   659   if is_some (worker_task ()) then
   660     raise Fail "Cannot shutdown while running as worker thread"
   661   else
   662     SYNCHRONIZED "shutdown" (fn () =>
   663       while scheduler_active () do
   664        (do_shutdown := true;
   665         Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
   666         wait scheduler_event));
   667 
   668 
   669 (*final declarations of this structure!*)
   670 val map = map_future;
   671 
   672 end;
   673 
   674 type 'a future = 'a Future.future;