# HG changeset patch # User wenzelm # Date 1249078185 -7200 # Node ID 400cc493d4666776fff7d74ec585039477e76bf0 # Parent d00238af17b69db99ac88c109afefa25e960512d renamed Multithreading.regular_interrupts to Multithreading.public_interrupts; renamed Multithreading.restricted_interrupts to Multithreading.private_interrupts; added Multithreading.sync_interrupts; Multithreading.sync_wait: more careful treatment of attributes; Multithreading.tracing: uninterruptible; Multithreading.system_out: signal within critical region, more careful sync_wait; eliminated redundant Thread.testInterrupt; Future.wait_timeout: uniform Multithreading.sync_wait; future scheduler: interruptible body (sync!), to improve reactivity; future_job: reject duplicate assignments -- system error; misc tuning; diff -r d00238af17b6 -r 400cc493d466 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Thu Jul 30 23:50:11 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Sat Aug 01 00:09:45 2009 +0200 @@ -120,11 +120,10 @@ fun SYNCHRONIZED name = SimpleThread.synchronized name lock; fun wait cond = (*requires SYNCHRONIZED*) - Multithreading.sync_wait NONE cond lock; + Multithreading.sync_wait NONE NONE cond lock; -fun wait_interruptible timeout cond = (*requires SYNCHRONIZED*) - interruptible (fn () => - ignore (Multithreading.sync_wait (SOME (Time.+ (Time.now (), timeout))) cond lock)) (); +fun wait_timeout timeout cond = (*requires SYNCHRONIZED*) + Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock; fun signal cond = (*requires SYNCHRONIZED*) ConditionVar.signal cond; @@ -149,11 +148,11 @@ val res = if ok then Exn.capture (fn () => - (Thread.testInterrupt (); - Multithreading.with_attributes Multithreading.restricted_interrupts - (fn _ => fn () => e ())) ()) () + Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) () else Exn.Exn Exn.Interrupt; - val _ = Synchronized.change result (K (SOME res)); + val _ = Synchronized.change result + (fn NONE => SOME res + | SOME _ => raise Fail "Duplicate assignment of future value"); in (case res of Exn.Exn exn => (Task_Queue.cancel_group group exn; false) @@ -276,20 +275,24 @@ broadcast_work ()); (*delay loop*) - val _ = wait_interruptible next_round scheduler_event - handle Exn.Interrupt => - (Multithreading.tracing 1 (fn () => "Interrupt"); - List.app do_cancel (Task_Queue.cancel_all (! queue))); + val _ = Exn.release (wait_timeout next_round scheduler_event); (*shutdown*) val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else (); val continue = not (! do_shutdown andalso null (! workers)); val _ = if continue then () else scheduler := NONE; val _ = broadcast scheduler_event; - in continue end; + in continue end + handle Exn.Interrupt => + (Multithreading.tracing 1 (fn () => "Interrupt"); + List.app do_cancel (Task_Queue.cancel_all (! queue)); + scheduler_next ()); + fun scheduler_loop () = - while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do (); + Multithreading.with_attributes + (Multithreading.sync_interrupts Multithreading.public_interrupts) + (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ()); fun scheduler_active () = (*requires SYNCHRONIZED*) (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); @@ -393,12 +396,11 @@ fun interruptible_task f x = if Multithreading.available then - (Thread.testInterrupt (); Multithreading.with_attributes (if is_worker () - then Multithreading.restricted_interrupts - else Multithreading.regular_interrupts) - (fn _ => fn x => f x) x) + then Multithreading.private_interrupts + else Multithreading.public_interrupts) + (fn _ => f x) else interruptible f x; (*cancel: present and future group members will be interrupted eventually*) diff -r d00238af17b6 -r 400cc493d466 src/Pure/Concurrent/simple_thread.ML --- a/src/Pure/Concurrent/simple_thread.ML Thu Jul 30 23:50:11 2009 +0200 +++ b/src/Pure/Concurrent/simple_thread.ML Sat Aug 01 00:09:45 2009 +0200 @@ -16,7 +16,7 @@ fun fork interrupts body = Thread.fork (fn () => exception_trace (fn () => body ()), - if interrupts then Multithreading.regular_interrupts else Multithreading.no_interrupts); + if interrupts then Multithreading.public_interrupts else Multithreading.no_interrupts); fun interrupt thread = Thread.interrupt thread handle Thread _ => (); diff -r d00238af17b6 -r 400cc493d466 src/Pure/Concurrent/synchronized.ML --- a/src/Pure/Concurrent/synchronized.ML Thu Jul 30 23:50:11 2009 +0200 +++ b/src/Pure/Concurrent/synchronized.ML Sat Aug 01 00:09:45 2009 +0200 @@ -48,9 +48,10 @@ (case f x of SOME (y, x') => (var := x'; SOME y) | NONE => - if Multithreading.sync_wait (time_limit x) cond lock - then try_change () - else NONE) + (case Multithreading.sync_wait NONE (time_limit x) cond lock of + Exn.Result true => try_change () + | Exn.Result false => NONE + | Exn.Exn exn => reraise exn)) end; val res = try_change (); val _ = ConditionVar.broadcast cond; diff -r d00238af17b6 -r 400cc493d466 src/Pure/ML-Systems/multithreading.ML --- a/src/Pure/ML-Systems/multithreading.ML Thu Jul 30 23:50:11 2009 +0200 +++ b/src/Pure/ML-Systems/multithreading.ML Sat Aug 01 00:09:45 2009 +0200 @@ -13,20 +13,21 @@ signature MULTITHREADING = sig include BASIC_MULTITHREADING - val trace: int ref - val tracing: int -> (unit -> string) -> unit - val tracing_time: bool -> Time.time -> (unit -> string) -> unit - val real_time: ('a -> unit) -> 'a -> Time.time val available: bool val max_threads: int ref val max_threads_value: unit -> int val enabled: unit -> bool val no_interrupts: Thread.threadAttribute list - val regular_interrupts: Thread.threadAttribute list - val restricted_interrupts: Thread.threadAttribute list - val with_attributes: Thread.threadAttribute list -> - (Thread.threadAttribute list -> 'a -> 'b) -> 'a -> 'b - val sync_wait: Time.time option -> ConditionVar.conditionVar -> Mutex.mutex -> bool + val public_interrupts: Thread.threadAttribute list + val private_interrupts: Thread.threadAttribute list + val sync_interrupts: Thread.threadAttribute list -> Thread.threadAttribute list + val with_attributes: Thread.threadAttribute list -> (Thread.threadAttribute list -> 'a) -> 'a + val sync_wait: Thread.threadAttribute list option -> Time.time option -> + ConditionVar.conditionVar -> Mutex.mutex -> bool Exn.result + val trace: int ref + val tracing: int -> (unit -> string) -> unit + val tracing_time: bool -> Time.time -> (unit -> string) -> unit + val real_time: ('a -> unit) -> 'a -> Time.time val self_critical: unit -> bool val serial: unit -> int end; @@ -34,14 +35,6 @@ structure Multithreading: MULTITHREADING = struct -(* tracing *) - -val trace = ref (0: int); -fun tracing _ _ = (); -fun tracing_time _ _ _ = (); -fun real_time f x = (f x; Time.zeroTime); - - (* options *) val available = false; @@ -52,18 +45,22 @@ (* attributes *) -val no_interrupts = - [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer]; +val no_interrupts = []; +val public_interrupts = []; +val private_interrupts = []; +fun sync_interrupts _ = []; -val regular_interrupts = - [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce]; +fun with_attributes _ e = e []; -val restricted_interrupts = - [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce]; +fun sync_wait _ _ _ _ = Exn.Result true; + + +(* tracing *) -fun with_attributes _ f x = f [] x; - -fun sync_wait _ _ _ = false; +val trace = ref (0: int); +fun tracing _ _ = (); +fun tracing_time _ _ _ = (); +fun real_time f x = (f x; Time.zeroTime); (* critical section *) diff -r d00238af17b6 -r 400cc493d466 src/Pure/ML-Systems/multithreading_polyml.ML --- a/src/Pure/ML-Systems/multithreading_polyml.ML Thu Jul 30 23:50:11 2009 +0200 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Sat Aug 01 00:09:45 2009 +0200 @@ -27,31 +27,6 @@ structure Multithreading: MULTITHREADING = struct -(* tracing *) - -val trace = ref 0; - -fun tracing level msg = - if level > ! trace then () - else (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr) - handle _ (*sic*) => (); - -fun tracing_time detailed time = - tracing - (if not detailed then 5 - else if Time.>= (time, Time.fromMilliseconds 1000) then 1 - else if Time.>= (time, Time.fromMilliseconds 100) then 2 - else if Time.>= (time, Time.fromMilliseconds 10) then 3 - else if Time.>= (time, Time.fromMilliseconds 1) then 4 else 5); - -fun real_time f x = - let - val timer = Timer.startRealTimer (); - val () = f x; - val time = Timer.checkRealTimer timer; - in time end; - - (* options *) val available = true; @@ -91,57 +66,76 @@ val no_interrupts = [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer]; -val regular_interrupts = +val public_interrupts = [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce]; -val restricted_interrupts = +val private_interrupts = [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce]; +val sync_interrupts = map + (fn x as Thread.InterruptState Thread.InterruptDefer => x + | Thread.InterruptState _ => Thread.InterruptState Thread.InterruptSynch + | x => x); + val safe_interrupts = map (fn Thread.InterruptState Thread.InterruptAsynch => Thread.InterruptState Thread.InterruptAsynchOnce | x => x); -fun with_attributes new_atts f x = +fun with_attributes new_atts e = let val orig_atts = safe_interrupts (Thread.getAttributes ()); val result = Exn.capture (fn () => - (Thread.setAttributes (safe_interrupts new_atts); f orig_atts x)) (); + (Thread.setAttributes (safe_interrupts new_atts); e orig_atts)) (); val _ = Thread.setAttributes orig_atts; in Exn.release result end; -(* regular interruptibility *) +(* portable wrappers *) + +fun interruptible f x = with_attributes public_interrupts (fn _ => f x); -fun interruptible f x = - (Thread.testInterrupt (); with_attributes regular_interrupts (fn _ => fn x => f x) x); - -fun uninterruptible f = - with_attributes no_interrupts (fn atts => fn x => - f (fn g => with_attributes atts (fn _ => fn y => g y)) x); +fun uninterruptible f x = + with_attributes no_interrupts (fn atts => + f (fn g => fn y => with_attributes atts (fn _ => g y)) x); (* synchronous wait *) -fun sync_attributes e = +fun sync_wait opt_atts time cond lock = + with_attributes + (sync_interrupts (case opt_atts of SOME atts => atts | NONE => Thread.getAttributes ())) + (fn _ => + (case time of + SOME t => Exn.Result (ConditionVar.waitUntil (cond, lock, t)) + | NONE => (ConditionVar.wait (cond, lock); Exn.Result true)) + handle exn => Exn.Exn exn); + + +(* tracing *) + +val trace = ref 0; + +fun tracing level msg = + if level > ! trace then () + else uninterruptible (fn _ => fn () => + (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr) + handle _ (*sic*) => ()) (); + +fun tracing_time detailed time = + tracing + (if not detailed then 5 + else if Time.>= (time, Time.fromMilliseconds 1000) then 1 + else if Time.>= (time, Time.fromMilliseconds 100) then 2 + else if Time.>= (time, Time.fromMilliseconds 10) then 3 + else if Time.>= (time, Time.fromMilliseconds 1) then 4 else 5); + +fun real_time f x = let - val orig_atts = Thread.getAttributes (); - val broadcast = - (case List.find (fn Thread.EnableBroadcastInterrupt _ => true | _ => false) orig_atts of - NONE => Thread.EnableBroadcastInterrupt false - | SOME att => att); - val interrupt_state = - (case List.find (fn Thread.InterruptState _ => true | _ => false) orig_atts of - NONE => Thread.InterruptState Thread.InterruptDefer - | SOME (state as Thread.InterruptState Thread.InterruptDefer) => state - | _ => Thread.InterruptState Thread.InterruptSynch); - in with_attributes [broadcast, interrupt_state] (fn _ => fn () => e ()) () end; - -fun sync_wait time cond lock = - sync_attributes (fn () => - (case time of - SOME t => ConditionVar.waitUntil (cond, lock, t) - | NONE => (ConditionVar.wait (cond, lock); true))); + val timer = Timer.startRealTimer (); + val () = f x; + val time = Timer.checkRealTimer timer; + in time end; (* execution with time limit *) @@ -169,7 +163,7 @@ (* system shell processes, with propagation of interrupts *) -fun system_out script = uninterruptible (fn restore_attributes => fn () => +fun system_out script = with_attributes no_interrupts (fn orig_atts => let val script_name = OS.FileSys.tmpName (); val _ = write_file script_name script; @@ -180,13 +174,12 @@ (*result state*) datatype result = Wait | Signal | Result of int; val result = ref Wait; - val result_mutex = Mutex.mutex (); - val result_cond = ConditionVar.conditionVar (); + val lock = Mutex.mutex (); + val cond = ConditionVar.conditionVar (); fun set_result res = - (Mutex.lock result_mutex; result := res; Mutex.unlock result_mutex; - ConditionVar.signal result_cond); + (Mutex.lock lock; result := res; ConditionVar.signal cond; Mutex.unlock lock); - val _ = Mutex.lock result_mutex; + val _ = Mutex.lock lock; (*system thread*) val system_thread = Thread.fork (fn () => @@ -216,11 +209,12 @@ handle OS.SysErr _ => () | IO.Io _ => (OS.Process.sleep (Time.fromMilliseconds 100); if n > 0 then kill (n - 1) else ()); - val _ = while ! result = Wait do - restore_attributes (fn () => - (ignore (sync_wait (SOME (Time.+ (Time.now (), Time.fromMilliseconds 100))) - result_cond result_mutex) - handle Exn.Interrupt => kill 10)) (); + val _ = + while ! result = Wait do + let val res = + sync_wait (SOME orig_atts) + (SOME (Time.+ (Time.now (), Time.fromMilliseconds 100))) cond lock + in case res of Exn.Exn Exn.Interrupt => kill 10 | _ => () end; (*cleanup*) val output = read_file output_name handle IO.Io _ => ""; @@ -229,7 +223,7 @@ val _ = OS.FileSys.remove output_name handle OS.SysErr _ => (); val _ = Thread.interrupt system_thread handle Thread _ => (); val rc = (case ! result of Signal => raise Exn.Interrupt | Result rc => rc); - in (output, rc) end) (); + in (output, rc) end); (* critical section -- may be nested within the same thread *)