src/Pure/ML-Systems/multithreading_polyml.ML
changeset 26083 abb3f8dd66dc
parent 26074 44c5419cd9f1
child 26098 b59d33f73aed
--- a/src/Pure/ML-Systems/multithreading_polyml.ML	Sat Feb 16 16:44:00 2008 +0100
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Sat Feb 16 16:44:02 2008 +0100
@@ -9,8 +9,8 @@
 
 signature MULTITHREADING_POLYML =
 sig
-  val ignore_interrupt: ('a -> 'b) -> 'a -> 'b
-  val raise_interrupt: ('a -> 'b) -> 'a -> 'b
+  val interruptible: ('a -> 'b) -> 'a -> 'b
+  val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
   structure TimeLimit: TIME_LIMIT
 end;
 
@@ -76,17 +76,15 @@
       handle Interrupt => (restore (); Exn.Exn Interrupt))
   end;
 
-
-(* interrupt handling *)
-
-fun uninterruptible f x = with_attributes
-  [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer] f x;
+fun interruptible f =
+  with_attributes
+    [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce]
+    (fn _ => f);
 
-fun interruptible f x = with_attributes
-  [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce] f x;
-
-fun ignore_interrupt f = uninterruptible (fn _ => f);
-fun raise_interrupt f = interruptible (fn _ => f);
+fun uninterruptible f =
+  with_attributes
+    [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer]
+    (fn atts => f (fn g => with_attributes atts (fn _ => g)));
 
 
 (* execution with time limit *)
@@ -96,64 +94,23 @@
 
 exception TimeOut;
 
-fun timeLimit time f x =
-  uninterruptible (fn atts => fn () =>
-    let
-      val worker = Thread.self ();
-      val timeout = ref false;
-      val watchdog = Thread.fork (interruptible (fn _ => fn () =>
-        (OS.Process.sleep time; timeout := true; Thread.interrupt worker)), []);
+fun timeLimit time f x = uninterruptible (fn restore_attributes => fn () =>
+  let
+    val worker = Thread.self ();
+    val timeout = ref false;
+    val watchdog = Thread.fork (fn () =>
+      (OS.Process.sleep time; timeout := true; Thread.interrupt worker), []);
 
-      (*RACE! timeout signal vs. external Interrupt*)
-      val result = Exn.capture (with_attributes atts (fn _ => f)) x;
-      val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false);
+    (*RACE! timeout signal vs. external Interrupt*)
+    val result = Exn.capture (restore_attributes f) x;
+    val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false);
 
-      val _ = Thread.interrupt watchdog handle Thread _ => ();
-    in if was_timeout then raise TimeOut else Exn.release result end) ();
+    val _ = Thread.interrupt watchdog handle Thread _ => ();
+  in if was_timeout then raise TimeOut else Exn.release result end) ();
 
 end;
 
 
-(* managed external processes -- with propagation of interrupts *)
-
-fun managed_process cmdline = uninterruptible (fn atts => fn () =>
-  let
-    val proc = Unix.execute (cmdline, []);
-    val (proc_stdout, proc_stdin) = Unix.streamsOf proc;
-    val _ = TextIO.closeOut proc_stdin;
-
-    (*finished state*)
-    val finished = ref false;
-    val finished_mutex = Mutex.mutex ();
-    val finished_cond = ConditionVar.conditionVar ();
-    fun signal_finished () =
-      (Mutex.lock finished_mutex; finished := true; Mutex.unlock finished_mutex;
-        ConditionVar.signal finished_cond);
-
-    val _ = Mutex.lock finished_mutex;
-
-    (*reader thread*)
-    val buffer = ref [];
-    fun reader () =
-      (case Exn.capture TextIO.input proc_stdout of
-        Exn.Exn Interrupt => ()
-      | Exn.Exn _ => signal_finished ()
-      | Exn.Result "" => signal_finished ()
-      | Exn.Result txt => (change buffer (cons txt); reader ()));
-    val reader_thread = Thread.fork (reader, []);
-
-    (*main thread*)
-    val () =
-      while not (! finished) do with_attributes atts (fn _ => fn () =>
-        ((ConditionVar.waitUntil (finished_cond, finished_mutex, Time.now () + Time.fromSeconds 1); ())
-          handle Interrupt => Unix.kill (proc, Posix.Signal.int))) ();  (* FIXME lock!?! *)
-    val _ = Thread.interrupt reader_thread handle Thread _ => ();
-
-    val status = OS.Process.isSuccess (Unix.reap proc);
-    val output = implode (rev (! buffer));
-  in (output, status) end) ();
-
-
 (* critical section -- may be nested within the same thread *)
 
 local
@@ -172,7 +129,7 @@
 fun NAMED_CRITICAL name e =
   if self_critical () then e ()
   else
-    uninterruptible (fn atts => fn () =>
+    uninterruptible (fn restore_attributes => fn () =>
       let
         val name' = ! critical_name;
         val _ =
@@ -188,7 +145,7 @@
             in () end;
         val _ = critical_thread := SOME (Thread.self ());
         val _ = critical_name := name;
-        val result = Exn.capture (with_attributes atts (fn _ => e)) ();
+        val result = Exn.capture (restore_attributes e) ();
         val _ = critical_name := "";
         val _ = critical_thread := NONE;
         val _ = Mutex.unlock critical_lock;
@@ -204,7 +161,7 @@
 datatype 'a task =
   Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
 
-fun schedule n next_task = uninterruptible (fn _ => fn tasks =>
+fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
   let
     (*protected execution*)
     val lock = Mutex.mutex ();
@@ -263,7 +220,7 @@
     fun worker () =
       (case PROTECTED "dequeue" dequeue of
         Task {body, cont, fail} =>
-          (case Exn.capture (interruptible (fn _ => body)) () of
+          (case Exn.capture (restore_attributes body) () of
             Exn.Result () =>
               (PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ())
           | Exn.Exn exn =>