src/Pure/Concurrent/schedule.ML
author wenzelm
Wed, 10 Dec 2008 22:05:58 +0100
changeset 29055 edaef19665e6
parent 28543 637f2808ab64
permissions -rw-r--r--
merged

(*  Title:      Pure/Concurrent/schedule.ML
    ID:         $Id$
    Author:     Makarius

Scheduling -- multiple threads working on a queue of tasks.
*)

(*Interface of the task scheduler: the task datatype produced by the
  client-supplied task selector, and the scheduling entry point.*)
signature SCHEDULE =
sig
  (*Task {body, cont, fail}: a unit of work.  The scheduler applies cont to
      the shared state after body has returned normally, and fail after body
      has raised an exception.
    Wait: no task can be handed out right now; the requesting worker blocks
      until it is woken up and then asks again.
    Terminate: the requesting worker shuts down.*)
  datatype 'a task =
    Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
  (*schedule n next_task state: run max (n, 1) worker threads that repeatedly
    obtain work via next_task, which maps the current shared state to the
    next task and an updated state.  Returns the exceptions raised by task
    bodies, after all workers have stopped.*)
  val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list
end;

(*Implementation of SCHEDULE: a pool of worker threads sharing one state
  value, coordinated by a single mutex and a single condition variable.*)
structure Schedule: SCHEDULE =
struct

(*see signature SCHEDULE: a unit of work with success/failure state updates,
  or a request to wait, or a request to terminate*)
datatype 'a task =
  Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;

(*schedule n next_task tasks: fork max (n, 1) workers operating on the shared
  state initialised with tasks, wait until every worker has stopped, and
  return the accumulated list of exceptions raised by task bodies.  The whole
  function runs under uninterruptible; only task bodies are run with
  restore_attributes applied.*)
fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
  let
    (*synchronized execution: run e with the lock held; the outcome of e is
      captured first, so the lock is unlocked before the outcome is released
      (presumably re-raising any exception of e -- confirm against Exn)*)
    val lock = Mutex.mutex ();
    fun SYNCHRONIZED e =
      let
        val _ = Mutex.lock lock;
        val res = Exn.capture e ();
        val _ = Mutex.unlock lock;
      in Exn.release res end;

    (*wakeup condition: one condition variable tied to lock; wait and
      wait_timeout must therefore be called with lock held*)
    val wakeup = ConditionVar.conditionVar ();
    fun wakeup_all () = ConditionVar.broadcast wakeup;
    fun wait () = ConditionVar.wait (wakeup, lock);
    (*as wait, but with a deadline of one second from now*)
    fun wait_timeout () =
      ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1));

    (*queue of tasks: the shared state, and a counter of workers that are
      not currently blocked in dequeue (reported by trace_active only)*)
    val queue = ref tasks;
    val active = ref 0;
    fun trace_active () = Multithreading.tracing 1 (fn () =>
      "SCHEDULE: " ^ string_of_int (! active) ^ " active");
    (*obtain the next task via next_task applied to the queue (change_result
      presumably stores the updated state back into queue -- confirm); on
      Wait the worker counts itself as inactive, blocks on the wakeup
      condition, and retries.  Called with lock held, see worker.*)
    fun dequeue () =
      (case change_result queue next_task of
        Wait =>
          (dec active; trace_active ();
            wait ();
            inc active; trace_active ();
            dequeue ())
      | next => next);

    (*pool of running threads: status collects exceptions of failed task
      bodies, running lists the worker threads that have not stopped yet*)
    val status = ref ([]: exn list);
    val running = ref ([]: Thread.thread list);
    (*fork a thread for f and register it as active and running; the meaning
      of the false flag is defined by SimpleThread.fork -- TODO confirm*)
    fun start f = (inc active; change running (cons (SimpleThread.fork false f)));
    (*unregister the calling thread*)
    fun stop () = (dec active; change running (remove Thread.equal (Thread.self ())));

    (*worker thread: fetch a task under the lock, run its body outside the
      lock, then update the shared state under the lock again.
      NOTE(review): an exception escaping next_task, cont or fail appears to
      leave this function without calling stop (), so the thread would stay
      in running and the main loop below would presumably never finish --
      confirm whether clients guarantee these functions do not raise.*)
    fun worker () =
      (case SYNCHRONIZED dequeue of
        Task {body, cont, fail} =>
          (case Exn.capture (restore_attributes body) () of
            Exn.Result () =>
              (*success: apply cont, wake waiting threads, fetch more work*)
              (SYNCHRONIZED (fn () => (change queue cont; wakeup_all ())); worker ())
          | Exn.Exn exn =>
              (*failure: record exn, apply fail, and retire this worker*)
              SYNCHRONIZED (fn () =>
                (change status (cons exn); change queue fail; stop (); wakeup_all ())))
      | Terminate => SYNCHRONIZED (fn () => (stop (); wakeup_all ())));

    (*main control: fork and wait.  Workers are forked while the lock is
      held here, so none of them gets past SYNCHRONIZED before the first
      wait_timeout (assuming the condition wait releases lock while blocked).
      As long as some worker is still running: once any failure has been
      recorded, interrupt all remaining workers, then wait for a wakeup or
      at most one second before checking again.*)
    fun fork 0 = ()
      | fork k = (start worker; fork (k - 1));
    val _ = SYNCHRONIZED (fn () =>
     (fork (Int.max (n, 1));
      while not (null (! running)) do
      (trace_active ();
       if not (null (! status))
       then (List.app SimpleThread.interrupt (! running))
       else ();
       wait_timeout ())));

  in ! status end);

end;