src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Sun Nov 18 15:38:37 2012 +0100 (2012-11-18 ago)
changeset 50118 89a14e495526
parent 48879 cb5cdbb645cd
child 50910 54f06ba192ef
permissions -rw-r--r--
adjust max_threads_value to capabilities of Poly/ML 5.5 and current hardware;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     Author:     Makarius
     3 
     4 Multithreading in Poly/ML 5.3.0 or later (cf. polyml/basis/Thread.sml).
     5 *)
     6 
     7 signature MULTITHREADING_POLYML =
     8 sig
     9   val interruptible: ('a -> 'b) -> 'a -> 'b
    10   val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
    11 end;
    12 
    13 signature BASIC_MULTITHREADING =
    14 sig
    15   include BASIC_MULTITHREADING
    16   include MULTITHREADING_POLYML
    17 end;
    18 
    19 signature MULTITHREADING =
    20 sig
    21   include MULTITHREADING
    22   include MULTITHREADING_POLYML
    23 end;
    24 
    25 structure Multithreading: MULTITHREADING =
    26 struct
    27 
    28 (* options *)
    29 
    30 val available = true;
    31 
    32 val max_threads = ref 1;
    33 
    34 fun max_threads_value () =
    35   let val m = ! max_threads in
    36     if m > 0 then m
    37     else Int.min (Int.max (Thread.numProcessors (), 1), 8)
    38   end;
    39 
    40 fun enabled () = max_threads_value () > 1;
    41 
    42 
    43 (* thread attributes *)
    44 
    45 val no_interrupts =
    46   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
    47 
    48 val test_interrupts =
    49   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptSynch];
    50 
    51 val public_interrupts =
    52   [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
    53 
    54 val private_interrupts =
    55   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce];
    56 
    57 val sync_interrupts = map
    58   (fn x as Thread.InterruptState Thread.InterruptDefer => x
    59     | Thread.InterruptState _ => Thread.InterruptState Thread.InterruptSynch
    60     | x => x);
    61 
    62 val safe_interrupts = map
    63   (fn Thread.InterruptState Thread.InterruptAsynch =>
    64       Thread.InterruptState Thread.InterruptAsynchOnce
    65     | x => x);
    66 
    67 fun interrupted () =
    68   let
    69     val orig_atts = safe_interrupts (Thread.getAttributes ());
    70     val _ = Thread.setAttributes test_interrupts;
    71     val test = Exn.capture Thread.testInterrupt ();
    72     val _ = Thread.setAttributes orig_atts;
    73   in Exn.release test end;
    74 
    75 fun with_attributes new_atts e =
    76   let
    77     val orig_atts = safe_interrupts (Thread.getAttributes ());
    78     val result = Exn.capture (fn () =>
    79       (Thread.setAttributes (safe_interrupts new_atts); e orig_atts)) ();
    80     val _ = Thread.setAttributes orig_atts;
    81   in Exn.release result end;
    82 
    83 
    84 (* portable wrappers *)
    85 
    86 fun interruptible f x = with_attributes public_interrupts (fn _ => f x);
    87 
    88 fun uninterruptible f x =
    89   with_attributes no_interrupts (fn atts =>
    90     f (fn g => fn y => with_attributes atts (fn _ => g y)) x);
    91 
    92 
    93 (* synchronous wait *)
    94 
    95 fun sync_wait opt_atts time cond lock =
    96   with_attributes
    97     (sync_interrupts (case opt_atts of SOME atts => atts | NONE => Thread.getAttributes ()))
    98     (fn _ =>
    99       (case time of
   100         SOME t => Exn.Res (ConditionVar.waitUntil (cond, lock, t))
   101       | NONE => (ConditionVar.wait (cond, lock); Exn.Res true))
   102       handle exn => Exn.Exn exn);
   103 
   104 
   105 (* tracing *)
   106 
   107 val trace = ref 0;
   108 
   109 fun tracing level msg =
   110   if level > ! trace then ()
   111   else uninterruptible (fn _ => fn () =>
   112     (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
   113       handle _ (*sic*) => ()) ();
   114 
   115 fun tracing_time detailed time =
   116   tracing
   117    (if not detailed then 5
   118     else if Time.>= (time, seconds 1.0) then 1
   119     else if Time.>= (time, seconds 0.1) then 2
   120     else if Time.>= (time, seconds 0.01) then 3
   121     else if Time.>= (time, seconds 0.001) then 4 else 5);
   122 
   123 fun real_time f x =
   124   let
   125     val timer = Timer.startRealTimer ();
   126     val () = f x;
   127     val time = Timer.checkRealTimer timer;
   128   in time end;
   129 
   130 
   131 (* critical section -- may be nested within the same thread *)
   132 
   133 local
   134 
   135 val critical_lock = Mutex.mutex ();
   136 val critical_thread = ref (NONE: Thread.thread option);
   137 val critical_name = ref "";
   138 
   139 fun show "" = "" | show name = " " ^ name;
   140 fun show' "" = "" | show' name = " [" ^ name ^ "]";
   141 
   142 in
   143 
   144 fun self_critical () =
   145   (case ! critical_thread of
   146     NONE => false
   147   | SOME t => Thread.equal (t, Thread.self ()));
   148 
   149 fun NAMED_CRITICAL name e =
   150   if self_critical () then e ()
   151   else
   152     Exn.release (uninterruptible (fn restore_attributes => fn () =>
   153       let
   154         val name' = ! critical_name;
   155         val _ =
   156           if Mutex.trylock critical_lock then ()
   157           else
   158             let
   159               val _ = tracing 5 (fn () => "CRITICAL" ^ show name ^ show' name' ^ ": waiting");
   160               val time = real_time Mutex.lock critical_lock;
   161               val _ = tracing_time true time (fn () =>
   162                 "CRITICAL" ^ show name ^ show' name' ^ ": passed after " ^ Time.toString time);
   163             in () end;
   164         val _ = critical_thread := SOME (Thread.self ());
   165         val _ = critical_name := name;
   166         val result = Exn.capture (restore_attributes e) ();
   167         val _ = critical_name := "";
   168         val _ = critical_thread := NONE;
   169         val _ = Mutex.unlock critical_lock;
   170       in result end) ());
   171 
   172 fun CRITICAL e = NAMED_CRITICAL "" e;
   173 
   174 end;
   175 
   176 
   177 (* serial numbers *)
   178 
   179 local
   180 
   181 val serial_lock = Mutex.mutex ();
   182 val serial_count = ref 0;
   183 
   184 in
   185 
   186 val serial = uninterruptible (fn _ => fn () =>
   187   let
   188     val _ = Mutex.lock serial_lock;
   189     val _ = serial_count := ! serial_count + 1;
   190     val res = ! serial_count;
   191     val _ = Mutex.unlock serial_lock;
   192   in res end);
   193 
   194 end;
   195 
   196 end;
   197 
   198 structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading;
   199 open Basic_Multithreading;