renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
Sat, 01 Aug 2009 00:09:45 +0200 (2009-07-31)
changeset 32295 400cc493d466
parent 32294 d00238af17b6
child 32296 e6a8ed8aed3a
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts; renamed Multithreading.restricted_interrupts to Multithreading.private_interrupts; added Multithreading.sync_interrupts; Multithreading.sync_wait: more careful treatment of attributes; Multithreading.tracing: uninterruptible; Multithreading.system_out: signal within critical region, more careful sync_wait; eliminated redundant Thread.testInterrupt; Future.wait_timeout: uniform Multithreading.sync_wait; future scheduler: interruptible body (sync!), to improve reactivity; future_job: reject duplicate assignments -- system error; misc tuning;
--- a/src/Pure/Concurrent/future.ML	Thu Jul 30 23:50:11 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Sat Aug 01 00:09:45 2009 +0200
@@ -120,11 +120,10 @@
 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
 fun wait cond = (*requires SYNCHRONIZED*)
-  Multithreading.sync_wait NONE cond lock;
+  Multithreading.sync_wait NONE NONE cond lock;
-fun wait_interruptible timeout cond = (*requires SYNCHRONIZED*)
-  interruptible (fn () =>
-    ignore (Multithreading.sync_wait (SOME (Time.+ ( (), timeout))) cond lock)) ();
+fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
+  Multithreading.sync_wait NONE (SOME (Time.+ ( (), timeout))) cond lock;
 fun signal cond = (*requires SYNCHRONIZED*)
   ConditionVar.signal cond;
@@ -149,11 +148,11 @@
         val res =
           if ok then
             Exn.capture (fn () =>
-             (Thread.testInterrupt ();
-              Multithreading.with_attributes Multithreading.restricted_interrupts
-                (fn _ => fn () => e ())) ()) ()
+              Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
           else Exn.Exn Exn.Interrupt;
-        val _ = Synchronized.change result (K (SOME res));
+        val _ = Synchronized.change result
+          (fn NONE => SOME res
+            | SOME _ => raise Fail "Duplicate assignment of future value");
         (case res of
           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
@@ -276,20 +275,24 @@
         broadcast_work ());
     (*delay loop*)
-    val _ = wait_interruptible next_round scheduler_event
-      handle Exn.Interrupt =>
-        (Multithreading.tracing 1 (fn () => "Interrupt");
- do_cancel (Task_Queue.cancel_all (! queue)));
+    val _ = Exn.release (wait_timeout next_round scheduler_event);
     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
     val continue = not (! do_shutdown andalso null (! workers));
     val _ = if continue then () else scheduler := NONE;
     val _ = broadcast scheduler_event;
-  in continue end;
+  in continue end
+  handle Exn.Interrupt =>
+   (Multithreading.tracing 1 (fn () => "Interrupt");
+ do_cancel (Task_Queue.cancel_all (! queue));
+    scheduler_next ());
 fun scheduler_loop () =
-  while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ();
+  Multithreading.with_attributes
+    (Multithreading.sync_interrupts Multithreading.public_interrupts)
+    (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
 fun scheduler_active () = (*requires SYNCHRONIZED*)
   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
@@ -393,12 +396,11 @@
 fun interruptible_task f x =
   if Multithreading.available then
-   (Thread.testInterrupt ();
       (if is_worker ()
-       then Multithreading.restricted_interrupts
-       else Multithreading.regular_interrupts)
-      (fn _ => fn x => f x) x)
+       then Multithreading.private_interrupts
+       else Multithreading.public_interrupts)
+      (fn _ => f x)
   else interruptible f x;
 (*cancel: present and future group members will be interrupted eventually*)
--- a/src/Pure/Concurrent/simple_thread.ML	Thu Jul 30 23:50:11 2009 +0200
+++ b/src/Pure/Concurrent/simple_thread.ML	Sat Aug 01 00:09:45 2009 +0200
@@ -16,7 +16,7 @@
 fun fork interrupts body =
   Thread.fork (fn () => exception_trace (fn () => body ()),
-    if interrupts then Multithreading.regular_interrupts else Multithreading.no_interrupts);
+    if interrupts then Multithreading.public_interrupts else Multithreading.no_interrupts);
 fun interrupt thread = Thread.interrupt thread handle Thread _ => ();
--- a/src/Pure/Concurrent/synchronized.ML	Thu Jul 30 23:50:11 2009 +0200
+++ b/src/Pure/Concurrent/synchronized.ML	Sat Aug 01 00:09:45 2009 +0200
@@ -48,9 +48,10 @@
           (case f x of
             SOME (y, x') => (var := x'; SOME y)
           | NONE =>
-              if Multithreading.sync_wait (time_limit x) cond lock
-              then try_change ()
-              else NONE)
+              (case Multithreading.sync_wait NONE (time_limit x) cond lock of
+                Exn.Result true => try_change ()
+              | Exn.Result false => NONE
+              | Exn.Exn exn => reraise exn))
       val res = try_change ();
       val _ = ConditionVar.broadcast cond;
