added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
authorwenzelm
Thu, 30 Jul 2009 23:06:06 +0200
changeset 32286 1fb5db48002d
parent 32285 ab9b66c2bbca
child 32291 2a9ba0bb7739
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/synchronized.ML
src/Pure/ML-Systems/multithreading.ML
src/Pure/ML-Systems/multithreading_polyml.ML
--- a/src/Pure/Concurrent/future.ML	Thu Jul 30 18:43:52 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Thu Jul 30 23:06:06 2009 +0200
@@ -120,11 +120,11 @@
 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
 
 fun wait cond = (*requires SYNCHRONIZED*)
-  ConditionVar.wait (cond, lock) handle Exn.Interrupt => ();
+  Multithreading.sync_wait NONE cond lock;
 
-fun wait_interruptible cond timeout = (*requires SYNCHRONIZED*)
+fun wait_interruptible timeout cond = (*requires SYNCHRONIZED*)
   interruptible (fn () =>
-    ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)))) ();
+    ignore (Multithreading.sync_wait (SOME (Time.+ (Time.now (), timeout))) cond lock)) ();
 
 fun signal cond = (*requires SYNCHRONIZED*)
   ConditionVar.signal cond;
@@ -194,7 +194,7 @@
 (* worker threads *)
 
 fun worker_wait cond = (*requires SYNCHRONIZED*)
- (change_active false; wait cond; change_active true);
+  (change_active false; wait cond; change_active true);
 
 fun worker_next () = (*requires SYNCHRONIZED*)
   if ! excessive > 0 then
@@ -272,7 +272,7 @@
       else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_work ());
 
     (*delay loop*)
-    val _ = wait_interruptible scheduler_event next_round
+    val _ = wait_interruptible next_round scheduler_event
       handle Exn.Interrupt =>
         (Multithreading.tracing 1 (fn () => "Interrupt");
           List.app do_cancel (Task_Queue.cancel_all (! queue)));
--- a/src/Pure/Concurrent/synchronized.ML	Thu Jul 30 18:43:52 2009 +0200
+++ b/src/Pure/Concurrent/synchronized.ML	Thu Jul 30 23:06:06 2009 +0200
@@ -48,11 +48,9 @@
           (case f x of
             SOME (y, x') => (var := x'; SOME y)
           | NONE =>
-              (case time_limit x of
-                NONE => (ConditionVar.wait (cond, lock); try_change ())
-              | SOME t =>
-                  if ConditionVar.waitUntil (cond, lock, t) then try_change ()
-                  else NONE))
+              if Multithreading.sync_wait (time_limit x) cond lock
+              then try_change ()
+              else NONE)
         end;
       val res = try_change ();
       val _ = ConditionVar.broadcast cond;
--- a/src/Pure/ML-Systems/multithreading.ML	Thu Jul 30 18:43:52 2009 +0200
+++ b/src/Pure/ML-Systems/multithreading.ML	Thu Jul 30 23:06:06 2009 +0200
@@ -26,6 +26,7 @@
   val restricted_interrupts: Thread.threadAttribute list
   val with_attributes: Thread.threadAttribute list ->
     (Thread.threadAttribute list -> 'a -> 'b) -> 'a -> 'b
+  val sync_wait: Time.time option -> ConditionVar.conditionVar -> Mutex.mutex -> bool
   val self_critical: unit -> bool
   val serial: unit -> int
 end;
@@ -48,6 +49,9 @@
 fun max_threads_value () = 1: int;
 fun enabled () = false;
 
+
+(* attributes *)
+
 val no_interrupts =
   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
 
@@ -59,6 +63,8 @@
 
 fun with_attributes _ f x = f [] x;
 
+fun sync_wait _ _ _ = false;
+
 
 (* critical section *)
 
@@ -74,5 +80,5 @@
 
 end;
 
-structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;
-open BasicMultithreading;
+structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading;
+open Basic_Multithreading;
--- a/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Jul 30 18:43:52 2009 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Jul 30 23:06:06 2009 +0200
@@ -110,6 +110,9 @@
     val _ = Thread.setAttributes orig_atts;
   in Exn.release result end;
 
+
+(* regular interruptibility *)
+
 fun interruptible f x =
   (Thread.testInterrupt (); with_attributes regular_interrupts (fn _ => fn x => f x) x);
 
@@ -118,6 +121,29 @@
     f (fn g => with_attributes atts (fn _ => fn y => g y)) x);
 
 
+(* synchronous wait *)
+
+fun sync_attributes e =
+  let
+    val orig_atts = Thread.getAttributes ();
+    val broadcast =
+      (case List.find (fn Thread.EnableBroadcastInterrupt _ => true | _ => false) orig_atts of
+        NONE => Thread.EnableBroadcastInterrupt false
+      | SOME att => att);
+    val interrupt_state =
+      (case List.find (fn Thread.InterruptState _ => true | _ => false) orig_atts of
+        NONE => Thread.InterruptState Thread.InterruptDefer
+      | SOME (state as Thread.InterruptState Thread.InterruptDefer) => state
+      | _ => Thread.InterruptState Thread.InterruptSynch);
+  in with_attributes [broadcast, interrupt_state] (fn _ => fn () => e ()) () end;
+
+fun sync_wait time cond lock =
+  sync_attributes (fn () =>
+    (case time of
+      SOME t => ConditionVar.waitUntil (cond, lock, t)
+    | NONE => (ConditionVar.wait (cond, lock); true)));
+
+
 (* execution with time limit *)
 
 structure TimeLimit =
@@ -192,8 +218,9 @@
 
     val _ = while ! result = Wait do
       restore_attributes (fn () =>
-        (ConditionVar.waitUntil (result_cond, result_mutex, Time.now () + Time.fromMilliseconds 100); ())
-          handle Exn.Interrupt => kill 10) ();
+        (ignore (sync_wait (SOME (Time.+ (Time.now (), Time.fromMilliseconds 100)))
+            result_cond result_mutex)
+          handle Exn.Interrupt => kill 10)) ();
 
     (*cleanup*)
     val output = read_file output_name handle IO.Io _ => "";
@@ -269,5 +296,5 @@
 
 end;
 
-structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;
-open BasicMultithreading;
+structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading;
+open Basic_Multithreading;