# HG changeset patch # User wenzelm # Date 1185375949 -7200 # Node ID 03b71bf913184277d8e082b24de9ccc0b97e3291 # Parent d35dc9344515ebf65f5489467b4c429c77c6a627 added trace flag, official tracing operation; added named CRITICAL'; schedule: tuned signature, actually observe dependencies on running tasks; diff -r d35dc9344515 -r 03b71bf91318 src/Pure/ML-Systems/multithreading_polyml.ML --- a/src/Pure/ML-Systems/multithreading_polyml.ML Wed Jul 25 17:05:48 2007 +0200 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Wed Jul 25 17:05:49 2007 +0200 @@ -10,11 +10,13 @@ structure Multithreading: MULTITHREADING = struct -(* FIXME tmp *) -fun message s = - (TextIO.output (TextIO.stdErr, (">>> " ^ s ^ "\n")); TextIO.flushOut TextIO.stdErr); +val trace = ref false; +fun tracing msg = + if ! trace + then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr) + else (); - +val available = true; val max_threads = ref 1; @@ -32,22 +34,24 @@ NONE => false | SOME id => Thread.equal (id, Thread.self ())); -fun CRITICAL e = +fun CRITICAL' name e = if self_critical () then e () else let val _ = if Mutex.trylock critical_lock then () else - (message "CRITICAL: waiting for lock"; - Mutex.lock critical_lock; - message "CRITICAL: obtained lock"); + (tracing (fn () => "CRITICAL " ^ name ^ ": waiting for lock"); + Mutex.lock critical_lock; + tracing (fn () => "CRITICAL " ^ name ^ ": obtained lock")); val _ = critical_thread := SOME (Thread.self ()); val result = Exn.capture e (); val _ = critical_thread := NONE; val _ = Mutex.unlock critical_lock; in Exn.release result end; +fun CRITICAL e = CRITICAL' "" e; + end; @@ -57,34 +61,47 @@ let (*protected execution*) val lock = Mutex.mutex (); - fun PROTECTED e = + fun PROTECTED k e = let - val _ = Mutex.lock lock; + val _ = + if Mutex.trylock lock then () + else + (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock"); + Mutex.lock lock; + tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock")); val res = Exn.capture e (); val _ = Mutex.unlock lock; in Exn.release res end; - (*queue of tasks*) + (*the queue of tasks*) val queue = ref tasks; - fun dequeue () = PROTECTED (fn () => + fun dequeue k = PROTECTED k (fn () => let - val (task, tasks') = next_task (! queue); + val (next, tasks') = next_task (! queue); val _ = queue := tasks'; - in task end); + in next end); (*worker threads*) val running = ref 0; val status = ref ([]: exn list); val finished = ConditionVar.conditionVar (); - fun work k () = - (message ("WORKER THREAD " ^ Int.toString k); - case dequeue () of - SOME f => - (case Exn.capture f () of - Exn.Result () => work k () - | Exn.Exn exn => (PROTECTED (fn () => status := exn :: ! status); work k ())) - | NONE => - (PROTECTED (fn () => running := ! running - 1); + fun wait () = ConditionVar.waitUntil (finished, lock, Time.fromMilliseconds 500); + fun continue cont k = + (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.signal finished; work k ()) + and work k () = + (case dequeue k of + (Task.Task f, cont) => + (tracing (fn () => "TASK " ^ Int.toString k); + case Exn.capture f () of + Exn.Result () => continue cont k + | Exn.Exn exn => + (PROTECTED k (fn () => status := exn :: ! status); continue cont k)) + | (Task.Running, _) => + (tracing (fn () => "WAITING " ^ Int.toString k); + PROTECTED k wait; work k ()) + | (Task.Finished, _) => + (tracing (fn () => "TERMINATING " ^ Int.toString k); + PROTECTED k (fn () => running := ! running - 1); ConditionVar.signal finished)); (*main control: fork and wait*) @@ -93,12 +110,12 @@ (running := ! running + 1; Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]); fork (k - 1)); - val _ = PROTECTED (fn () => - (fork (Int.max (n, 1)); - while ! running <> 0 do ConditionVar.wait (finished, lock))); + val _ = PROTECTED 0 (fn () => + (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ()))); in ! status end; end; +val CRITICAL' = Multithreading.CRITICAL'; val CRITICAL = Multithreading.CRITICAL;