added trace flag, official tracing operation;
added named CRITICAL';
schedule: tuned signature, actually observe dependencies on running tasks;
--- a/src/Pure/ML-Systems/multithreading_polyml.ML Wed Jul 25 17:05:48 2007 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML Wed Jul 25 17:05:49 2007 +0200
@@ -10,11 +10,13 @@
structure Multithreading: MULTITHREADING =
struct
-(* FIXME tmp *)
-fun message s =
- (TextIO.output (TextIO.stdErr, (">>> " ^ s ^ "\n")); TextIO.flushOut TextIO.stdErr);
+val trace = ref false;
+fun tracing msg =
+ if ! trace
+ then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
+ else ();
-
+val available = true;
val max_threads = ref 1;
@@ -32,22 +34,24 @@
NONE => false
| SOME id => Thread.equal (id, Thread.self ()));
-fun CRITICAL e =
+fun CRITICAL' name e =
if self_critical () then e ()
else
let
val _ =
if Mutex.trylock critical_lock then ()
else
- (message "CRITICAL: waiting for lock";
- Mutex.lock critical_lock;
- message "CRITICAL: obtained lock");
+ (tracing (fn () => "CRITICAL " ^ name ^ ": waiting for lock");
+ Mutex.lock critical_lock;
+ tracing (fn () => "CRITICAL " ^ name ^ ": obtained lock"));
val _ = critical_thread := SOME (Thread.self ());
val result = Exn.capture e ();
val _ = critical_thread := NONE;
val _ = Mutex.unlock critical_lock;
in Exn.release result end;
+fun CRITICAL e = CRITICAL' "" e;
+
end;
@@ -57,34 +61,47 @@
let
(*protected execution*)
val lock = Mutex.mutex ();
- fun PROTECTED e =
+ fun PROTECTED k e =
let
- val _ = Mutex.lock lock;
+ val _ =
+ if Mutex.trylock lock then ()
+ else
+ (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
+ Mutex.lock lock;
+ tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
val res = Exn.capture e ();
val _ = Mutex.unlock lock;
in Exn.release res end;
- (*queue of tasks*)
+ (*the queue of tasks*)
val queue = ref tasks;
- fun dequeue () = PROTECTED (fn () =>
+ fun dequeue k = PROTECTED k (fn () =>
let
- val (task, tasks') = next_task (! queue);
+ val (next, tasks') = next_task (! queue);
val _ = queue := tasks';
- in task end);
+ in next end);
(*worker threads*)
val running = ref 0;
val status = ref ([]: exn list);
val finished = ConditionVar.conditionVar ();
- fun work k () =
- (message ("WORKER THREAD " ^ Int.toString k);
- case dequeue () of
- SOME f =>
- (case Exn.capture f () of
- Exn.Result () => work k ()
- | Exn.Exn exn => (PROTECTED (fn () => status := exn :: ! status); work k ()))
- | NONE =>
- (PROTECTED (fn () => running := ! running - 1);
+ fun wait () = ConditionVar.waitUntil (finished, lock, Time.fromMilliseconds 500);
+ fun continue cont k =
+ (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.signal finished; work k ())
+ and work k () =
+ (case dequeue k of
+ (Task.Task f, cont) =>
+ (tracing (fn () => "TASK " ^ Int.toString k);
+ case Exn.capture f () of
+ Exn.Result () => continue cont k
+ | Exn.Exn exn =>
+ (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
+ | (Task.Running, _) =>
+ (tracing (fn () => "WAITING " ^ Int.toString k);
+ PROTECTED k wait; work k ())
+ | (Task.Finished, _) =>
+ (tracing (fn () => "TERMINATING " ^ Int.toString k);
+ PROTECTED k (fn () => running := ! running - 1);
ConditionVar.signal finished));
(*main control: fork and wait*)
@@ -93,12 +110,12 @@
(running := ! running + 1;
Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
fork (k - 1));
- val _ = PROTECTED (fn () =>
- (fork (Int.max (n, 1));
- while ! running <> 0 do ConditionVar.wait (finished, lock)));
+ val _ = PROTECTED 0 (fn () =>
+ (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));
in ! status end;
end;
+val CRITICAL' = Multithreading.CRITICAL';
val CRITICAL = Multithreading.CRITICAL;