src/Pure/ML-Systems/multithreading_polyml.ML
changeset 26083 abb3f8dd66dc
parent 26074 44c5419cd9f1
child 26098 b59d33f73aed
     1.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML	Sat Feb 16 16:44:00 2008 +0100
     1.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Sat Feb 16 16:44:02 2008 +0100
     1.3 @@ -9,8 +9,8 @@
     1.4  
     1.5  signature MULTITHREADING_POLYML =
     1.6  sig
     1.7 -  val ignore_interrupt: ('a -> 'b) -> 'a -> 'b
     1.8 -  val raise_interrupt: ('a -> 'b) -> 'a -> 'b
     1.9 +  val interruptible: ('a -> 'b) -> 'a -> 'b
    1.10 +  val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
    1.11    structure TimeLimit: TIME_LIMIT
    1.12  end;
    1.13  
    1.14 @@ -76,17 +76,15 @@
    1.15        handle Interrupt => (restore (); Exn.Exn Interrupt))
    1.16    end;
    1.17  
    1.18 -
    1.19 -(* interrupt handling *)
    1.20 -
    1.21 -fun uninterruptible f x = with_attributes
    1.22 -  [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer] f x;
    1.23 +fun interruptible f =
    1.24 +  with_attributes
    1.25 +    [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce]
    1.26 +    (fn _ => f);
    1.27  
    1.28 -fun interruptible f x = with_attributes
    1.29 -  [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce] f x;
    1.30 -
    1.31 -fun ignore_interrupt f = uninterruptible (fn _ => f);
    1.32 -fun raise_interrupt f = interruptible (fn _ => f);
    1.33 +fun uninterruptible f =
    1.34 +  with_attributes
    1.35 +    [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer]
    1.36 +    (fn atts => f (fn g => with_attributes atts (fn _ => g)));
    1.37  
    1.38  
    1.39  (* execution with time limit *)
    1.40 @@ -96,64 +94,23 @@
    1.41  
    1.42  exception TimeOut;
    1.43  
    1.44 -fun timeLimit time f x =
    1.45 -  uninterruptible (fn atts => fn () =>
    1.46 -    let
    1.47 -      val worker = Thread.self ();
    1.48 -      val timeout = ref false;
    1.49 -      val watchdog = Thread.fork (interruptible (fn _ => fn () =>
    1.50 -        (OS.Process.sleep time; timeout := true; Thread.interrupt worker)), []);
    1.51 +fun timeLimit time f x = uninterruptible (fn restore_attributes => fn () =>
    1.52 +  let
    1.53 +    val worker = Thread.self ();
    1.54 +    val timeout = ref false;
    1.55 +    val watchdog = Thread.fork (fn () =>
    1.56 +      (OS.Process.sleep time; timeout := true; Thread.interrupt worker), []);
    1.57  
    1.58 -      (*RACE! timeout signal vs. external Interrupt*)
    1.59 -      val result = Exn.capture (with_attributes atts (fn _ => f)) x;
    1.60 -      val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false);
    1.61 +    (*RACE! timeout signal vs. external Interrupt*)
    1.62 +    val result = Exn.capture (restore_attributes f) x;
    1.63 +    val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false);
    1.64  
    1.65 -      val _ = Thread.interrupt watchdog handle Thread _ => ();
    1.66 -    in if was_timeout then raise TimeOut else Exn.release result end) ();
    1.67 +    val _ = Thread.interrupt watchdog handle Thread _ => ();
    1.68 +  in if was_timeout then raise TimeOut else Exn.release result end) ();
    1.69  
    1.70  end;
    1.71  
    1.72  
    1.73 -(* managed external processes -- with propagation of interrupts *)
    1.74 -
    1.75 -fun managed_process cmdline = uninterruptible (fn atts => fn () =>
    1.76 -  let
    1.77 -    val proc = Unix.execute (cmdline, []);
    1.78 -    val (proc_stdout, proc_stdin) = Unix.streamsOf proc;
    1.79 -    val _ = TextIO.closeOut proc_stdin;
    1.80 -
    1.81 -    (*finished state*)
    1.82 -    val finished = ref false;
    1.83 -    val finished_mutex = Mutex.mutex ();
    1.84 -    val finished_cond = ConditionVar.conditionVar ();
    1.85 -    fun signal_finished () =
    1.86 -      (Mutex.lock finished_mutex; finished := true; Mutex.unlock finished_mutex;
    1.87 -        ConditionVar.signal finished_cond);
    1.88 -
    1.89 -    val _ = Mutex.lock finished_mutex;
    1.90 -
    1.91 -    (*reader thread*)
    1.92 -    val buffer = ref [];
    1.93 -    fun reader () =
    1.94 -      (case Exn.capture TextIO.input proc_stdout of
    1.95 -        Exn.Exn Interrupt => ()
    1.96 -      | Exn.Exn _ => signal_finished ()
    1.97 -      | Exn.Result "" => signal_finished ()
    1.98 -      | Exn.Result txt => (change buffer (cons txt); reader ()));
    1.99 -    val reader_thread = Thread.fork (reader, []);
   1.100 -
   1.101 -    (*main thread*)
   1.102 -    val () =
   1.103 -      while not (! finished) do with_attributes atts (fn _ => fn () =>
   1.104 -        ((ConditionVar.waitUntil (finished_cond, finished_mutex, Time.now () + Time.fromSeconds 1); ())
   1.105 -          handle Interrupt => Unix.kill (proc, Posix.Signal.int))) ();  (* FIXME lock!?! *)
   1.106 -    val _ = Thread.interrupt reader_thread handle Thread _ => ();
   1.107 -
   1.108 -    val status = OS.Process.isSuccess (Unix.reap proc);
   1.109 -    val output = implode (rev (! buffer));
   1.110 -  in (output, status) end) ();
   1.111 -
   1.112 -
   1.113  (* critical section -- may be nested within the same thread *)
   1.114  
   1.115  local
   1.116 @@ -172,7 +129,7 @@
   1.117  fun NAMED_CRITICAL name e =
   1.118    if self_critical () then e ()
   1.119    else
   1.120 -    uninterruptible (fn atts => fn () =>
   1.121 +    uninterruptible (fn restore_attributes => fn () =>
   1.122        let
   1.123          val name' = ! critical_name;
   1.124          val _ =
   1.125 @@ -188,7 +145,7 @@
   1.126              in () end;
   1.127          val _ = critical_thread := SOME (Thread.self ());
   1.128          val _ = critical_name := name;
   1.129 -        val result = Exn.capture (with_attributes atts (fn _ => e)) ();
   1.130 +        val result = Exn.capture (restore_attributes e) ();
   1.131          val _ = critical_name := "";
   1.132          val _ = critical_thread := NONE;
   1.133          val _ = Mutex.unlock critical_lock;
   1.134 @@ -204,7 +161,7 @@
   1.135  datatype 'a task =
   1.136    Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
   1.137  
   1.138 -fun schedule n next_task = uninterruptible (fn _ => fn tasks =>
   1.139 +fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
   1.140    let
   1.141      (*protected execution*)
   1.142      val lock = Mutex.mutex ();
   1.143 @@ -263,7 +220,7 @@
   1.144      fun worker () =
   1.145        (case PROTECTED "dequeue" dequeue of
   1.146          Task {body, cont, fail} =>
   1.147 -          (case Exn.capture (interruptible (fn _ => body)) () of
   1.148 +          (case Exn.capture (restore_attributes body) () of
   1.149              Exn.Result () =>
   1.150                (PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ())
   1.151            | Exn.Exn exn =>