| author | ballarin | 
| Sat, 21 Jun 2014 10:41:02 +0200 | |
| changeset 57282 | 7da3e398804c | 
| parent 54723 | 124432e77ecf | 
| child 59054 | 61b723761dff | 
| permissions | -rw-r--r-- | 
| 23961 | 1 | (* Title: Pure/ML-Systems/multithreading_polyml.ML | 
| 2 | Author: Makarius | |
| 3 | ||
| 50910 | 4 | Multithreading in Poly/ML (cf. polyml/basis/Thread.sml). | 
| 23961 | 5 | *) | 
| 6 | ||
| 25704 | 7 | signature MULTITHREADING_POLYML = | 
| 8 | sig | |
| 26083 
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
 wenzelm parents: 
26074diff
changeset | 9 |   val interruptible: ('a -> 'b) -> 'a -> 'b
 | 
| 
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
 wenzelm parents: 
26074diff
changeset | 10 |   val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
 | 
| 25704 | 11 | end; | 
| 12 | ||
| 13 | signature BASIC_MULTITHREADING = | |
| 14 | sig | |
| 15 | include BASIC_MULTITHREADING | |
| 16 | include MULTITHREADING_POLYML | |
| 17 | end; | |
| 18 | ||
| 24208 | 19 | signature MULTITHREADING = | 
| 20 | sig | |
| 21 | include MULTITHREADING | |
| 25704 | 22 | include MULTITHREADING_POLYML | 
| 24208 | 23 | end; | 
| 24 | ||
| 23961 | 25 | structure Multithreading: MULTITHREADING = | 
| 26 | struct | |
| 27 | ||
| 24208 | 28 | (* thread attributes *) | 
| 29 | ||
| 28161 | 30 | val no_interrupts = | 
| 31 | [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer]; | |
| 32 | ||
| 41713 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 33 | val test_interrupts = | 
| 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 34 | [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptSynch]; | 
| 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 35 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 36 | val public_interrupts = | 
| 28161 | 37 | [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce]; | 
| 38 | ||
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 39 | val private_interrupts = | 
| 30612 
cb6421b6a18f
future_job: do not inherit attributes, but enforce restricted interrupts -- attempt to prevent interrupt race conditions;
 wenzelm parents: 
30602diff
changeset | 40 | [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce]; | 
| 
cb6421b6a18f
future_job: do not inherit attributes, but enforce restricted interrupts -- attempt to prevent interrupt race conditions;
 wenzelm parents: 
30602diff
changeset | 41 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 42 | val sync_interrupts = map | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 43 | (fn x as Thread.InterruptState Thread.InterruptDefer => x | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 44 | | Thread.InterruptState _ => Thread.InterruptState Thread.InterruptSynch | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 45 | | x => x); | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 46 | |
| 28466 
6e35fbfc32b8
with_attributes: enforces InterruptAsynch => InterruptAsynchOnce to avoid race condition;
 wenzelm parents: 
28465diff
changeset | 47 | val safe_interrupts = map | 
| 
6e35fbfc32b8
with_attributes: enforces InterruptAsynch => InterruptAsynchOnce to avoid race condition;
 wenzelm parents: 
28465diff
changeset | 48 | (fn Thread.InterruptState Thread.InterruptAsynch => | 
| 
6e35fbfc32b8
with_attributes: enforces InterruptAsynch => InterruptAsynchOnce to avoid race condition;
 wenzelm parents: 
28465diff
changeset | 49 | Thread.InterruptState Thread.InterruptAsynchOnce | 
| 
6e35fbfc32b8
with_attributes: enforces InterruptAsynch => InterruptAsynchOnce to avoid race condition;
 wenzelm parents: 
28465diff
changeset | 50 | | x => x); | 
| 
6e35fbfc32b8
with_attributes: enforces InterruptAsynch => InterruptAsynchOnce to avoid race condition;
 wenzelm parents: 
28465diff
changeset | 51 | |
| 41713 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 52 | fun interrupted () = | 
| 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 53 | let | 
| 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 54 | val orig_atts = safe_interrupts (Thread.getAttributes ()); | 
| 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 55 | val _ = Thread.setAttributes test_interrupts; | 
| 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 56 | val test = Exn.capture Thread.testInterrupt (); | 
| 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 57 | val _ = Thread.setAttributes orig_atts; | 
| 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 58 | in Exn.release test end; | 
| 
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
 wenzelm parents: 
41710diff
changeset | 59 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 60 | fun with_attributes new_atts e = | 
| 24208 | 61 | let | 
| 29550 
67ec51c032cb
with_attributes: make double sure that unsafe attributes are avoided;
 wenzelm parents: 
28555diff
changeset | 62 | val orig_atts = safe_interrupts (Thread.getAttributes ()); | 
| 30602 
1bd90b76477a
with_attributes: canonical capture/release scheme (potentially iron out race condition);
 wenzelm parents: 
