src/Pure/ML-Systems/multithreading_polyml.ML
changeset 24072 8b9e5d776ef3
parent 24069 8a15a04e36f6
child 24108 24e5587603b4
     1.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML	Mon Jul 30 11:12:28 2007 +0200
     1.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Mon Jul 30 19:22:27 2007 +0200
     1.3 @@ -10,8 +10,7 @@
     1.4  structure Multithreading: MULTITHREADING =
     1.5  struct
     1.6  
     1.7 -
     1.8 -(* flags *)
     1.9 +(* options *)
    1.10  
    1.11  val trace = ref false;
    1.12  fun tracing msg =
    1.13 @@ -105,40 +104,43 @@
    1.14          val _ = Mutex.unlock lock;
    1.15        in Exn.release res end;
    1.16  
    1.17 +    (*wakeup condition*)
    1.18 +    val wakeup = ConditionVar.conditionVar ();
    1.19 +    fun wakeup_all () = ConditionVar.broadcast wakeup;
    1.20 +    fun wait () = ConditionVar.wait (wakeup, lock);
    1.21 +
    1.22      (*the queue of tasks*)
    1.23      val queue = ref tasks;
    1.24 -    fun dequeue () = PROTECTED "dequeue" (fn () =>
    1.25 +    val active = ref 0;
    1.26 +    fun trace_active () = tracing (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
    1.27 +    fun dequeue () =
    1.28        let
    1.29          val (next, tasks') = next_task (! queue);
    1.30          val _ = queue := tasks';
    1.31 -      in next end);
    1.32 +      in
    1.33 +        if Task.is_running (#1 next) then
    1.34 +         (dec active; trace_active ();
    1.35 +          wait ();
    1.36 +          inc active; trace_active ();
    1.37 +          dequeue ())
    1.38 +        else next
    1.39 +      end;
    1.40  
    1.41      (*worker threads*)
    1.42      val running = ref 0;
    1.43 -    val active = ref 0;
    1.44 -    fun trace_active () = tracing (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
    1.45      val status = ref ([]: exn list);
    1.46 -    val wakeup = ConditionVar.conditionVar ();
    1.47 -    fun wait () = ConditionVar.wait (wakeup, lock);
    1.48      fun work () =
    1.49 -      (case dequeue () of
    1.50 +      (case PROTECTED "dequeue" dequeue of
    1.51          (Task.Task f, cont) =>
    1.52            (case Exn.capture f () of
    1.53              Exn.Result () => continue cont
    1.54            | Exn.Exn exn =>
    1.55                (PROTECTED "status" (fn () => status := exn :: ! status); continue cont))
    1.56 -      | (Task.Running, _) =>
    1.57 -          (PROTECTED "wait" (fn () =>
    1.58 -            (dec active; trace_active ();
    1.59 -             wait ();
    1.60 -             inc active; trace_active ()));
    1.61 -           work ())
    1.62        | (Task.Finished, _) =>
    1.63           (PROTECTED "running" (fn () => (dec active; dec running));
    1.64 -          ConditionVar.broadcast wakeup))
    1.65 +          wakeup_all ()))
    1.66      and continue cont =
    1.67 -      (PROTECTED "cont" (fn () => queue := cont (! queue));
    1.68 -       ConditionVar.broadcast wakeup; work ());
    1.69 +      (PROTECTED "cont" (fn () => queue := cont (! queue)); wakeup_all; work ());
    1.70  
    1.71      (*main control: fork and wait*)
    1.72      fun fork 0 = ()