--- a/src/Pure/ML-Systems/multithreading.ML Fri Feb 15 17:36:21 2008 +0100
+++ b/src/Pure/ML-Systems/multithreading.ML Fri Feb 15 23:22:02 2008 +0100
@@ -19,6 +19,7 @@
val available: bool
val max_threads: int ref
val max_threads_value: unit -> int
+ val managed_process: string -> string * bool
val self_critical: unit -> bool
datatype 'a task =
Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
@@ -41,6 +42,12 @@
fun max_threads_value () = Int.max (! max_threads, 1);
+(* managed external processes *)
+
+fun managed_process _ =
+ raise Fail "No multithreading support -- cannot manage external processes";
+
+
(* critical section *)
fun self_critical () = false;
@@ -53,7 +60,8 @@
datatype 'a task =
Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-fun schedule _ _ _ = raise Fail "No multithreading support";
+fun schedule _ _ _ =
+ raise Fail "No multithreading support -- cannot schedule tasks";
(* serial numbers *)
--- a/src/Pure/ML-Systems/multithreading_polyml.ML Fri Feb 15 17:36:21 2008 +0100
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML Fri Feb 15 23:22:02 2008 +0100
@@ -114,6 +114,46 @@
end;
+(* managed external processes -- with propagation of interrupts *)
+
+fun managed_process cmdline = uninterruptible (fn atts => fn () =>
+ let
+ val proc = Unix.execute (cmdline, []);
+ val (proc_stdout, proc_stdin) = Unix.streamsOf proc;
+ val _ = TextIO.closeOut proc_stdin;
+
+ (*finished state*)
+ val finished = ref false;
+ val finished_mutex = Mutex.mutex ();
+ val finished_cond = ConditionVar.conditionVar ();
+ fun signal_finished () =
+ (Mutex.lock finished_mutex; finished := true; Mutex.unlock finished_mutex;
+ ConditionVar.signal finished_cond);
+
+ val _ = Mutex.lock finished_mutex;
+
+ (*reader thread*)
+ val buffer = ref [];
+ fun reader () =
+ (case Exn.capture TextIO.input proc_stdout of
+ Exn.Exn Interrupt => ()
+ | Exn.Exn _ => signal_finished ()
+ | Exn.Result "" => signal_finished ()
+ | Exn.Result txt => (change buffer (cons txt); reader ()));
+ val reader_thread = Thread.fork (reader, []);
+
+ (*main thread*)
+ val () =
+ while not (! finished) do with_attributes atts (fn _ => fn () =>
+ ((ConditionVar.waitUntil (finished_cond, finished_mutex, Time.now () + Time.fromSeconds 1); ())
+ handle Interrupt => Unix.kill (proc, Posix.Signal.int))) (); (* FIXME lock!?! *)
+ val _ = Thread.interrupt reader_thread handle Thread _ => ();
+
+ val status = OS.Process.isSuccess (Unix.reap proc);
+ val output = implode (rev (! buffer));
+ in (output, status) end) ();
+
+
(* critical section -- may be nested within the same thread *)
local