renamed number_of_threads to max_threads;
authorwenzelm
Tue Jul 24 22:53:48 2007 +0200 (2007-07-24)
changeset 23973b6ce6de5b700
parent 23972 9c418fa38f7e
child 23974 16ecf0a5a6bb
renamed number_of_threads to max_threads;
added schedule operator;
src/Pure/ML-Systems/multithreading_dummy.ML
src/Pure/ML-Systems/multithreading_polyml.ML
     1.1 --- a/src/Pure/ML-Systems/multithreading_dummy.ML	Tue Jul 24 22:53:46 2007 +0200
     1.2 +++ b/src/Pure/ML-Systems/multithreading_dummy.ML	Tue Jul 24 22:53:48 2007 +0200
     1.3 @@ -7,19 +7,22 @@
     1.4  
     1.5  signature MULTITHREADING =
     1.6  sig
     1.7 -  val number_of_threads: int ref
     1.8 +  val max_threads: int ref
     1.9    val self_critical: unit -> bool
    1.10    val CRITICAL: (unit -> 'a) -> 'a
    1.11 +  val schedule: int -> ('a -> (unit -> unit) option * 'a) -> 'a -> exn list
    1.12  end;
    1.13  
    1.14  structure Multithreading: MULTITHREADING =
    1.15  struct
    1.16  
    1.17 -val number_of_threads = ref 0;
    1.18 +val max_threads = ref 1;
    1.19  
    1.20  fun self_critical () = false;
    1.21  fun CRITICAL e = e ();
    1.22  
    1.23 +fun schedule _ _ _ = raise Fail ("No multithreading support for " ^ ml_system);
    1.24 +
    1.25  end;
    1.26  
    1.27  val CRITICAL = Multithreading.CRITICAL;
     2.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML	Tue Jul 24 22:53:46 2007 +0200
     2.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Tue Jul 24 22:53:48 2007 +0200
     2.3 @@ -10,14 +10,14 @@
     2.4  structure Multithreading: MULTITHREADING =
     2.5  struct
     2.6  
     2.7 -val number_of_threads = ref 0;
     2.8 -
     2.9 -
    2.10  (* FIXME tmp *)
    2.11  fun message s =
    2.12    (TextIO.output (TextIO.stdErr, (">>> " ^ s ^ "\n")); TextIO.flushOut TextIO.stdErr);
    2.13  
    2.14  
    2.15 +val max_threads = ref 1;
    2.16 +
    2.17 +
    2.18  (* critical section -- may be nested within the same thread *)
    2.19  
    2.20  local
    2.21 @@ -39,9 +39,9 @@
    2.22        val _ =
    2.23          if Mutex.trylock critical_lock then ()
    2.24          else
    2.25 -          (message "Waiting for critical lock";
    2.26 +          (message "CRITICAL: waiting for lock";
    2.27             Mutex.lock critical_lock;
    2.28 -           message "Obtained critical lock");
    2.29 +           message "CRITICAL: obtained lock");
    2.30        val _ = critical_thread := SOME (Thread.self ());
    2.31        val result = Exn.capture e ();
    2.32        val _ = critical_thread := NONE;
    2.33 @@ -50,6 +50,55 @@
    2.34  
    2.35  end;
    2.36  
    2.37 +
    2.38 +(* scheduling -- non-interruptible threads working on a queue of tasks *)
    2.39 +
    2.40 +fun schedule n next_task tasks =
    2.41 +  let
    2.42 +    (*protected execution*)
    2.43 +    val lock = Mutex.mutex ();
    2.44 +    fun PROTECTED e =
    2.45 +      let
    2.46 +        val _ = Mutex.lock lock;
    2.47 +        val res = Exn.capture e ();
    2.48 +        val _ = Mutex.unlock lock;
    2.49 +      in Exn.release res end;
    2.50 +
    2.51 +    (*queue of tasks*)
    2.52 +    val queue = ref tasks;
    2.53 +    fun dequeue () = PROTECTED (fn () =>
    2.54 +      let
    2.55 +        val (task, tasks') = next_task (! queue);
    2.56 +        val _ = queue := tasks';
    2.57 +      in task end);
    2.58 +
    2.59 +    (*worker threads*)
    2.60 +    val running = ref 0;
    2.61 +    val status = ref ([]: exn list);
    2.62 +    val finished = ConditionVar.conditionVar ();
    2.63 +    fun work k () =
    2.64 +      (message ("WORKER THREAD " ^ Int.toString k);
    2.65 +       case dequeue () of
    2.66 +        SOME f =>
    2.67 +          (case Exn.capture f () of
    2.68 +            Exn.Result () => work k ()
    2.69 +          | Exn.Exn exn => (PROTECTED (fn () => status := exn :: ! status); work k ()))
    2.70 +      | NONE =>
    2.71 +         (PROTECTED (fn () => running := ! running - 1);
    2.72 +          ConditionVar.signal finished));
    2.73 +
    2.74 +    (*main control: fork and wait*)
    2.75 +    fun fork 0 = ()
    2.76 +      | fork k =
    2.77 +         (running := ! running + 1;
    2.78 +          Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
    2.79 +          fork (k - 1));
    2.80 +    val _ = PROTECTED (fn () =>
    2.81 +     (fork (Int.max (n, 1));
    2.82 +      while ! running <> 0 do ConditionVar.wait (finished, lock)));
    2.83 +
    2.84 +  in ! status end;
    2.85 +
    2.86  end;
    2.87  
    2.88  val CRITICAL = Multithreading.CRITICAL;