added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
--- a/src/Pure/Concurrent/future.ML Thu Jul 30 18:43:52 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Thu Jul 30 23:06:06 2009 +0200
@@ -120,11 +120,11 @@
fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
fun wait cond = (*requires SYNCHRONIZED*)
- ConditionVar.wait (cond, lock) handle Exn.Interrupt => ();
+ Multithreading.sync_wait NONE cond lock;
-fun wait_interruptible cond timeout = (*requires SYNCHRONIZED*)
+fun wait_interruptible timeout cond = (*requires SYNCHRONIZED*)
interruptible (fn () =>
- ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)))) ();
+ ignore (Multithreading.sync_wait (SOME (Time.+ (Time.now (), timeout))) cond lock)) ();
fun signal cond = (*requires SYNCHRONIZED*)
ConditionVar.signal cond;
@@ -194,7 +194,7 @@
(* worker threads *)
fun worker_wait cond = (*requires SYNCHRONIZED*)
- (change_active false; wait cond; change_active true);
+ (change_active false; wait cond; change_active true);
fun worker_next () = (*requires SYNCHRONIZED*)
if ! excessive > 0 then
@@ -272,7 +272,7 @@
else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_work ());
(*delay loop*)
- val _ = wait_interruptible scheduler_event next_round
+ val _ = wait_interruptible next_round scheduler_event
handle Exn.Interrupt =>
(Multithreading.tracing 1 (fn () => "Interrupt");
List.app do_cancel (Task_Queue.cancel_all (! queue)));
--- a/src/Pure/Concurrent/synchronized.ML Thu Jul 30 18:43:52 2009 +0200
+++ b/src/Pure/Concurrent/synchronized.ML Thu Jul 30 23:06:06 2009 +0200
@@ -48,11 +48,9 @@
(case f x of
SOME (y, x') => (var := x'; SOME y)
| NONE =>
- (case time_limit x of
- NONE => (ConditionVar.wait (cond, lock); try_change ())
- | SOME t =>
- if ConditionVar.waitUntil (cond, lock, t) then try_change ()
- else NONE))
+ if Multithreading.sync_wait (time_limit x) cond lock
+ then try_change ()
+ else NONE)
end;
val res = try_change ();
val _ = ConditionVar.broadcast cond;
--- a/src/Pure/ML-Systems/multithreading.ML Thu Jul 30 18:43:52 2009 +0200
+++ b/src/Pure/ML-Systems/multithreading.ML Thu Jul 30 23:06:06 2009 +0200
@@ -26,6 +26,7 @@
val restricted_interrupts: Thread.threadAttribute list
val with_attributes: Thread.threadAttribute list ->
(Thread.threadAttribute list -> 'a -> 'b) -> 'a -> 'b
+ val sync_wait: Time.time option -> ConditionVar.conditionVar -> Mutex.mutex -> bool
val self_critical: unit -> bool
val serial: unit -> int
end;
@@ -48,6 +49,9 @@
fun max_threads_value () = 1: int;
fun enabled () = false;
+
+(* attributes *)
+
val no_interrupts =
[Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
@@ -59,6 +63,8 @@
fun with_attributes _ f x = f [] x;
+fun sync_wait _ _ _ = false;
+
(* critical section *)
@@ -74,5 +80,5 @@
end;
-structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;
-open BasicMultithreading;
+structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading;
+open Basic_Multithreading;
--- a/src/Pure/ML-Systems/multithreading_polyml.ML Thu Jul 30 18:43:52 2009 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML Thu Jul 30 23:06:06 2009 +0200
@@ -110,6 +110,9 @@
val _ = Thread.setAttributes orig_atts;
in Exn.release result end;
+
+(* regular interruptibility *)
+
fun interruptible f x =
(Thread.testInterrupt (); with_attributes regular_interrupts (fn _ => fn x => f x) x);
@@ -118,6 +121,29 @@
f (fn g => with_attributes atts (fn _ => fn y => g y)) x);
+(* synchronous wait *)
+
+fun sync_attributes e =
+ let
+ val orig_atts = Thread.getAttributes ();
+ val broadcast =
+ (case List.find (fn Thread.EnableBroadcastInterrupt _ => true | _ => false) orig_atts of
+ NONE => Thread.EnableBroadcastInterrupt false
+ | SOME att => att);
+ val interrupt_state =
+ (case List.find (fn Thread.InterruptState _ => true | _ => false) orig_atts of
+ NONE => Thread.InterruptState Thread.InterruptDefer
+ | SOME (state as Thread.InterruptState Thread.InterruptDefer) => state
+ | _ => Thread.InterruptState Thread.InterruptSynch);
+ in with_attributes [broadcast, interrupt_state] (fn _ => fn () => e ()) () end;
+
+fun sync_wait time cond lock =
+ sync_attributes (fn () =>
+ (case time of
+ SOME t => ConditionVar.waitUntil (cond, lock, t)
+ | NONE => (ConditionVar.wait (cond, lock); true)));
+
+
(* execution with time limit *)
structure TimeLimit =
@@ -192,8 +218,9 @@
val _ = while ! result = Wait do
restore_attributes (fn () =>
- (ConditionVar.waitUntil (result_cond, result_mutex, Time.now () + Time.fromMilliseconds 100); ())
- handle Exn.Interrupt => kill 10) ();
+ (ignore (sync_wait (SOME (Time.+ (Time.now (), Time.fromMilliseconds 100)))
+ result_cond result_mutex)
+ handle Exn.Interrupt => kill 10)) ();
(*cleanup*)
val output = read_file output_name handle IO.Io _ => "";
@@ -269,5 +296,5 @@
end;
-structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;
-open BasicMultithreading;
+structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading;
+open Basic_Multithreading;