29564diff
changeset | 63 | val result = Exn.capture (fn () => | 
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 64 | (Thread.setAttributes (safe_interrupts new_atts); e orig_atts)) (); | 
| 30602 
1bd90b76477a
with_attributes: canonical capture/release scheme (potentially iron out race condition);
 wenzelm parents: 
29564diff
changeset | 65 | val _ = Thread.setAttributes orig_atts; | 
| 28466 
6e35fbfc32b8
with_attributes: enforces InterruptAsynch => InterruptAsynchOnce to avoid race condition;
 wenzelm parents: 
28465diff
changeset | 66 | in Exn.release result end; | 
| 24208 | 67 | |
| 32286 
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
 wenzelm parents: 
32230diff
changeset | 68 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 69 | (* portable wrappers *) | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 70 | |
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 71 | fun interruptible f x = with_attributes public_interrupts (fn _ => f x); | 
| 32286 
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
 wenzelm parents: 
32230diff
changeset | 72 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 73 | fun uninterruptible f x = | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 74 | with_attributes no_interrupts (fn atts => | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 75 | f (fn g => fn y => with_attributes atts (fn _ => g y)) x); | 
| 24668 | 76 | |
| 24688 
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
 wenzelm parents: 
24672diff
changeset | 77 | |
| 54717 | 78 | (* options *) | 
| 79 | ||
| 80 | val available = true; | |
| 81 | ||
| 82 | fun max_threads_result m = | |
| 83 | if m > 0 then m | |
| 54723 
124432e77ecf
simplified polyml-5.5.2 setup -- implicit upgrade of Thread.numProcessors;
 wenzelm parents: 
54717diff
changeset | 84 | else Int.min (Int.max (Thread.numProcessors (), 1), 8); | 
| 54717 | 85 | |
| 86 | val max_threads = ref 1; | |
| 87 | ||
| 88 | fun max_threads_value () = ! max_threads; | |
| 89 | ||
| 90 | fun max_threads_update m = max_threads := max_threads_result m; | |
| 91 | ||
| 92 | fun max_threads_setmp m f x = | |
| 93 | uninterruptible (fn restore_attributes => fn () => | |
| 94 | let | |
| 95 | val max_threads_orig = ! max_threads; | |
| 96 | val _ = max_threads_update m; | |
| 97 | val result = Exn.capture (restore_attributes f) x; | |
| 98 | val _ = max_threads := max_threads_orig; | |
| 99 | in Exn.release result end) (); | |
| 100 | ||
| 101 | fun enabled () = max_threads_value () > 1; | |
| 102 | ||
| 103 | ||
| 32286 
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
 wenzelm parents: 
32230diff
changeset | 104 | (* synchronous wait *) | 
| 
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
 wenzelm parents: 
32230diff
changeset | 105 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 106 | fun sync_wait opt_atts time cond lock = | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 107 | with_attributes | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 108 | (sync_interrupts (case opt_atts of SOME atts => atts | NONE => Thread.getAttributes ())) | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 109 | (fn _ => | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 110 | (case time of | 
| 43761 
e72ba84ae58f
tuned signature -- corresponding to Scala version;
 wenzelm parents: 
41713diff
changeset | 111 | SOME t => Exn.Res (ConditionVar.waitUntil (cond, lock, t)) | 
| 
e72ba84ae58f
tuned signature -- corresponding to Scala version;
 wenzelm parents: 
41713diff
changeset | 112 | | NONE => (ConditionVar.wait (cond, lock); Exn.Res true)) | 
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 113 | handle exn => Exn.Exn exn); | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 114 | |
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 115 | |
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 116 | (* tracing *) | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 117 | |
| 39616 
8052101883c3
renamed setmp_noncritical to Unsynchronized.setmp to emphasize its meaning;
 wenzelm parents: 
