src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Thu Aug 09 23:53:50 2007 +0200 (2007-08-09)
changeset 24208 f4cafbaa05e4
parent 24144 ec51a0f7eefe
child 24214 0482ecc4ef11
permissions -rw-r--r--
schedule: more precise task model;
improved error handling: first failure causes interrupt of all threads;
misc cleanup;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Multithreading in Poly/ML (version 5.1).
     6 *)
     7 
     8 open Thread;
     9 
    10 signature MULTITHREADING =
    11 sig
    12   include MULTITHREADING
    13   val uninterruptible: ('a -> 'b) -> 'a -> 'b
    14   val interruptible: ('a -> 'b) -> 'a -> 'b
    15 end;
    16 
    17 structure Multithreading: MULTITHREADING =
    18 struct
    19 
    20 (* options *)
    21 
    22 val trace = ref 0;
    23 fun tracing level msg =
    24   if level <= ! trace
    25   then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
    26   else ();
    27 
    28 val available = true;
    29 val max_threads = ref 1;
    30 
    31 
    32 (* misc utils *)
    33 
    34 fun cons x xs = x :: xs;
    35 
    36 fun change r f = r := f (! r);
    37 
    38 fun inc i = (i := ! i + 1; ! i);
    39 fun dec i = (i := ! i - 1; ! i);
    40 
    41 fun show "" = "" | show name = " " ^ name;
    42 fun show' "" = "" | show' name = " [" ^ name ^ "]";
    43 
    44 
    45 (* thread attributes *)
    46 
    47 local
    48 
    49 fun with_attributes new_atts f x =
    50   let
    51     val orig_atts = Thread.getAttributes ();
    52     fun restore () = Thread.setAttributes orig_atts;
    53   in
    54     Exn.release
    55      (let
    56         val _ = Thread.setAttributes new_atts;
    57         val result = Exn.capture f x;
    58         val _ = restore ();
    59       in result end
    60       handle Interrupt => (restore (); Exn.Exn Interrupt))
    61   end;
    62 
    63 fun with_interrupt_state state = with_attributes [Thread.InterruptState state];
    64 
    65 in
    66 
    67 fun uninterruptible f x = with_interrupt_state Thread.InterruptDefer f x;
    68 fun interruptible f x = with_interrupt_state Thread.InterruptAsynchOnce f x;
    69 
    70 end;
    71 
    72 
    73 (* critical section -- may be nested within the same thread *)
    74 
    75 local
    76 
    77 val critical_lock = Mutex.mutex ();
    78 val critical_thread = ref (NONE: Thread.thread option);
    79 val critical_name = ref "";
    80 
    81 in
    82 
    83 fun self_critical () =
    84   (case ! critical_thread of
    85     NONE => false
    86   | SOME id => Thread.equal (id, Thread.self ()));
    87 
    88 fun NAMED_CRITICAL name e =
    89   if self_critical () then e ()
    90   else
    91     uninterruptible (fn () =>
    92       let
    93         val name' = ! critical_name;
    94         val _ =
    95           if Mutex.trylock critical_lock then ()
    96           else
    97             let
    98               val timer = Timer.startRealTimer ();
    99               val _ = tracing 4 (fn () => "CRITICAL" ^ show name ^ show' name' ^ ": waiting");
   100               val _ = Mutex.lock critical_lock;
   101               val time = Timer.checkRealTimer timer;
   102               val _ = tracing (if Time.> (time, Time.fromMilliseconds 10) then 3 else 4) (fn () =>
   103                 "CRITICAL" ^ show name ^ show' name' ^ ": passed after " ^ Time.toString time);
   104             in () end;
   105         val _ = critical_thread := SOME (Thread.self ());
   106         val _ = critical_name := name;
   107         val result = Exn.capture e ();
   108         val _ = critical_name := "";
   109         val _ = critical_thread := NONE;
   110         val _ = Mutex.unlock critical_lock;
   111       in Exn.release result end) ();
   112 
   113 fun CRITICAL e = NAMED_CRITICAL "" e;
   114 
   115 end;
   116 
   117 
   118 (* scheduling -- multiple threads working on a queue of tasks *)
   119 
   120 datatype 'a task =
   121   Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
   122 
   123 local
   124 
   125 val protected_name = ref "";
   126 
   127 in
   128 
   129 fun schedule n next_task = uninterruptible (fn tasks =>
   130   let
   131     (*protected execution*)
   132     val lock = Mutex.mutex ();
   133     fun PROTECTED name e =
   134       let
   135         val name' = ! protected_name;
   136         val _ =
   137           if Mutex.trylock lock then ()
   138           else
   139             let
   140               val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": waiting");
   141               val _ = Mutex.lock lock;
   142               val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": passed");
   143             in () end;
   144         val _ = protected_name := name;
   145         val res = Exn.capture e ();
   146         val _ = protected_name := "";
   147         val _ = Mutex.unlock lock;
   148       in Exn.release res end;
   149 
   150     (*wakeup condition*)
   151     val wakeup = ConditionVar.conditionVar ();
   152     fun wakeup_all () = ConditionVar.broadcast wakeup;
   153     fun wait () = ConditionVar.wait (wakeup, lock);
   154 
   155     (*the queue of tasks*)
   156     val queue = ref tasks;
   157     val active = ref 0;
   158     fun trace_active () = tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
   159     fun dequeue () =
   160       let
   161         val (next, tasks') = next_task (! queue);
   162         val _ = queue := tasks';
   163       in
   164         (case next of Wait =>
   165           (dec active; trace_active ();
   166             wait ();
   167             inc active; trace_active ();
   168             dequeue ())
   169         | _ => next)
   170       end;
   171 
   172     (*pool of running threads*)
   173     val status = ref ([]: exn list);
   174     val running = ref ([]: Thread.thread list);
   175     fun start f =
   176       (inc active;
   177        change running (cons (Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer]))));
   178     fun stop () =
   179       (dec active;
   180        change running (List.filter (fn t => not (Thread.equal (t, Thread.self ())))));
   181 
   182    (*worker thread*)
   183     fun worker () =
   184       (case PROTECTED "dequeue" dequeue of
   185         Task {body, cont, fail} =>
   186           (case Exn.capture (interruptible body) () of
   187             Exn.Result () =>
   188               (PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ())
   189           | Exn.Exn exn =>
   190               PROTECTED "fail" (fn () =>
   191                 (change status (cons exn); change queue fail; stop (); wakeup_all ())))
   192       | Terminate => PROTECTED "terminate" (fn () => (stop (); wakeup_all ())));
   193 
   194     (*main control: fork and wait*)
   195     fun fork 0 = ()
   196       | fork k = (start worker; fork (k - 1));
   197     val _ = PROTECTED "main" (fn () =>
   198      (fork (Int.max (n, 1));
   199       while not (List.null (! running)) do
   200       (trace_active ();
   201        if not (List.null (! status)) then (List.app Thread.interrupt (! running)) else ();
   202        wait ())));
   203 
   204   in ! status end);
   205 
   206 end;
   207 
   208 end;
   209 
   210 val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL;
   211 val CRITICAL = Multithreading.CRITICAL;
   212