# HG changeset patch # User wenzelm # Date 1248987966 -7200 # Node ID 1fb5db48002d8c4ca8877c9ba7b131846bdc3a07 # Parent ab9b66c2bbca7f655fc0e40b374346658f939e1a added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted; diff -r ab9b66c2bbca -r 1fb5db48002d src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Thu Jul 30 18:43:52 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Thu Jul 30 23:06:06 2009 +0200 @@ -120,11 +120,11 @@ fun SYNCHRONIZED name = SimpleThread.synchronized name lock; fun wait cond = (*requires SYNCHRONIZED*) - ConditionVar.wait (cond, lock) handle Exn.Interrupt => (); + Multithreading.sync_wait NONE cond lock; -fun wait_interruptible cond timeout = (*requires SYNCHRONIZED*) +fun wait_interruptible timeout cond = (*requires SYNCHRONIZED*) interruptible (fn () => - ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)))) (); + ignore (Multithreading.sync_wait (SOME (Time.+ (Time.now (), timeout))) cond lock)) (); fun signal cond = (*requires SYNCHRONIZED*) ConditionVar.signal cond; @@ -194,7 +194,7 @@ (* worker threads *) fun worker_wait cond = (*requires SYNCHRONIZED*) - (change_active false; wait cond; change_active true); + (change_active false; wait cond; change_active true); fun worker_next () = (*requires SYNCHRONIZED*) if ! excessive > 0 then @@ -272,7 +272,7 @@ else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_work ()); (*delay loop*) - val _ = wait_interruptible scheduler_event next_round + val _ = wait_interruptible next_round scheduler_event handle Exn.Interrupt => (Multithreading.tracing 1 (fn () => "Interrupt"); List.app do_cancel (Task_Queue.cancel_all (! queue))); diff -r ab9b66c2bbca -r 1fb5db48002d src/Pure/Concurrent/synchronized.ML --- a/src/Pure/Concurrent/synchronized.ML Thu Jul 30 18:43:52 2009 +0200 +++ b/src/Pure/Concurrent/synchronized.ML Thu Jul 30 23:06:06 2009 +0200 @@ -48,11 +48,9 @@ (case f x of SOME (y, x') => (var := x'; SOME y) | NONE => - (case time_limit x of - NONE => (ConditionVar.wait (cond, lock); try_change ()) - | SOME t => - if ConditionVar.waitUntil (cond, lock, t) then try_change () - else NONE)) + if Multithreading.sync_wait (time_limit x) cond lock + then try_change () + else NONE) end; val res = try_change (); val _ = ConditionVar.broadcast cond; diff -r ab9b66c2bbca -r 1fb5db48002d src/Pure/ML-Systems/multithreading.ML --- a/src/Pure/ML-Systems/multithreading.ML Thu Jul 30 18:43:52 2009 +0200 +++ b/src/Pure/ML-Systems/multithreading.ML Thu Jul 30 23:06:06 2009 +0200 @@ -26,6 +26,7 @@ val restricted_interrupts: Thread.threadAttribute list val with_attributes: Thread.threadAttribute list -> (Thread.threadAttribute list -> 'a -> 'b) -> 'a -> 'b + val sync_wait: Time.time option -> ConditionVar.conditionVar -> Mutex.mutex -> bool val self_critical: unit -> bool val serial: unit -> int end; @@ -48,6 +49,9 @@ fun max_threads_value () = 1: int; fun enabled () = false; + +(* attributes *) + val no_interrupts = [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer]; @@ -59,6 +63,8 @@ fun with_attributes _ f x = f [] x; +fun sync_wait _ _ _ = false; + (* critical section *) @@ -74,5 +80,5 @@ end; -structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading; -open BasicMultithreading; +structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading; +open Basic_Multithreading; diff -r ab9b66c2bbca -r 1fb5db48002d src/Pure/ML-Systems/multithreading_polyml.ML --- a/src/Pure/ML-Systems/multithreading_polyml.ML Thu Jul 30 18:43:52 2009 +0200 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Thu Jul 30 23:06:06 2009 +0200 @@ -110,6 +110,9 @@ val _ = Thread.setAttributes orig_atts; in Exn.release result end; + +(* regular interruptibility *) + fun interruptible f x = (Thread.testInterrupt (); with_attributes regular_interrupts (fn _ => fn x => f x) x); @@ -118,6 +121,29 @@ f (fn g => with_attributes atts (fn _ => fn y => g y)) x); +(* synchronous wait *) + +fun sync_attributes e = + let + val orig_atts = Thread.getAttributes (); + val broadcast = + (case List.find (fn Thread.EnableBroadcastInterrupt _ => true | _ => false) orig_atts of + NONE => Thread.EnableBroadcastInterrupt false + | SOME att => att); + val interrupt_state = + (case List.find (fn Thread.InterruptState _ => true | _ => false) orig_atts of + NONE => Thread.InterruptState Thread.InterruptDefer + | SOME (state as Thread.InterruptState Thread.InterruptDefer) => state + | _ => Thread.InterruptState Thread.InterruptSynch); + in with_attributes [broadcast, interrupt_state] (fn _ => fn () => e ()) () end; + +fun sync_wait time cond lock = + sync_attributes (fn () => + (case time of + SOME t => ConditionVar.waitUntil (cond, lock, t) + | NONE => (ConditionVar.wait (cond, lock); true))); + + (* execution with time limit *) structure TimeLimit = @@ -192,8 +218,9 @@ val _ = while ! result = Wait do restore_attributes (fn () => - (ConditionVar.waitUntil (result_cond, result_mutex, Time.now () + Time.fromMilliseconds 100); ()) - handle Exn.Interrupt => kill 10) (); + (ignore (sync_wait (SOME (Time.+ (Time.now (), Time.fromMilliseconds 100))) + result_cond result_mutex) + handle Exn.Interrupt => kill 10)) (); (*cleanup*) val output = read_file output_name handle IO.Io _ => ""; @@ -269,5 +296,5 @@ end; -structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading; -open BasicMultithreading; +structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading; +open Basic_Multithreading;