--- a/src/Pure/ML-Systems/multithreading_polyml.ML Sun Jul 29 19:46:03 2007 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML Sun Jul 29 19:46:04 2007 +0200
@@ -24,16 +24,16 @@
local
-val critical_lock = Mutex.mutex ();
-val critical_thread = ref (NONE: Thread.thread option);
-val critical_name = ref "";
-
fun add_name "" = ""
| add_name name = " " ^ name;
fun add_name' "" = ""
| add_name' name = " [" ^ name ^ "]";
+val critical_lock = Mutex.mutex ();
+val critical_thread = ref (NONE: Thread.thread option);
+val critical_name = ref "";
+
in
fun self_critical () =
@@ -72,25 +72,28 @@
(* scheduling -- non-interruptible threads working on a queue of tasks *)
+fun inc i = (i := ! i + 1; ! i);
+fun dec i = (i := ! i - 1; ! i);
+
fun schedule n next_task tasks =
let
(*protected execution*)
val lock = Mutex.mutex ();
- fun PROTECTED k e =
+ fun PROTECTED name e =
let
val _ =
if Mutex.trylock lock then ()
else
- (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
+ (tracing (fn () => "PROTECTED " ^ name ^ ": waiting for lock");
Mutex.lock lock;
- tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
+ tracing (fn () => "PROTECTED " ^ name ^ ": obtained lock"));
val res = Exn.capture e ();
val _ = Mutex.unlock lock;
in Exn.release res end;
(*the queue of tasks*)
val queue = ref tasks;
- fun dequeue k = PROTECTED k (fn () =>
+ fun dequeue () = PROTECTED "dequeue" (fn () =>
let
val (next, tasks') = next_task (! queue);
val _ = queue := tasks';
@@ -98,35 +101,37 @@
(*worker threads*)
val running = ref 0;
+ val active = ref 0;
val status = ref ([]: exn list);
val wakeup = ConditionVar.conditionVar ();
fun wait () = ConditionVar.wait (wakeup, lock);
fun continue cont k =
- (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.broadcast wakeup; work k ())
+ (PROTECTED "cont" (fn () => queue := cont (! queue));
+ ConditionVar.broadcast wakeup; work k ())
and work k () =
- (case dequeue k of
+ (case dequeue () of
(Task.Task f, cont) =>
- (tracing (fn () => "TASK " ^ Int.toString k);
- case Exn.capture f () of
+ (case Exn.capture f () of
Exn.Result () => continue cont k
| Exn.Exn exn =>
- (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
+ (PROTECTED "status" (fn () => status := exn :: ! status); continue cont k))
| (Task.Running, _) =>
- (tracing (fn () => "WAITING " ^ Int.toString k);
- PROTECTED k wait; work k ())
+ (PROTECTED "wait" (fn () => (dec active; wait (); inc active)); work k ())
| (Task.Finished, _) =>
- (tracing (fn () => "TERMINATING " ^ Int.toString k);
- PROTECTED k (fn () => running := ! running - 1);
+ (PROTECTED "running" (fn () => (dec active; dec running));
ConditionVar.broadcast wakeup));
(*main control: fork and wait*)
fun fork 0 = ()
| fork k =
- (running := ! running + 1;
+ (inc running; inc active;
Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
fork (k - 1));
- val _ = PROTECTED 0 (fn () =>
- (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));
+ val _ = PROTECTED "main" (fn () =>
+ (fork (Int.max (n, 1));
+ while ! running <> 0 do
+ (tracing (fn () => "MAIN: " ^ Int.toString (! active) ^ " active");
+ wait ())));
in ! status end;