(*  Title:      Pure/ML-Systems/multithreading_polyml.ML
    Author:     Makarius

Multithreading in Poly/ML 5.2 or later (cf. polyml/basis/Thread.sml).
*)

(*Poly/ML-specific additions to the generic multithreading interface.*)
signature MULTITHREADING_POLYML =
sig
  val interruptible: ('a -> 'b) -> 'a -> 'b
  val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
  val system_out: string -> string * int
  structure TimeLimit: TIME_LIMIT
end;

(*Extends the previously bound BASIC_MULTITHREADING (declared elsewhere).*)
signature BASIC_MULTITHREADING =
sig
  include BASIC_MULTITHREADING
  include MULTITHREADING_POLYML
end;

(*Extends the previously bound MULTITHREADING (declared elsewhere).*)
signature MULTITHREADING =
sig
  include MULTITHREADING
  include MULTITHREADING_POLYML
end;

structure Multithreading: MULTITHREADING =
struct

(* options *)

(*current trace level: messages with level above this value are dropped*)
val trace = ref 0;

(*Write ">>> " ^ msg () to stderr if level <= ! trace.  The message is a
  thunk, so it is only computed when actually printed.  Exceptions raised
  while producing or writing the message are ignored on purpose.*)
fun tracing level msg =
  if level > ! trace then ()
  else (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
    handle _ (*sic*) => ();

val available = true;

val max_threads = ref 1;

(*Effective thread limit: a non-positive max_threads selects the number of
  processors reported by Thread.numProcessors, but at least 1.*)
fun max_threads_value () =
  let val m = ! max_threads
  in if m <= 0 then Int.max (Thread.numProcessors (), 1) else m end;

fun enabled () = max_threads_value () > 1;


(* misc utils *)

(*formatting of optional names for trace messages: empty names vanish*)
fun show "" = "" | show name = " " ^ name;
fun show' "" = "" | show' name = " [" ^ name ^ "]";

(*Read the whole file; the stream is closed after the captured read,
  before its outcome is released.*)
fun read_file name =
  let val is = TextIO.openIn name
  in Exn.release (Exn.capture TextIO.inputAll is before TextIO.closeIn is) end;

(*Write txt to the file; the stream is closed after the captured write,
  before its outcome is released.*)
fun write_file name txt =
  let val os = TextIO.openOut name
  in Exn.release (Exn.capture TextIO.output (os, txt) before TextIO.closeOut os) end;


(* thread attributes *)

val no_interrupts =
  [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];

val regular_interrupts =
  [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];

(*replace InterruptAsynch by InterruptAsynchOnce, keep all other attributes*)
val safe_interrupts = map
  (fn Thread.InterruptState Thread.InterruptAsynch =>
      Thread.InterruptState Thread.InterruptAsynchOnce
    | x => x);

(*Run f orig_atts x under new_atts, where orig_atts are the (safe) attributes
  in force on entry.  The original attributes are restored on normal return
  as well as on any exception, which is then re-raised via Exn.release.*)
fun with_attributes new_atts f x =
  let
    val orig_atts = safe_interrupts (Thread.getAttributes ());
    fun restore () = Thread.setAttributes orig_atts;
    val result =
      (Thread.setAttributes (safe_interrupts new_atts);
        Exn.Result (f orig_atts x) before restore ())
      handle e => Exn.Exn e before restore ();
  in Exn.release result end;

(*run f with regular_interrupts in force*)
fun interruptible f = with_attributes regular_interrupts (fn _ => f);

(*Run f with no_interrupts in force.  f receives a combinator that runs a
  given function under the attributes that were active when uninterruptible
  was entered.*)
fun uninterruptible f =
  with_attributes no_interrupts (fn atts => f (fn g => with_attributes atts (fn _ => g)));


(* execution with time limit *)

structure TimeLimit =
struct

exception TimeOut;

(*Apply f to x; a forked watchdog sleeps for the given time, then sets the
  timeout flag and interrupts the calling thread.  An Interrupt of f counts
  as TimeOut only if the flag has been set; every other outcome of f is
  passed through unchanged.*)
fun timeLimit time f x = uninterruptible (fn restore_attributes => fn () =>
  let
    val worker = Thread.self ();
    val timeout = ref false;
    val watchdog = Thread.fork (fn () =>
      (OS.Process.sleep time; timeout := true; Thread.interrupt worker), []);

    val result = Exn.capture (restore_attributes f) x;
    val was_timeout = (case result of Exn.Exn Exn.Interrupt => ! timeout | _ => false);

    (*stop the watchdog; exception Thread from Thread.interrupt is ignored*)
    val _ = Thread.interrupt watchdog handle Thread _ => ();
  in if was_timeout then raise TimeOut else Exn.release result end) ();

end;


(* system shell processes, with propagation of interrupts *)

(*Run the script text via lib/scripts/system.pl in a separate thread and
  return (output, return code).  The script, the pid and the output each go
  through a temporary file, all of which are removed afterwards.  While
  waiting, an Interrupt of the calling thread is forwarded as signal INT to
  the process group given in the pid file.  If the process ended by that
  signal (or exit status 0wx82), Exn.Interrupt is raised.*)
fun system_out script = uninterruptible (fn restore_attributes => fn () =>
  let
    val script_name = OS.FileSys.tmpName ();
    val _ = write_file script_name script;

    val pid_name = OS.FileSys.tmpName ();
    val output_name = OS.FileSys.tmpName ();

    (*result state*)
    datatype result = Wait | Signal | Result of int;
    val result = ref Wait;
    val result_mutex = Mutex.mutex ();
    val result_cond = ConditionVar.conditionVar ();
    fun set_result res =
      (Mutex.lock result_mutex; result := res; Mutex.unlock result_mutex;
        ConditionVar.signal result_cond);

    (*locked before the fork, as required for ConditionVar.waitUntil below*)
    val _ = Mutex.lock result_mutex;

    (*system thread*)
    val system_thread = Thread.fork (fn () =>
      let
        val status =
          OS.Process.system ("perl -w \"$ISABELLE_HOME/lib/scripts/system.pl\" group " ^
            script_name ^ " " ^ pid_name ^ " " ^ output_name);
        (*signalled processes yield 256 + signal, stopped ones 512 + signal*)
        val res =
          (case Posix.Process.fromStatus status of
            Posix.Process.W_EXITED => Result 0
          | Posix.Process.W_EXITSTATUS 0wx82 => Signal
          | Posix.Process.W_EXITSTATUS w => Result (Word8.toInt w)
          | Posix.Process.W_SIGNALED s =>
              if s = Posix.Signal.int then Signal
              else Result (256 + LargeWord.toInt (Posix.Signal.toWord s))
          | Posix.Process.W_STOPPED s => Result (512 + LargeWord.toInt (Posix.Signal.toWord s)));
      (*any exception in this thread is reported as return code 2*)
      in set_result res end handle _ (*sic*) => set_result (Result 2), []);

    (*main thread -- proxy for interrupts*)
    (*an unreadable pid file (IO.Io) is retried up to n times, 100ms apart*)
    fun kill n =
      (case Int.fromString (read_file pid_name) of
        SOME pid =>
          Posix.Process.kill
            (Posix.Process.K_GROUP (Posix.Process.wordToPid (LargeWord.fromInt pid)),
              Posix.Signal.int)
      | NONE => ())
      handle OS.SysErr _ => () | IO.Io _ =>
        (OS.Process.sleep (Time.fromMilliseconds 100); if n > 0 then kill (n - 1) else ());

    (*wait in slices of 100ms, with the caller's original attributes restored*)
    val _ = while ! result = Wait do
      restore_attributes (fn () =>
        (ConditionVar.waitUntil (result_cond, result_mutex, Time.now () + Time.fromMilliseconds 100); ())
          handle Exn.Interrupt => kill 10) ();

    (*cleanup*)
    val output = read_file output_name handle IO.Io _ => "";
    val _ = OS.FileSys.remove script_name handle OS.SysErr _ => ();
    val _ = OS.FileSys.remove pid_name handle OS.SysErr _ => ();
    val _ = OS.FileSys.remove output_name handle OS.SysErr _ => ();
    val _ = Thread.interrupt system_thread handle Thread _ => ();
    (*Wait cannot occur here, since the loop above only terminates on a
      different state; the explicit case keeps the match exhaustive*)
    val rc =
      (case ! result of
        Signal => raise Exn.Interrupt
      | Result rc => rc
      | Wait => raise Fail "system_out: missing result");
  in (output, rc) end) ();


(* critical section -- may be nested within the same thread *)

local

val critical_lock = Mutex.mutex ();
val critical_thread = ref (NONE: Thread.thread option);
val critical_name = ref "";

in

(*does the current thread own the critical section?*)
fun self_critical () =
  (case ! critical_thread of
    NONE => false
  | SOME t => Thread.equal (t, Thread.self ()));

(*Evaluate e () inside the global critical section.  A thread that already
  owns the section runs e directly.  Otherwise the lock is acquired with
  interrupts disabled, e itself runs with the caller's attributes restored,
  and the lock is released whether or not e raised an exception.  Contended
  entry is traced: "waiting" at level 4, "passed" at level 1 to 4 depending
  on the waiting time (>= 1000ms, >= 100ms, >= 10ms, less).*)
fun NAMED_CRITICAL name e =
  if self_critical () then e ()
  else
    uninterruptible (fn restore_attributes => fn () =>
      let
        (*name of the current owner, if any, for the trace messages*)
        val name' = ! critical_name;
        val _ =
          if Mutex.trylock critical_lock then ()
          else
            let
              val timer = Timer.startRealTimer ();
              val _ = tracing 4 (fn () => "CRITICAL" ^ show name ^ show' name' ^ ": waiting");
              val _ = Mutex.lock critical_lock;
              val time = Timer.checkRealTimer timer;
              val trace_time =
                if Time.>= (time, Time.fromMilliseconds 1000) then 1
                else if Time.>= (time, Time.fromMilliseconds 100) then 2
                else if Time.>= (time, Time.fromMilliseconds 10) then 3 else 4;
              val _ = tracing trace_time (fn () =>
                "CRITICAL" ^ show name ^ show' name' ^ ": passed after " ^ Time.toString time);
            in () end;
        val _ = critical_thread := SOME (Thread.self ());
        val _ = critical_name := name;
        val result = Exn.capture (restore_attributes e) ();
        val _ = critical_name := "";
        val _ = critical_thread := NONE;
        val _ = Mutex.unlock critical_lock;
      in Exn.release result end) ();

fun CRITICAL e = NAMED_CRITICAL "" e;

end;


(* serial numbers *)

local

val serial_lock = Mutex.mutex ();
val serial_count = ref 0;

in

(*increment the shared counter under serial_lock, with interrupts disabled,
  and return its new value*)
val serial = uninterruptible (fn _ => fn () =>
  let
    val _ = Mutex.lock serial_lock;
    val _ = serial_count := ! serial_count + 1;
    val res = ! serial_count;
    val _ = Mutex.unlock serial_lock;
  in res end);

end;

end;

structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;
open BasicMultithreading;