39583diff
changeset | 118 | val trace = ref 0; | 
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 119 | |
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 120 | fun tracing level msg = | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 121 | if level > ! trace then () | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 122 | else uninterruptible (fn _ => fn () => | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 123 |     (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
 | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 124 | handle _ (*sic*) => ()) (); | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 125 | |
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 126 | fun tracing_time detailed time = | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 127 | tracing | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 128 | (if not detailed then 5 | 
| 40301 | 129 | else if Time.>= (time, seconds 1.0) then 1 | 
| 130 | else if Time.>= (time, seconds 0.1) then 2 | |
| 131 | else if Time.>= (time, seconds 0.01) then 3 | |
| 132 | else if Time.>= (time, seconds 0.001) then 4 else 5); | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 133 | |
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 134 | fun real_time f x = | 
| 32286 
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
 wenzelm parents: 
32230diff
changeset | 135 | let | 
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 136 | val timer = Timer.startRealTimer (); | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 137 | val () = f x; | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 138 | val time = Timer.checkRealTimer timer; | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32286diff
changeset | 139 | in time end; | 
| 32286 
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
 wenzelm parents: 
32230diff
changeset | 140 | |
| 
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
 wenzelm parents: 
32230diff
changeset | 141 | |
| 23961 | 142 | (* critical section -- may be nested within the same thread *) | 
| 143 | ||
| 144 | local | |
| 145 | ||
| 24063 | 146 | val critical_lock = Mutex.mutex (); | 
| 39616 
8052101883c3
renamed setmp_noncritical to Unsynchronized.setmp to emphasize its meaning;
 wenzelm parents: 
39583diff
changeset | 147 | val critical_thread = ref (NONE: Thread.thread option); | 
| 
8052101883c3
renamed setmp_noncritical to Unsynchronized.setmp to emphasize its meaning;
 wenzelm parents: 
39583diff
changeset | 148 | val critical_name = ref ""; | 
| 24063 | 149 | |
| 40748 
591b6778d076
removed bash from ML system bootstrap, and past the Secure ML barrier;
 wenzelm parents: 
40301diff
changeset | 150 | fun show "" = "" | show name = " " ^ name; | 
| 
591b6778d076
removed bash from ML system bootstrap, and past the Secure ML barrier;
 wenzelm parents: 
40301diff
changeset | 151 | fun show' "" = "" | show' name = " [" ^ name ^ "]"; | 
| 
591b6778d076
removed bash from ML system bootstrap, and past the Secure ML barrier;
 wenzelm parents: 
40301diff
changeset | 152 | |
| 23961 | 153 | in | 
| 154 | ||
| 155 | fun self_critical () = | |
| 156 | (case ! critical_thread of | |
| 157 | NONE => false | |
| 28150 | 158 | | SOME t => Thread.equal (t, Thread.self ())); | 
| 23961 | 159 | |
| 23991 | 160 | fun NAMED_CRITICAL name e = | 
| 23961 | 161 | if self_critical () then e () | 
| 162 | else | |
| 32184 | 163 | Exn.release (uninterruptible (fn restore_attributes => fn () => | 
| 24208 | 164 | let | 
| 165 | val name' = ! critical_name; | |
| 166 | val _ = | |
| 167 | if Mutex.trylock critical_lock then () | |
| 168 | else | |
| 169 | let | |
| 32184 | 170 | val _ = tracing 5 (fn () => "CRITICAL" ^ show name ^ show' name' ^ ": waiting"); | 
| 32185 | 171 | val time = real_time Mutex.lock critical_lock; | 
| 32186 | 172 | val _ = tracing_time true time (fn () => | 
| 24208 | 173 | "CRITICAL" ^ show name ^ show' name' ^ ": passed after " ^ Time.toString time); | 
| 174 | in () end; | |
| 175 | val _ = critical_thread := SOME (Thread.self ()); | |
| 176 | val _ = critical_name := name; | |
| 26083 
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
 wenzelm parents: 
26074diff
changeset | 177 | val result = Exn.capture (restore_attributes e) (); | 
| 24208 | 178 | val _ = critical_name := ""; | 
| 179 | val _ = critical_thread := NONE; | |
| 180 | val _ = Mutex.unlock critical_lock; | |
| 32184 | 181 | in result end) ()); | 
| 23961 | 182 | |
| 23991 | 183 | fun CRITICAL e = NAMED_CRITICAL "" e; | 
| 23981 | 184 | |
| 23961 | 185 | end; | 
| 186 | ||
| 23973 | 187 | |
| 25704 | 188 | (* serial numbers *) | 
| 189 | ||
| 190 | local | |
| 191 | ||
| 192 | val serial_lock = Mutex.mutex (); | |
| 39616 
8052101883c3
renamed setmp_noncritical to Unsynchronized.setmp to emphasize its meaning;
 wenzelm parents: 
39583diff
changeset | 193 | val serial_count = ref 0; | 
| 25704 | 194 | |
| 195 | in | |
| 196 | ||
| 197 | val serial = uninterruptible (fn _ => fn () => | |
| 198 | let | |
| 199 | val _ = Mutex.lock serial_lock; | |
| 28124 
10a1f1f4c6ae
moved Multithreading.task/schedule to Concurrent/schedule.ML;
 wenzelm parents: 
26504diff
changeset | 200 | val _ = serial_count := ! serial_count + 1; | 
| 
10a1f1f4c6ae
moved Multithreading.task/schedule to Concurrent/schedule.ML;
 wenzelm parents: 
26504diff
changeset | 201 | val res = ! serial_count; | 
| 25704 | 202 | val _ = Mutex.unlock serial_lock; | 
| 203 | in res end); | |
| 204 | ||
| 23961 | 205 | end; | 
| 206 | ||
| 25704 | 207 | end; | 
| 24688 
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
 wenzelm parents: 
24672diff
changeset | 208 | |
| 32286 
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
 wenzelm parents: 
32230diff
changeset | 209 | structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading; | 
| 
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
 wenzelm parents: 
32230diff
changeset | 210 | open Basic_Multithreading; |