--- a/src/Pure/ML-Systems/multithreading.ML	Thu Jul 30 23:50:11 2009 +0200
+++ b/src/Pure/ML-Systems/multithreading.ML	Sat Aug 01 00:09:45 2009 +0200
@@ -13,20 +13,21 @@
-  val trace: int ref
-  val tracing: int -> (unit -> string) -> unit
-  val tracing_time: bool -> Time.time -> (unit -> string) -> unit
-  val real_time: ('a -> unit) -> 'a -> Time.time
   val available: bool
   val max_threads: int ref
   val max_threads_value: unit -> int
   val enabled: unit -> bool
   val no_interrupts: Thread.threadAttribute list
-  val regular_interrupts: Thread.threadAttribute list
-  val restricted_interrupts: Thread.threadAttribute list
-  val with_attributes: Thread.threadAttribute list ->
-    (Thread.threadAttribute list -> 'a -> 'b) -> 'a -> 'b
-  val sync_wait: Time.time option -> ConditionVar.conditionVar -> Mutex.mutex -> bool
+  val public_interrupts: Thread.threadAttribute list
+  val private_interrupts: Thread.threadAttribute list
+  val sync_interrupts: Thread.threadAttribute list -> Thread.threadAttribute list
+  val with_attributes: Thread.threadAttribute list -> (Thread.threadAttribute list -> 'a) -> 'a
+  val sync_wait: Thread.threadAttribute list option -> Time.time option ->
+    ConditionVar.conditionVar -> Mutex.mutex -> bool Exn.result
+  val trace: int ref
+  val tracing: int -> (unit -> string) -> unit
+  val tracing_time: bool -> Time.time -> (unit -> string) -> unit
+  val real_time: ('a -> unit) -> 'a -> Time.time
   val self_critical: unit -> bool
   val serial: unit -> int
@@ -34,14 +35,6 @@
 structure Multithreading: MULTITHREADING =
-(* tracing *)
-val trace = ref (0: int);
-fun tracing _ _ = ();
-fun tracing_time _ _ _ = ();
-fun real_time f x = (f x; Time.zeroTime);
 (* options *)
 val available = false;
@@ -52,18 +45,22 @@
 (* attributes *)
-val no_interrupts =
-  [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
+val no_interrupts = [];
+val public_interrupts = [];
+val private_interrupts = [];
+fun sync_interrupts _ = [];
-val regular_interrupts =
-  [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
+fun with_attributes _ e = e [];
-val restricted_interrupts =
-  [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce];
+fun sync_wait _ _ _ _ = Exn.Result true;
+(* tracing *)
-fun with_attributes _ f x = f [] x;
-fun sync_wait _ _ _ = false;
+val trace = ref (0: int);
+fun tracing _ _ = ();
+fun tracing_time _ _ _ = ();
+fun real_time f x = (f x; Time.zeroTime);
 (* critical section *)
--- a/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Jul 30 23:50:11 2009 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Sat Aug 01 00:09:45 2009 +0200
@@ -27,31 +27,6 @@
 structure Multithreading: MULTITHREADING =
-(* tracing *)
-val trace = ref 0;
-fun tracing level msg =
-  if level > ! trace then ()
-  else (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
-    handle _ (*sic*) => ();
-fun tracing_time detailed time =
-  tracing
-   (if not detailed then 5
-    else if Time.>= (time, Time.fromMilliseconds 1000) then 1
-    else if Time.>= (time, Time.fromMilliseconds 100) then 2
-    else if Time.>= (time, Time.fromMilliseconds 10) then 3
-    else if Time.>= (time, Time.fromMilliseconds 1) then 4 else 5);
-fun real_time f x =
-  let
-    val timer = Timer.startRealTimer ();
-    val () = f x;
-    val time = Timer.checkRealTimer timer;
-  in time end;
 (* options *)
 val available = true;
@@ -91,57 +66,76 @@
 val no_interrupts =
   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
-val regular_interrupts =
+val public_interrupts =
   [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
-val restricted_interrupts =
+val private_interrupts =
   [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce];
+val sync_interrupts = map
+  (fn x as Thread.InterruptState Thread.InterruptDefer => x
+    | Thread.InterruptState _ => Thread.InterruptState Thread.InterruptSynch
+    | x => x);
 val safe_interrupts = map
   (fn Thread.InterruptState Thread.InterruptAsynch =>
       Thread.InterruptState Thread.InterruptAsynchOnce
     | x => x);
-fun with_attributes new_atts f x =
+fun with_attributes new_atts e =
     val orig_atts = safe_interrupts (Thread.getAttributes ());
     val result = Exn.capture (fn () =>
-      (Thread.setAttributes (safe_interrupts new_atts); f orig_atts x)) ();
+      (Thread.setAttributes (safe_interrupts new_atts); e orig_atts)) ();
     val _ = Thread.setAttributes orig_atts;
   in Exn.release result end;
-(* regular interruptibility *)
+(* portable wrappers *)
+fun interruptible f x = with_attributes public_interrupts (fn _ => f x);
-fun interruptible f x =
-  (Thread.testInterrupt (); with_attributes regular_interrupts (fn _ => fn x => f x) x);
-fun uninterruptible f =
-  with_attributes no_interrupts (fn atts => fn x =>
-    f (fn g => with_attributes atts (fn _ => fn y => g y)) x);
+fun uninterruptible f x =
+  with_attributes no_interrupts (fn atts =>
+    f (fn g => fn y => with_attributes atts (fn _ => g y)) x);
 (* synchronous wait *)
-fun sync_attributes e =
+fun sync_wait opt_atts time cond lock =
+  with_attributes
+    (sync_interrupts (case opt_atts of SOME atts => atts | NONE => Thread.getAttributes ()))
+    (fn _ =>
+      (case time of
+        SOME t => Exn.Result (ConditionVar.waitUntil (cond, lock, t))
+      | NONE => (ConditionVar.wait (cond, lock); Exn.Result true))
+      handle exn => Exn.Exn exn);
+(* tracing *)
+val trace = ref 0;
+fun tracing level msg =
+  if level > ! trace then ()
+  else uninterruptible (fn _ => fn () =>
+    (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
+      handle _ (*sic*) => ()) ();
+fun tracing_time detailed time =
+  tracing
+   (if not detailed then 5
+    else if Time.>= (time, Time.fromMilliseconds 1000) then 1
+    else if Time.>= (time, Time.fromMilliseconds 100) then 2
+    else if Time.>= (time, Time.fromMilliseconds 10) then 3
+    else if Time.>= (time, Time.fromMilliseconds 1) then 4 else 5);
+fun real_time f x =
-    val orig_atts = Thread.getAttributes ();
-    val broadcast =
-      (case List.find (fn Thread.EnableBroadcastInterrupt _ => true | _ => false) orig_atts of
-        NONE => Thread.EnableBroadcastInterrupt false
-      | SOME att => att);
-    val interrupt_state =
-      (case List.find (fn Thread.InterruptState _ => true | _ => false) orig_atts of
-        NONE => Thread.InterruptState Thread.InterruptDefer
-      | SOME (state as Thread.InterruptState Thread.InterruptDefer) => state
-      | _ => Thread.InterruptState Thread.InterruptSynch);
-  in with_attributes [broadcast, interrupt_state] (fn _ => fn () => e ()) () end;
-fun sync_wait time cond lock =
-  sync_attributes (fn () =>
-    (case time of
-      SOME t => ConditionVar.waitUntil (cond, lock, t)
-    | NONE => (ConditionVar.wait (cond, lock); true)));
+    val timer = Timer.startRealTimer ();
+    val () = f x;
+    val time = Timer.checkRealTimer timer;
+  in time end;
 (* execution with time limit *)
@@ -169,7 +163,7 @@
 (* system shell processes, with propagation of interrupts *)
-fun system_out script = uninterruptible (fn restore_attributes => fn () =>
+fun system_out script = with_attributes no_interrupts (fn orig_atts =>
     val script_name = OS.FileSys.tmpName ();
     val _ = write_file script_name script;
@@ -180,13 +174,12 @@
     (*result state*)
     datatype result = Wait | Signal | Result of int;
     val result = ref Wait;
-    val result_mutex = Mutex.mutex ();
-    val result_cond = ConditionVar.conditionVar ();
+    val lock = Mutex.mutex ();
+    val cond = ConditionVar.conditionVar ();
     fun set_result res =
-      (Mutex.lock result_mutex; result := res; Mutex.unlock result_mutex;
-        ConditionVar.signal result_cond);
+      (Mutex.lock lock; result := res; ConditionVar.signal cond; Mutex.unlock lock);
-    val _ = Mutex.lock result_mutex;
+    val _ = Mutex.lock lock;
     (*system thread*)
     val system_thread = Thread.fork (fn () =>
@@ -216,11 +209,12 @@
       handle OS.SysErr _ => () | IO.Io _ =>
         (OS.Process.sleep (Time.fromMilliseconds 100); if n > 0 then kill (n - 1) else ());
-    val _ = while ! result = Wait do
-      restore_attributes (fn () =>
-        (ignore (sync_wait (SOME (Time.+ ( (), Time.fromMilliseconds 100)))
-            result_cond result_mutex)
-          handle Exn.Interrupt => kill 10)) ();
+    val _ =
+      while ! result = Wait do
+        let val res =
+          sync_wait (SOME orig_atts)
+            (SOME (Time.+ ( (), Time.fromMilliseconds 100))) cond lock
+        in case res of Exn.Exn Exn.Interrupt => kill 10 | _ => () end;
     val output = read_file output_name handle IO.Io _ => "";
@@ -229,7 +223,7 @@
     val _ = OS.FileSys.remove output_name handle OS.SysErr _ => ();
     val _ = Thread.interrupt system_thread handle Thread _ => ();
     val rc = (case ! result of Signal => raise Exn.Interrupt | Result rc => rc);
-  in (output, rc) end) ();
+  in (output, rc) end);
 (* critical section -- may be nested within the same thread *)