# HG changeset patch # User wenzelm # Date 1203176642 -3600 # Node ID abb3f8dd66dcb7bc0fcfc1b03b87fd56c56b2352 # Parent ea11278a0300cd69057ec8987c73d7a23f3e1b82 removed managed_process (cf. General/shell_process.ML); replaced ignore/raise_interrupt by more flexible (un)interruptible combinators; tuned timeLimit: sleep already interruptible by default; schedule: restore attributes of body, instead of forcing interruptible execution; diff -r ea11278a0300 -r abb3f8dd66dc src/Pure/ML-Systems/multithreading_polyml.ML --- a/src/Pure/ML-Systems/multithreading_polyml.ML Sat Feb 16 16:44:00 2008 +0100 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Sat Feb 16 16:44:02 2008 +0100 @@ -9,8 +9,8 @@ signature MULTITHREADING_POLYML = sig - val ignore_interrupt: ('a -> 'b) -> 'a -> 'b - val raise_interrupt: ('a -> 'b) -> 'a -> 'b + val interruptible: ('a -> 'b) -> 'a -> 'b + val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b structure TimeLimit: TIME_LIMIT end; @@ -76,17 +76,15 @@ handle Interrupt => (restore (); Exn.Exn Interrupt)) end; - -(* interrupt handling *) - -fun uninterruptible f x = with_attributes - [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer] f x; +fun interruptible f = + with_attributes + [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce] + (fn _ => f); -fun interruptible f x = with_attributes - [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce] f x; - -fun ignore_interrupt f = uninterruptible (fn _ => f); -fun raise_interrupt f = interruptible (fn _ => f); +fun uninterruptible f = + with_attributes + [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer] + (fn atts => f (fn g => with_attributes atts (fn _ => g))); (* execution with time limit *) @@ -96,64 +94,23 @@ exception TimeOut; -fun timeLimit time f x = - uninterruptible (fn atts => fn () => - let - val worker = Thread.self (); - val timeout = ref false; - val watchdog = Thread.fork (interruptible (fn _ => fn () => - (OS.Process.sleep time; timeout := true; Thread.interrupt worker)), []); +fun timeLimit time f x = uninterruptible (fn restore_attributes => fn () => + let + val worker = Thread.self (); + val timeout = ref false; + val watchdog = Thread.fork (fn () => + (OS.Process.sleep time; timeout := true; Thread.interrupt worker), []); - (*RACE! timeout signal vs. external Interrupt*) - val result = Exn.capture (with_attributes atts (fn _ => f)) x; - val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false); + (*RACE! timeout signal vs. external Interrupt*) + val result = Exn.capture (restore_attributes f) x; + val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false); - val _ = Thread.interrupt watchdog handle Thread _ => (); - in if was_timeout then raise TimeOut else Exn.release result end) (); + val _ = Thread.interrupt watchdog handle Thread _ => (); + in if was_timeout then raise TimeOut else Exn.release result end) (); end; -(* managed external processes -- with propagation of interrupts *) - -fun managed_process cmdline = uninterruptible (fn atts => fn () => - let - val proc = Unix.execute (cmdline, []); - val (proc_stdout, proc_stdin) = Unix.streamsOf proc; - val _ = TextIO.closeOut proc_stdin; - - (*finished state*) - val finished = ref false; - val finished_mutex = Mutex.mutex (); - val finished_cond = ConditionVar.conditionVar (); - fun signal_finished () = - (Mutex.lock finished_mutex; finished := true; Mutex.unlock finished_mutex; - ConditionVar.signal finished_cond); - - val _ = Mutex.lock finished_mutex; - - (*reader thread*) - val buffer = ref []; - fun reader () = - (case Exn.capture TextIO.input proc_stdout of - Exn.Exn Interrupt => () - | Exn.Exn _ => signal_finished () - | Exn.Result "" => signal_finished () - | Exn.Result txt => (change buffer (cons txt); reader ())); - val reader_thread = Thread.fork (reader, []); - - (*main thread*) - val () = - while not (! finished) do with_attributes atts (fn _ => fn () => - ((ConditionVar.waitUntil (finished_cond, finished_mutex, Time.now () + Time.fromSeconds 1); ()) - handle Interrupt => Unix.kill (proc, Posix.Signal.int))) (); (* FIXME lock!?! *) - val _ = Thread.interrupt reader_thread handle Thread _ => (); - - val status = OS.Process.isSuccess (Unix.reap proc); - val output = implode (rev (! buffer)); - in (output, status) end) (); - - (* critical section -- may be nested within the same thread *) local @@ -172,7 +129,7 @@ fun NAMED_CRITICAL name e = if self_critical () then e () else - uninterruptible (fn atts => fn () => + uninterruptible (fn restore_attributes => fn () => let val name' = ! critical_name; val _ = @@ -188,7 +145,7 @@ in () end; val _ = critical_thread := SOME (Thread.self ()); val _ = critical_name := name; - val result = Exn.capture (with_attributes atts (fn _ => e)) (); + val result = Exn.capture (restore_attributes e) (); val _ = critical_name := ""; val _ = critical_thread := NONE; val _ = Mutex.unlock critical_lock; @@ -204,7 +161,7 @@ datatype 'a task = Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate; -fun schedule n next_task = uninterruptible (fn _ => fn tasks => +fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks => let (*protected execution*) val lock = Mutex.mutex (); @@ -263,7 +220,7 @@ fun worker () = (case PROTECTED "dequeue" dequeue of Task {body, cont, fail} => - (case Exn.capture (interruptible (fn _ => body)) () of + (case Exn.capture (restore_attributes body) () of Exn.Result () => (PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ()) | Exn.Exn exn =>