author | wenzelm |
Fri, 03 Aug 2007 22:33:09 +0200 | |
changeset 24150 | ed724867099a |
parent 24144 | ec51a0f7eefe |
child 24208 | f4cafbaa05e4 |
permissions | -rw-r--r-- |
23961 | 1 |
(* Title: Pure/ML-Systems/multithreading_polyml.ML |
2 |
ID: $Id$ |
|
3 |
Author: Makarius |
|
4 |
||
5 |
Multithreading in Poly/ML (version 5.1). |
|
6 |
*) |
|
7 |
||
8 |
open Thread; |
|
9 |
||
10 |
structure Multithreading: MULTITHREADING = |
|
11 |
struct |
|
12 |
||
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
13 |
(* options *) |
24069 | 14 |
|
24119 | 15 |
(*current trace level: messages whose level does not exceed it are printed*)
val trace = ref 0;

(*print a lazily built message on stderr, provided its level is within the
  current trace level; otherwise the message thunk is never forced*)
fun tracing level msg =
  if ! trace < level then ()
  else
    let val line = ">>> " ^ msg () ^ "\n" in
      TextIO.output (TextIO.stdErr, line);
      TextIO.flushOut TextIO.stdErr
    end;
|
23961 | 20 |
|
23981 | 21 |
(*constant flag, fixed to true in this Poly/ML implementation*)
val available = true; |
23973 | 22 |
(*mutable setting, initially 1; it is not read by the code visible here --
  presumably clients consult it when choosing a worker count (to be confirmed)*)
val max_threads = ref 1; |
23 |
||
24 |
||
24069 | 25 |
(* misc utils *) |
26 |
||
27 |
(*prefix a non-empty name with a blank; the empty name stays empty*)
fun show name = if name = "" then "" else " " ^ name;
|
29 |
||
30 |
(*render a non-empty name as " [name]"; the empty name stays empty*)
fun show' name =
  (case name of
    "" => ""
  | _ => String.concat [" [", name, "]"]);
|
32 |
||
33 |
(*increment an integer reference in place, then yield its content*)
fun inc i =
  let val () = i := ! i + 1
  in ! i end;
|
34 |
(*decrement an integer reference in place, then yield its content*)
fun dec i =
  let val () = i := ! i - 1
  in ! i end;
|
35 |
||
36 |
||
23961 | 37 |
(* critical section -- may be nested within the same thread *) |
38 |
||
39 |
local

(*the global lock, together with bookkeeping about its current owner*)
val critical_lock = Mutex.mutex ();
val critical_thread = ref (NONE: Thread.thread option);
val critical_name = ref "";

in

(*does the calling thread currently own the critical section?*)
fun self_critical () =
  (case ! critical_thread of
    SOME owner => Thread.equal (owner, Thread.self ())
  | NONE => false);

(*Evaluate e () inside the named critical section.  A thread that already
  owns the section runs e directly, so nesting within one thread is fine.
  Otherwise the lock is acquired (tracing the wait if it is contended), the
  owner and name are recorded, and both are reset and the lock released
  before the captured result or exception of e is passed on.*)
fun NAMED_CRITICAL name e =
  if not (self_critical ()) then
    let
      (*name of the section seen before locking -- used for tracing only*)
      val holder = ! critical_name;
      fun message suffix = "CRITICAL" ^ show name ^ show' holder ^ suffix;

      fun acquire () =
        if Mutex.trylock critical_lock then ()
        else
          let
            val stopwatch = Timer.startRealTimer ();
            val () = tracing 4 (fn () => message ": waiting");
            val () = Mutex.lock critical_lock;
            val elapsed = Timer.checkRealTimer stopwatch;
            (*waits of more than 10ms are reported at the lower level 3*)
            val level = if Time.> (elapsed, Time.fromMilliseconds 10) then 3 else 4;
          in tracing level (fn () => message (": passed after " ^ Time.toString elapsed)) end;

      val () = acquire ();
      val () = critical_thread := SOME (Thread.self ());
      val () = critical_name := name;
      val outcome = Exn.capture e ();
      val () = critical_name := "";
      val () = critical_thread := NONE;
      val () = Mutex.unlock critical_lock;
    in Exn.release outcome end
  else e ();

(*critical section without a name*)
fun CRITICAL body = NAMED_CRITICAL "" body;

end;
79 |
||
23973 | 80 |
|
81 |
(* scheduling -- non-interruptible threads working on a queue of tasks *) |
|
82 |
||
24069 | 83 |
local

(*name of the PROTECTED region entered most recently -- tracing only*)
val protected_name = ref "";

in

(*Process the given tasks on Int.max (n, 1) freshly forked worker threads
  and return the exceptions raised by individual tasks, latest first.
  Here next_task picks an entry from the queue and yields it together with
  the updated queue; the continuation paired with each task updates the
  queue once more after that task has been run.*)
fun schedule n next_task tasks =
  let
    (*protected execution: one lock guards all scheduler state below*)
    val lock = Mutex.mutex ();
    fun PROTECTED name body =
      let
        val holder = ! protected_name;
        fun message suffix = "PROTECTED" ^ show name ^ show' holder ^ suffix;
        val () =
          if Mutex.trylock lock then ()
          else
           (tracing 2 (fn () => message ": waiting");
            Mutex.lock lock;
            tracing 2 (fn () => message ": passed"));
        val () = protected_name := name;
        val outcome = Exn.capture body ();
        val () = protected_name := "";
        val () = Mutex.unlock lock;
      in Exn.release outcome end;

    (*wakeup condition, always used together with the lock above*)
    val wakeup = ConditionVar.conditionVar ();
    fun wait () = ConditionVar.wait (wakeup, lock);
    fun wakeup_all () = ConditionVar.broadcast wakeup;

    (*the queue of tasks, and the count of workers not waiting in dequeue*)
    val queue = ref tasks;
    val active = ref 0;
    fun trace_active () =
      tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");

    (*to be run within PROTECTED: while the selected task is still running,
      wait for a wakeup and select again*)
    fun dequeue () =
      let
        val (next as (task, _), rest) = next_task (! queue);
        val () = queue := rest;
      in
        if not (Task.is_running task) then next
        else
         (dec active; trace_active ();
          wait ();
          inc active; trace_active ();
          dequeue ())
      end;

    (*worker threads: exceptions of tasks are collected in status*)
    val running = ref 0;
    val status = ref ([]: exn list);
    fun work () =
      (case PROTECTED "dequeue" dequeue of
        (Task.Finished, _) =>
          PROTECTED "running" (fn () => (dec active; dec running; wakeup_all ()))
      | (Task.Task job, cont) =>
          (case Exn.capture job () of
            Exn.Exn exn =>
              (PROTECTED "status" (fn () => status := exn :: ! status); continue cont)
          | Exn.Result () => continue cont))
    and continue cont =
      (PROTECTED "cont" (fn () => (queue := cont (! queue); wakeup_all ())); work ());

    (*main control: fork the workers, then wait until none is running*)
    fun fork k =
      if k = 0 then ()
      else
       (inc running; inc active;
        Thread.fork (work, [Thread.InterruptState Thread.InterruptDefer]);
        fork (k - 1));
    fun await () =
      if ! running = 0 then ()
      else (trace_active (); wait (); await ());
    val () = PROTECTED "main" (fn () => (fork (Int.max (n, 1)); await ()));

  in ! status end;

end;
160 |
||
24069 | 161 |
end; |
162 |
||
23991 | 163 |
(*unqualified top-level aliases for the critical-section combinators of
  structure Multithreading*)
val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL; |
23961 | 164 |
val CRITICAL = Multithreading.CRITICAL; |