# HG changeset patch # User wenzelm # Date 1203114122 -3600 # Node ID 44c5419cd9f1a437c19303f0fd0984e30b88a53e # Parent 0e70d3bd2eb4a494fae8106e017dc6e1974bdd1b support for managed external processes; diff -r 0e70d3bd2eb4 -r 44c5419cd9f1 src/Pure/ML-Systems/multithreading.ML --- a/src/Pure/ML-Systems/multithreading.ML Fri Feb 15 17:36:21 2008 +0100 +++ b/src/Pure/ML-Systems/multithreading.ML Fri Feb 15 23:22:02 2008 +0100 @@ -19,6 +19,7 @@ val available: bool val max_threads: int ref val max_threads_value: unit -> int + val managed_process: string -> string * bool val self_critical: unit -> bool datatype 'a task = Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate; @@ -41,6 +42,12 @@ fun max_threads_value () = Int.max (! max_threads, 1); +(* managed external processes *) + +fun managed_process _ = + raise Fail "No multithreading support -- cannot manage external processes"; + + (* critical section *) fun self_critical () = false; @@ -53,7 +60,8 @@ datatype 'a task = Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate; -fun schedule _ _ _ = raise Fail "No multithreading support"; +fun schedule _ _ _ = + raise Fail "No multithreading support -- cannot schedule tasks"; (* serial numbers *) diff -r 0e70d3bd2eb4 -r 44c5419cd9f1 src/Pure/ML-Systems/multithreading_polyml.ML --- a/src/Pure/ML-Systems/multithreading_polyml.ML Fri Feb 15 17:36:21 2008 +0100 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Fri Feb 15 23:22:02 2008 +0100 @@ -114,6 +114,46 @@ end; +(* managed external processes -- with propagation of interrupts *) + +fun managed_process cmdline = uninterruptible (fn atts => fn () => + let + val proc = Unix.execute (cmdline, []); + val (proc_stdout, proc_stdin) = Unix.streamsOf proc; + val _ = TextIO.closeOut proc_stdin; + + (*finished state*) + val finished = ref false; + val finished_mutex = Mutex.mutex (); + val finished_cond = ConditionVar.conditionVar (); + fun signal_finished () = + (Mutex.lock finished_mutex; finished := true; Mutex.unlock finished_mutex; + ConditionVar.signal finished_cond); + + val _ = Mutex.lock finished_mutex; + + (*reader thread*) + val buffer = ref []; + fun reader () = + (case Exn.capture TextIO.input proc_stdout of + Exn.Exn Interrupt => () + | Exn.Exn _ => signal_finished () + | Exn.Result "" => signal_finished () + | Exn.Result txt => (change buffer (cons txt); reader ())); + val reader_thread = Thread.fork (reader, []); + + (*main thread*) + val () = + while not (! finished) do with_attributes atts (fn _ => fn () => + ((ConditionVar.waitUntil (finished_cond, finished_mutex, Time.now () + Time.fromSeconds 1); ()) + handle Interrupt => Unix.kill (proc, Posix.Signal.int))) (); (* FIXME lock!?! *) + val _ = Thread.interrupt reader_thread handle Thread _ => (); + + val status = OS.Process.isSuccess (Unix.reap proc); + val output = implode (rev (! buffer)); + in (output, status) end) (); + + (* critical section -- may be nested within the same thread *) local