support for managed external processes;
authorwenzelm
Fri, 15 Feb 2008 23:22:02 +0100
changeset 26074 44c5419cd9f1
parent 26073 0e70d3bd2eb4
child 26075 815f3ccc0b45
support for managed external processes;
src/Pure/ML-Systems/multithreading.ML
src/Pure/ML-Systems/multithreading_polyml.ML
--- a/src/Pure/ML-Systems/multithreading.ML	Fri Feb 15 17:36:21 2008 +0100
+++ b/src/Pure/ML-Systems/multithreading.ML	Fri Feb 15 23:22:02 2008 +0100
@@ -19,6 +19,7 @@
   val available: bool
   val max_threads: int ref
   val max_threads_value: unit -> int
+  val managed_process: string -> string * bool
   val self_critical: unit -> bool
   datatype 'a task =
     Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
@@ -41,6 +42,12 @@
 fun max_threads_value () = Int.max (! max_threads, 1);
 
 
+(* managed external processes *)
+
+fun managed_process _ =
+  raise Fail "No multithreading support -- cannot manage external processes";
+
+
 (* critical section *)
 
 fun self_critical () = false;
@@ -53,7 +60,8 @@
 datatype 'a task =
   Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
 
-fun schedule _ _ _ = raise Fail "No multithreading support";
+fun schedule _ _ _ =
+  raise Fail "No multithreading support -- cannot schedule tasks";
 
 
 (* serial numbers *)
--- a/src/Pure/ML-Systems/multithreading_polyml.ML	Fri Feb 15 17:36:21 2008 +0100
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Fri Feb 15 23:22:02 2008 +0100
@@ -114,6 +114,46 @@
 end;
 
 
+(* managed external processes -- with propagation of interrupts *)
+
+fun managed_process cmdline = uninterruptible (fn atts => fn () =>
+  let
+    val proc = Unix.execute (cmdline, []);
+    val (proc_stdout, proc_stdin) = Unix.streamsOf proc;
+    val _ = TextIO.closeOut proc_stdin;
+
+    (*finished state*)
+    val finished = ref false;
+    val finished_mutex = Mutex.mutex ();
+    val finished_cond = ConditionVar.conditionVar ();
+    fun signal_finished () =
+      (Mutex.lock finished_mutex; finished := true; Mutex.unlock finished_mutex;
+        ConditionVar.signal finished_cond);
+
+    val _ = Mutex.lock finished_mutex;
+
+    (*reader thread*)
+    val buffer = ref [];
+    fun reader () =
+      (case Exn.capture TextIO.input proc_stdout of
+        Exn.Exn Interrupt => ()
+      | Exn.Exn _ => signal_finished ()
+      | Exn.Result "" => signal_finished ()
+      | Exn.Result txt => (change buffer (cons txt); reader ()));
+    val reader_thread = Thread.fork (reader, []);
+
+    (*main thread*)
+    val () =
+      while not (! finished) do with_attributes atts (fn _ => fn () =>
+        ((ConditionVar.waitUntil (finished_cond, finished_mutex, Time.now () + Time.fromSeconds 1); ())
+          handle Interrupt => Unix.kill (proc, Posix.Signal.int))) ();  (* FIXME lock!?! *)
+    val _ = Thread.interrupt reader_thread handle Thread _ => ();
+
+    val status = OS.Process.isSuccess (Unix.reap proc);
+    val output = implode (rev (! buffer));
+  in (output, status) end) ();
+
+
 (* critical section -- may be nested within the same thread *)
 
 local