src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Sat Jul 25 00:39:05 2009 +0200 (2009-07-25 ago)
changeset 32185 57ecfab3bcfe
parent 32184 cfa0ef0c0c5f
child 32186 8026b73cd357
permissions -rw-r--r--
added Multithreading.real_time;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     Author:     Makarius
     3 
     4 Multithreading in Poly/ML 5.2.1 or later (cf. polyml/basis/Thread.sml).
     5 *)
     6 
     7 signature MULTITHREADING_POLYML =
     8 sig
     9   val interruptible: ('a -> 'b) -> 'a -> 'b
    10   val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
    11   val system_out: string -> string * int
    12   structure TimeLimit: TIME_LIMIT
    13 end;
    14 
    15 signature BASIC_MULTITHREADING =
    16 sig
    17   include BASIC_MULTITHREADING
    18   include MULTITHREADING_POLYML
    19 end;
    20 
    21 signature MULTITHREADING =
    22 sig
    23   include MULTITHREADING
    24   include MULTITHREADING_POLYML
    25 end;
    26 
    27 structure Multithreading: MULTITHREADING =
    28 struct
    29 
    30 (* tracing *)
    31 
    32 val trace = ref 0;
    33 
    34 fun tracing level msg =
    35   if level > ! trace then ()
    36   else (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
    37     handle _ (*sic*) => ();
    38 
    39 fun tracing_time time =
    40   tracing
    41    (if Time.>= (time, Time.fromMilliseconds 1000) then 1
    42     else if Time.>= (time, Time.fromMilliseconds 100) then 2
    43     else if Time.>= (time, Time.fromMilliseconds 10) then 3
    44     else if Time.>= (time, Time.fromMilliseconds 1) then 4 else 5);
    45 
    46 fun real_time f x =
    47   let
    48     val timer = Timer.startRealTimer ();
    49     val () = f x;
    50     val time = Timer.checkRealTimer timer;
    51   in time end;
    52 
    53 
    54 (* options *)
    55 
    56 val available = true;
    57 
    58 val max_threads = ref 0;
    59 
    60 val tested_platform =
    61   let val ml_platform = getenv "ML_PLATFORM"
    62   in String.isSuffix "linux" ml_platform orelse String.isSuffix "darwin" ml_platform end;
    63 
    64 fun max_threads_value () =
    65   let val m = ! max_threads in
    66     if m > 0 then m
    67     else if not tested_platform then 1
    68     else Int.min (Int.max (Thread.numProcessors (), 1), 8)
    69   end;
    70 
    71 fun enabled () = max_threads_value () > 1;
    72 
    73 
    74 (* misc utils *)
    75 
    76 fun show "" = "" | show name = " " ^ name;
    77 fun show' "" = "" | show' name = " [" ^ name ^ "]";
    78 
    79 fun read_file name =
    80   let val is = TextIO.openIn name
    81   in Exn.release (Exn.capture TextIO.inputAll is before TextIO.closeIn is) end;
    82 
    83 fun write_file name txt =
    84   let val os = TextIO.openOut name
    85   in Exn.release (Exn.capture TextIO.output (os, txt) before TextIO.closeOut os) end;
    86 
    87 
    88 (* thread attributes *)
    89 
    90 val no_interrupts =
    91   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
    92 
    93 val regular_interrupts =
    94   [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
    95 
    96 val restricted_interrupts =
    97   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce];
    98 
    99 val safe_interrupts = map
   100   (fn Thread.InterruptState Thread.InterruptAsynch =>
   101       Thread.InterruptState Thread.InterruptAsynchOnce
   102     | x => x);
   103 
   104 fun with_attributes new_atts f x =
   105   let
   106     val orig_atts = safe_interrupts (Thread.getAttributes ());
   107     val result = Exn.capture (fn () =>
   108       (Thread.setAttributes (safe_interrupts new_atts); f orig_atts x)) ();
   109     val _ = Thread.setAttributes orig_atts;
   110   in Exn.release result end;
   111 
   112 fun interruptible f =
   113   with_attributes regular_interrupts (fn _ => fn x => f x);
   114 
   115 fun uninterruptible f =
   116   with_attributes no_interrupts (fn atts => fn x =>
   117     f (fn g => with_attributes atts (fn _ => fn y => g y)) x);
   118 
   119 
   120 (* execution with time limit *)
   121 
   122 structure TimeLimit =
   123 struct
   124 
   125 exception TimeOut;
   126 
   127 fun timeLimit time f x = uninterruptible (fn restore_attributes => fn () =>
   128   let
   129     val worker = Thread.self ();
   130     val timeout = ref false;
   131     val watchdog = Thread.fork (fn () =>
   132       (OS.Process.sleep time; timeout := true; Thread.interrupt worker), []);
   133 
   134     val result = Exn.capture (restore_attributes f) x;
   135     val was_timeout = (case result of Exn.Exn Exn.Interrupt => ! timeout | _ => false);
   136 
   137     val _ = Thread.interrupt watchdog handle Thread _ => ();
   138   in if was_timeout then raise TimeOut else Exn.release result end) ();
   139 
   140 end;
   141 
   142 
   143 (* system shell processes, with propagation of interrupts *)
   144 
   145 fun system_out script = uninterruptible (fn restore_attributes => fn () =>
   146   let
   147     val script_name = OS.FileSys.tmpName ();
   148     val _ = write_file script_name script;
   149 
   150     val pid_name = OS.FileSys.tmpName ();
   151     val output_name = OS.FileSys.tmpName ();
   152 
   153     (*result state*)
   154     datatype result = Wait | Signal | Result of int;
   155     val result = ref Wait;
   156     val result_mutex = Mutex.mutex ();
   157     val result_cond = ConditionVar.conditionVar ();
   158     fun set_result res =
   159       (Mutex.lock result_mutex; result := res; Mutex.unlock result_mutex;
   160         ConditionVar.signal result_cond);
   161 
   162     val _ = Mutex.lock result_mutex;
   163 
   164     (*system thread*)
   165     val system_thread = Thread.fork (fn () =>
   166       let
   167         val status =
   168           OS.Process.system ("perl -w \"$ISABELLE_HOME/lib/scripts/system.pl\" group " ^
   169             script_name ^ " " ^ pid_name ^ " " ^ output_name);
   170         val res =
   171           (case Posix.Process.fromStatus status of
   172             Posix.Process.W_EXITED => Result 0
   173           | Posix.Process.W_EXITSTATUS 0wx82 => Signal
   174           | Posix.Process.W_EXITSTATUS w => Result (Word8.toInt w)
   175           | Posix.Process.W_SIGNALED s =>
   176               if s = Posix.Signal.int then Signal
   177               else Result (256 + LargeWord.toInt (Posix.Signal.toWord s))
   178           | Posix.Process.W_STOPPED s => Result (512 + LargeWord.toInt (Posix.Signal.toWord s)));
   179       in set_result res end handle _ (*sic*) => set_result (Result 2), []);
   180 
   181     (*main thread -- proxy for interrupts*)
   182     fun kill n =
   183       (case Int.fromString (read_file pid_name) of
   184         SOME pid =>
   185           Posix.Process.kill
   186             (Posix.Process.K_GROUP (Posix.Process.wordToPid (LargeWord.fromInt pid)),
   187               Posix.Signal.int)
   188       | NONE => ())
   189       handle OS.SysErr _ => () | IO.Io _ =>
   190         (OS.Process.sleep (Time.fromMilliseconds 100); if n > 0 then kill (n - 1) else ());
   191 
   192     val _ = while ! result = Wait do
   193       restore_attributes (fn () =>
   194         (ConditionVar.waitUntil (result_cond, result_mutex, Time.now () + Time.fromMilliseconds 100); ())
   195           handle Exn.Interrupt => kill 10) ();
   196 
   197     (*cleanup*)
   198     val output = read_file output_name handle IO.Io _ => "";
   199     val _ = OS.FileSys.remove script_name handle OS.SysErr _ => ();
   200     val _ = OS.FileSys.remove pid_name handle OS.SysErr _ => ();
   201     val _ = OS.FileSys.remove output_name handle OS.SysErr _ => ();
   202     val _ = Thread.interrupt system_thread handle Thread _ => ();
   203     val rc = (case ! result of Signal => raise Exn.Interrupt | Result rc => rc);
   204   in (output, rc) end) ();
   205 
   206 
   207 (* critical section -- may be nested within the same thread *)
   208 
   209 local
   210 
   211 val critical_lock = Mutex.mutex ();
   212 val critical_thread = ref (NONE: Thread.thread option);
   213 val critical_name = ref "";
   214 
   215 in
   216 
   217 fun self_critical () =
   218   (case ! critical_thread of
   219     NONE => false
   220   | SOME t => Thread.equal (t, Thread.self ()));
   221 
   222 fun NAMED_CRITICAL name e =
   223   if self_critical () then e ()
   224   else
   225     Exn.release (uninterruptible (fn restore_attributes => fn () =>
   226       let
   227         val name' = ! critical_name;
   228         val _ =
   229           if Mutex.trylock critical_lock then ()
   230           else
   231             let
   232               val _ = tracing 5 (fn () => "CRITICAL" ^ show name ^ show' name' ^ ": waiting");
   233               val time = real_time Mutex.lock critical_lock;
   234               val _ = tracing_time time (fn () =>
   235                 "CRITICAL" ^ show name ^ show' name' ^ ": passed after " ^ Time.toString time);
   236             in () end;
   237         val _ = critical_thread := SOME (Thread.self ());
   238         val _ = critical_name := name;
   239         val result = Exn.capture (restore_attributes e) ();
   240         val _ = critical_name := "";
   241         val _ = critical_thread := NONE;
   242         val _ = Mutex.unlock critical_lock;
   243       in result end) ());
   244 
   245 fun CRITICAL e = NAMED_CRITICAL "" e;
   246 
   247 end;
   248 
   249 
   250 (* serial numbers *)
   251 
   252 local
   253 
   254 val serial_lock = Mutex.mutex ();
   255 val serial_count = ref 0;
   256 
   257 in
   258 
   259 val serial = uninterruptible (fn _ => fn () =>
   260   let
   261     val _ = Mutex.lock serial_lock;
   262     val _ = serial_count := ! serial_count + 1;
   263     val res = ! serial_count;
   264     val _ = Mutex.unlock serial_lock;
   265   in res end);
   266 
   267 end;
   268 
   269 end;
   270 
   271 structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;
   272 open BasicMultithreading;