# HG changeset patch # User wenzelm # Date 1185816147 -7200 # Node ID 8b9e5d776ef3534ce897939fd69143eb166706ba # Parent 82873bc360c220868fec0e344c8c54072c934db1 dequeue: wait loop while PROTECTED -- avoids race condition; diff -r 82873bc360c2 -r 8b9e5d776ef3 src/Pure/ML-Systems/multithreading_polyml.ML --- a/src/Pure/ML-Systems/multithreading_polyml.ML Mon Jul 30 11:12:28 2007 +0200 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Mon Jul 30 19:22:27 2007 +0200 @@ -10,8 +10,7 @@ structure Multithreading: MULTITHREADING = struct - -(* flags *) +(* options *) val trace = ref false; fun tracing msg = @@ -105,40 +104,43 @@ val _ = Mutex.unlock lock; in Exn.release res end; + (*wakeup condition*) + val wakeup = ConditionVar.conditionVar (); + fun wakeup_all () = ConditionVar.broadcast wakeup; + fun wait () = ConditionVar.wait (wakeup, lock); + (*the queue of tasks*) val queue = ref tasks; - fun dequeue () = PROTECTED "dequeue" (fn () => + val active = ref 0; + fun trace_active () = tracing (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active"); + fun dequeue () = let val (next, tasks') = next_task (! queue); val _ = queue := tasks'; - in next end); + in + if Task.is_running (#1 next) then + (dec active; trace_active (); + wait (); + inc active; trace_active (); + dequeue ()) + else next + end; (*worker threads*) val running = ref 0; - val active = ref 0; - fun trace_active () = tracing (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active"); val status = ref ([]: exn list); - val wakeup = ConditionVar.conditionVar (); - fun wait () = ConditionVar.wait (wakeup, lock); fun work () = - (case dequeue () of + (case PROTECTED "dequeue" dequeue of (Task.Task f, cont) => (case Exn.capture f () of Exn.Result () => continue cont | Exn.Exn exn => (PROTECTED "status" (fn () => status := exn :: ! status); continue cont)) - | (Task.Running, _) => - (PROTECTED "wait" (fn () => - (dec active; trace_active (); - wait (); - inc active; trace_active ())); - work ()) | (Task.Finished, _) => (PROTECTED "running" (fn () => (dec active; dec running)); - ConditionVar.broadcast wakeup)) + wakeup_all ())) and continue cont = - (PROTECTED "cont" (fn () => queue := cont (! queue)); - ConditionVar.broadcast wakeup; work ()); + (PROTECTED "cont" (fn () => queue := cont (! queue)); wakeup_all; work ()); (*main control: fork and wait*) fun fork 0 = ()