support for managed external processes;
authorwenzelm
Fri Feb 15 23:22:02 2008 +0100 (2008-02-15 ago)
changeset 2607444c5419cd9f1
parent 26073 0e70d3bd2eb4
child 26075 815f3ccc0b45
support for managed external processes;
src/Pure/ML-Systems/multithreading.ML
src/Pure/ML-Systems/multithreading_polyml.ML
     1.1 --- a/src/Pure/ML-Systems/multithreading.ML	Fri Feb 15 17:36:21 2008 +0100
     1.2 +++ b/src/Pure/ML-Systems/multithreading.ML	Fri Feb 15 23:22:02 2008 +0100
     1.3 @@ -19,6 +19,7 @@
     1.4    val available: bool
     1.5    val max_threads: int ref
     1.6    val max_threads_value: unit -> int
     1.7 +  val managed_process: string -> string * bool
     1.8    val self_critical: unit -> bool
     1.9    datatype 'a task =
    1.10      Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
    1.11 @@ -41,6 +42,12 @@
    1.12  fun max_threads_value () = Int.max (! max_threads, 1);
    1.13  
    1.14  
    1.15 +(* managed external processes *)
    1.16 +
    1.17 +fun managed_process _ =
    1.18 +  raise Fail "No multithreading support -- cannot manage external processes";
    1.19 +
    1.20 +
    1.21  (* critical section *)
    1.22  
    1.23  fun self_critical () = false;
    1.24 @@ -53,7 +60,8 @@
    1.25  datatype 'a task =
    1.26    Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
    1.27  
    1.28 -fun schedule _ _ _ = raise Fail "No multithreading support";
    1.29 +fun schedule _ _ _ =
    1.30 +  raise Fail "No multithreading support -- cannot schedule tasks";
    1.31  
    1.32  
    1.33  (* serial numbers *)
     2.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML	Fri Feb 15 17:36:21 2008 +0100
     2.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Fri Feb 15 23:22:02 2008 +0100
     2.3 @@ -114,6 +114,46 @@
     2.4  end;
     2.5  
     2.6  
     2.7 +(* managed external processes -- with propagation of interrupts *)
     2.8 +
     2.9 +fun managed_process cmdline = uninterruptible (fn atts => fn () =>
    2.10 +  let
    2.11 +    val proc = Unix.execute (cmdline, []);
    2.12 +    val (proc_stdout, proc_stdin) = Unix.streamsOf proc;
    2.13 +    val _ = TextIO.closeOut proc_stdin;
    2.14 +
    2.15 +    (*finished state*)
    2.16 +    val finished = ref false;
    2.17 +    val finished_mutex = Mutex.mutex ();
    2.18 +    val finished_cond = ConditionVar.conditionVar ();
    2.19 +    fun signal_finished () =
    2.20 +      (Mutex.lock finished_mutex; finished := true; Mutex.unlock finished_mutex;
    2.21 +        ConditionVar.signal finished_cond);
    2.22 +
    2.23 +    val _ = Mutex.lock finished_mutex;
    2.24 +
    2.25 +    (*reader thread*)
    2.26 +    val buffer = ref [];
    2.27 +    fun reader () =
    2.28 +      (case Exn.capture TextIO.input proc_stdout of
    2.29 +        Exn.Exn Interrupt => ()
    2.30 +      | Exn.Exn _ => signal_finished ()
    2.31 +      | Exn.Result "" => signal_finished ()
    2.32 +      | Exn.Result txt => (change buffer (cons txt); reader ()));
    2.33 +    val reader_thread = Thread.fork (reader, []);
    2.34 +
    2.35 +    (*main thread*)
    2.36 +    val () =
    2.37 +      while not (! finished) do with_attributes atts (fn _ => fn () =>
    2.38 +        ((ConditionVar.waitUntil (finished_cond, finished_mutex, Time.now () + Time.fromSeconds 1); ())
    2.39 +          handle Interrupt => Unix.kill (proc, Posix.Signal.int))) ();  (* FIXME lock!?! *)
    2.40 +    val _ = Thread.interrupt reader_thread handle Thread _ => ();
    2.41 +
    2.42 +    val status = OS.Process.isSuccess (Unix.reap proc);
    2.43 +    val output = implode (rev (! buffer));
    2.44 +  in (output, status) end) ();
    2.45 +
    2.46 +
    2.47  (* critical section -- may be nested within the same thread *)
    2.48  
    2.49  local