removed managed_process (cf. General/shell_process.ML);
replaced ignore/raise_interrupt by more flexible (un)interruptible combinators;
tuned timeLimit: sleep already interruptible by default;
schedule: restore attributes of body, instead of forcing interruptible execution;
--- a/src/Pure/ML-Systems/multithreading_polyml.ML Sat Feb 16 16:44:00 2008 +0100
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML Sat Feb 16 16:44:02 2008 +0100
@@ -9,8 +9,8 @@
signature MULTITHREADING_POLYML =
sig
- val ignore_interrupt: ('a -> 'b) -> 'a -> 'b
- val raise_interrupt: ('a -> 'b) -> 'a -> 'b
+ val interruptible: ('a -> 'b) -> 'a -> 'b
+ val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
structure TimeLimit: TIME_LIMIT
end;
@@ -76,17 +76,15 @@
handle Interrupt => (restore (); Exn.Exn Interrupt))
end;
-
-(* interrupt handling *)
-
-fun uninterruptible f x = with_attributes
- [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer] f x;
+fun interruptible f =
+ with_attributes
+ [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce]
+ (fn _ => f);
-fun interruptible f x = with_attributes
- [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce] f x;
-
-fun ignore_interrupt f = uninterruptible (fn _ => f);
-fun raise_interrupt f = interruptible (fn _ => f);
+fun uninterruptible f =
+ with_attributes
+ [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer]
+ (fn atts => f (fn g => with_attributes atts (fn _ => g)));
(* execution with time limit *)
@@ -96,64 +94,23 @@
exception TimeOut;
-fun timeLimit time f x =
- uninterruptible (fn atts => fn () =>
- let
- val worker = Thread.self ();
- val timeout = ref false;
- val watchdog = Thread.fork (interruptible (fn _ => fn () =>
- (OS.Process.sleep time; timeout := true; Thread.interrupt worker)), []);
+fun timeLimit time f x = uninterruptible (fn restore_attributes => fn () =>
+ let
+ val worker = Thread.self ();
+ val timeout = ref false;
+ val watchdog = Thread.fork (fn () =>
+ (OS.Process.sleep time; timeout := true; Thread.interrupt worker), []);
- (*RACE! timeout signal vs. external Interrupt*)
- val result = Exn.capture (with_attributes atts (fn _ => f)) x;
- val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false);
+ (*RACE! timeout signal vs. external Interrupt*)
+ val result = Exn.capture (restore_attributes f) x;
+ val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false);
- val _ = Thread.interrupt watchdog handle Thread _ => ();
- in if was_timeout then raise TimeOut else Exn.release result end) ();
+ val _ = Thread.interrupt watchdog handle Thread _ => ();
+ in if was_timeout then raise TimeOut else Exn.release result end) ();
end;
-(* managed external processes -- with propagation of interrupts *)
-
-fun managed_process cmdline = uninterruptible (fn atts => fn () =>
- let
- val proc = Unix.execute (cmdline, []);
- val (proc_stdout, proc_stdin) = Unix.streamsOf proc;
- val _ = TextIO.closeOut proc_stdin;
-
- (*finished state*)
- val finished = ref false;
- val finished_mutex = Mutex.mutex ();
- val finished_cond = ConditionVar.conditionVar ();
- fun signal_finished () =
- (Mutex.lock finished_mutex; finished := true; Mutex.unlock finished_mutex;
- ConditionVar.signal finished_cond);
-
- val _ = Mutex.lock finished_mutex;
-
- (*reader thread*)
- val buffer = ref [];
- fun reader () =
- (case Exn.capture TextIO.input proc_stdout of
- Exn.Exn Interrupt => ()
- | Exn.Exn _ => signal_finished ()
- | Exn.Result "" => signal_finished ()
- | Exn.Result txt => (change buffer (cons txt); reader ()));
- val reader_thread = Thread.fork (reader, []);
-
- (*main thread*)
- val () =
- while not (! finished) do with_attributes atts (fn _ => fn () =>
- ((ConditionVar.waitUntil (finished_cond, finished_mutex, Time.now () + Time.fromSeconds 1); ())
- handle Interrupt => Unix.kill (proc, Posix.Signal.int))) (); (* FIXME lock!?! *)
- val _ = Thread.interrupt reader_thread handle Thread _ => ();
-
- val status = OS.Process.isSuccess (Unix.reap proc);
- val output = implode (rev (! buffer));
- in (output, status) end) ();
-
-
(* critical section -- may be nested within the same thread *)
local
@@ -172,7 +129,7 @@
fun NAMED_CRITICAL name e =
if self_critical () then e ()
else
- uninterruptible (fn atts => fn () =>
+ uninterruptible (fn restore_attributes => fn () =>
let
val name' = ! critical_name;
val _ =
@@ -188,7 +145,7 @@
in () end;
val _ = critical_thread := SOME (Thread.self ());
val _ = critical_name := name;
- val result = Exn.capture (with_attributes atts (fn _ => e)) ();
+ val result = Exn.capture (restore_attributes e) ();
val _ = critical_name := "";
val _ = critical_thread := NONE;
val _ = Mutex.unlock critical_lock;
@@ -204,7 +161,7 @@
datatype 'a task =
Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-fun schedule n next_task = uninterruptible (fn _ => fn tasks =>
+fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
let
(*protected execution*)
val lock = Mutex.mutex ();
@@ -263,7 +220,7 @@
fun worker () =
(case PROTECTED "dequeue" dequeue of
Task {body, cont, fail} =>
- (case Exn.capture (interruptible (fn _ => body)) () of
+ (case Exn.capture (restore_attributes body) () of
Exn.Result () =>
(PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ())
| Exn.Exn exn =>