src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Sat Nov 27 16:29:53 2010 +0100 (2010-11-27 ago)
changeset 40748 591b6778d076
parent 40301 bf39a257b3d3
child 41710 11ae688e4e30
permissions -rw-r--r--
removed bash from ML system bootstrap, and past the Secure ML barrier;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     Author:     Makarius
     3 
     4 Multithreading in Poly/ML 5.2.1 or later (cf. polyml/basis/Thread.sml).
     5 *)
     6 
(*Poly/ML-specific additions to the generic multithreading interface.*)
signature MULTITHREADING_POLYML =
sig
  (*interruptible f x: evaluate f x with the public_interrupts attributes in effect*)
  val interruptible: ('a -> 'b) -> 'a -> 'b
  (*uninterruptible f x: evaluate f restore x with the no_interrupts attributes in effect;
    restore g y evaluates g y under the attributes that were active before the call*)
  val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
  (*evaluation with a time limit, implemented via a watchdog thread*)
  structure TimeLimit: TIME_LIMIT
end;
    13 
    14 signature BASIC_MULTITHREADING =
    15 sig
    16   include BASIC_MULTITHREADING
    17   include MULTITHREADING_POLYML
    18 end;
    19 
    20 signature MULTITHREADING =
    21 sig
    22   include MULTITHREADING
    23   include MULTITHREADING_POLYML
    24 end;
    25 
    26 structure Multithreading: MULTITHREADING =
    27 struct
    28 
    29 (* options *)
    30 
    31 val available = true;
    32 
    33 val max_threads = ref 0;
    34 
    35 fun max_threads_value () =
    36   let val m = ! max_threads in
    37     if m > 0 then m
    38     else Int.min (Int.max (Thread.numProcessors (), 1), 4)
    39   end;
    40 
    41 fun enabled () = max_threads_value () > 1;
    42 
    43 
    44 (* thread attributes *)
    45 
    46 val no_interrupts =
    47   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
    48 
    49 val public_interrupts =
    50   [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
    51 
    52 val private_interrupts =
    53   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce];
    54 
    55 val sync_interrupts = map
    56   (fn x as Thread.InterruptState Thread.InterruptDefer => x
    57     | Thread.InterruptState _ => Thread.InterruptState Thread.InterruptSynch
    58     | x => x);
    59 
    60 val safe_interrupts = map
    61   (fn Thread.InterruptState Thread.InterruptAsynch =>
    62       Thread.InterruptState Thread.InterruptAsynchOnce
    63     | x => x);
    64 
    65 fun with_attributes new_atts e =
    66   let
    67     val orig_atts = safe_interrupts (Thread.getAttributes ());
    68     val result = Exn.capture (fn () =>
    69       (Thread.setAttributes (safe_interrupts new_atts); e orig_atts)) ();
    70     val _ = Thread.setAttributes orig_atts;
    71   in Exn.release result end;
    72 
    73 
    74 (* portable wrappers *)
    75 
    76 fun interruptible f x = with_attributes public_interrupts (fn _ => f x);
    77 
    78 fun uninterruptible f x =
    79   with_attributes no_interrupts (fn atts =>
    80     f (fn g => fn y => with_attributes atts (fn _ => g y)) x);
    81 
    82 
    83 (* synchronous wait *)
    84 
    85 fun sync_wait opt_atts time cond lock =
    86   with_attributes
    87     (sync_interrupts (case opt_atts of SOME atts => atts | NONE => Thread.getAttributes ()))
    88     (fn _ =>
    89       (case time of
    90         SOME t => Exn.Result (ConditionVar.waitUntil (cond, lock, t))
    91       | NONE => (ConditionVar.wait (cond, lock); Exn.Result true))
    92       handle exn => Exn.Exn exn);
    93 
    94 
    95 (* tracing *)
    96 
    97 val trace = ref 0;
    98 
    99 fun tracing level msg =
   100   if level > ! trace then ()
   101   else uninterruptible (fn _ => fn () =>
   102     (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
   103       handle _ (*sic*) => ()) ();
   104 
   105 fun tracing_time detailed time =
   106   tracing
   107    (if not detailed then 5
   108     else if Time.>= (time, seconds 1.0) then 1
   109     else if Time.>= (time, seconds 0.1) then 2
   110     else if Time.>= (time, seconds 0.01) then 3
   111     else if Time.>= (time, seconds 0.001) then 4 else 5);
   112 
   113 fun real_time f x =
   114   let
   115     val timer = Timer.startRealTimer ();
   116     val () = f x;
   117     val time = Timer.checkRealTimer timer;
   118   in time end;
   119 
   120 
   121 (* execution with time limit *)
   122 
   123 structure TimeLimit =
   124 struct
   125 
   126 exception TimeOut;
   127 
   128 fun timeLimit time f x = uninterruptible (fn restore_attributes => fn () =>
   129   let
   130     val worker = Thread.self ();
   131     val timeout = ref false;
   132     val watchdog = Thread.fork (fn () =>
   133       (OS.Process.sleep time; timeout := true; Thread.interrupt worker), []);
   134 
   135     val result = Exn.capture (restore_attributes f) x;
   136     val was_timeout = Exn.is_interrupt_exn result andalso ! timeout;
   137 
   138     val _ = Thread.interrupt watchdog handle Thread _ => ();
   139   in if was_timeout then raise TimeOut else Exn.release result end) ();
   140 
   141 end;
   142 
   143 
   144 (* critical section -- may be nested within the same thread *)
   145 
   146 local
   147 
   148 val critical_lock = Mutex.mutex ();
   149 val critical_thread = ref (NONE: Thread.thread option);
   150 val critical_name = ref "";
   151 
   152 fun show "" = "" | show name = " " ^ name;
   153 fun show' "" = "" | show' name = " [" ^ name ^ "]";
   154 
   155 in
   156 
   157 fun self_critical () =
   158   (case ! critical_thread of
   159     NONE => false
   160   | SOME t => Thread.equal (t, Thread.self ()));
   161 
   162 fun NAMED_CRITICAL name e =
   163   if self_critical () then e ()
   164   else
   165     Exn.release (uninterruptible (fn restore_attributes => fn () =>
   166       let
   167         val name' = ! critical_name;
   168         val _ =
   169           if Mutex.trylock critical_lock then ()
   170           else
   171             let
   172               val _ = tracing 5 (fn () => "CRITICAL" ^ show name ^ show' name' ^ ": waiting");
   173               val time = real_time Mutex.lock critical_lock;
   174               val _ = tracing_time true time (fn () =>
   175                 "CRITICAL" ^ show name ^ show' name' ^ ": passed after " ^ Time.toString time);
   176             in () end;
   177         val _ = critical_thread := SOME (Thread.self ());
   178         val _ = critical_name := name;
   179         val result = Exn.capture (restore_attributes e) ();
   180         val _ = critical_name := "";
   181         val _ = critical_thread := NONE;
   182         val _ = Mutex.unlock critical_lock;
   183       in result end) ());
   184 
   185 fun CRITICAL e = NAMED_CRITICAL "" e;
   186 
   187 end;
   188 
   189 
   190 (* serial numbers *)
   191 
   192 local
   193 
   194 val serial_lock = Mutex.mutex ();
   195 val serial_count = ref 0;
   196 
   197 in
   198 
   199 val serial = uninterruptible (fn _ => fn () =>
   200   let
   201     val _ = Mutex.lock serial_lock;
   202     val _ = serial_count := ! serial_count + 1;
   203     val res = ! serial_count;
   204     val _ = Mutex.unlock serial_lock;
   205   in res end);
   206 
   207 end;
   208 
   209 end;
   210 
(*View of Multithreading restricted to the BASIC_MULTITHREADING signature;
  its members are opened into the enclosing scope.*)
structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading;
open Basic_Multithreading;