src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Tue Jul 24 22:53:48 2007 +0200 (2007-07-24)
changeset 23973 b6ce6de5b700
parent 23961 9e7e1e309ebd
child 23981 03b71bf91318
permissions -rw-r--r--
renamed number_of_threads to max_threads;
added schedule operator;
wenzelm@23961
     1
(*  Title:      Pure/ML-Systems/multithreading_polyml.ML
wenzelm@23961
     2
    ID:         $Id$
wenzelm@23961
     3
    Author:     Makarius
wenzelm@23961
     4
wenzelm@23961
     5
Multithreading in Poly/ML (version 5.1).
wenzelm@23961
     6
*)
wenzelm@23961
     7
wenzelm@23961
     8
open Thread;
wenzelm@23961
     9
wenzelm@23961
    10
structure Multithreading: MULTITHREADING =
wenzelm@23961
    11
struct
wenzelm@23961
    12
wenzelm@23961
    13
(* FIXME tmp *)
wenzelm@23961
    14
(*write s to stderr as a single line prefixed by ">>> ", then flush at once*)
fun message s =
  let
    val line = ">>> " ^ s ^ "\n";
    val _ = TextIO.output (TextIO.stdErr, line);
  in TextIO.flushOut TextIO.stdErr end;
wenzelm@23961
    16
wenzelm@23961
    17
wenzelm@23973
    18
(*mutable integer setting, initially 1; not read anywhere within this structure --
  presumably the thread limit that callers hand to schedule (TODO confirm)*)
val max_threads = ref 1;
wenzelm@23973
    19
wenzelm@23973
    20
wenzelm@23961
    21
(* critical section -- may be nested within the same thread *)
wenzelm@23961
    22
wenzelm@23961
    23
local

(*single global lock, plus the thread currently holding it (if any)*)
val critical_lock = Mutex.mutex ();
val critical_thread = ref (NONE: Thread.thread option);

(*take critical_lock; report via message whenever we actually have to block*)
fun acquire () =
  if Mutex.trylock critical_lock then ()
  else
    (message "CRITICAL: waiting for lock";
     Mutex.lock critical_lock;
     message "CRITICAL: obtained lock");

in

(*true iff the calling thread is the one recorded as owner of the critical section*)
fun self_critical () =
  (case ! critical_thread of
    SOME owner => Thread.equal (owner, Thread.self ())
  | NONE => false);

(*run e () inside the critical section; a nested call from the owning thread
  runs e directly, without touching the lock again*)
fun CRITICAL e =
  if self_critical () then e ()
  else
    let
      val _ = acquire ();
      val _ = critical_thread := SOME (Thread.self ());
      (*capture the outcome so that the lock is released even if e raises*)
      val result = Exn.capture e ();
      val _ = (critical_thread := NONE; Mutex.unlock critical_lock);
    in Exn.release result end;

end;
wenzelm@23961
    52
wenzelm@23973
    53
wenzelm@23973
    54
(* scheduling -- non-interruptible threads working on a queue of tasks *)
wenzelm@23973
    55
wenzelm@23973
    56
(*schedule n next_task tasks: fork Int.max (n, 1) worker threads that
  repeatedly fetch work via next_task until it yields NONE.

    n          requested number of worker threads (values below 1 count as 1)
    next_task  maps the current queue state to (task option, new queue state);
               always invoked while holding the internal lock;
               NONE makes the asking worker terminate
    tasks      initial queue state

  The result is the list of exceptions recorded during the run, most recent
  first: those raised by tasks and those raised by next_task itself.  The
  calling thread blocks until all workers have terminated.*)
fun schedule n next_task tasks =
  let
    (*protected execution*)
    val lock = Mutex.mutex ();
    fun PROTECTED e =
      let
        val _ = Mutex.lock lock;
        (*capture the outcome so that the lock is released even if e raises*)
        val res = Exn.capture e ();
        val _ = Mutex.unlock lock;
      in Exn.release res end;

    (*queue of tasks*)
    val queue = ref tasks;
    fun dequeue () = PROTECTED (fn () =>
      let
        val (task, tasks') = next_task (! queue);
        val _ = queue := tasks';
      in task end);

    (*worker threads*)
    val running = ref 0;
    val status = ref ([]: exn list);
    val finished = ConditionVar.conditionVar ();

    (*record a failure for the final result*)
    fun failure exn = PROTECTED (fn () => status := exn :: ! status);

    (*deregister the current worker and wake up the waiting main thread*)
    fun terminate () =
      (PROTECTED (fn () => running := ! running - 1);
       ConditionVar.signal finished);

    fun work k () =
      (message ("WORKER THREAD " ^ Int.toString k);
       (*a failing next_task must not kill the worker silently: running would
         never reach 0 and the main thread would wait forever*)
       case Exn.capture dequeue () of
         Exn.Result (SOME f) =>
           ((case Exn.capture f () of
              Exn.Result () => ()
            | Exn.Exn exn => failure exn);
            work k ())
       | Exn.Result NONE => terminate ()
       | Exn.Exn exn => (failure exn; terminate ()));

    (*main control: fork and wait*)
    fun fork 0 = ()
      | fork k =
         (running := ! running + 1;
          Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
          fork (k - 1));

    (*the lock is held while forking, so no worker can dequeue or deregister
      before the main thread starts waiting; wait releases the lock*)
    val _ = PROTECTED (fn () =>
     (fork (Int.max (n, 1));
      while ! running <> 0 do ConditionVar.wait (finished, lock)));

  in ! status end;
wenzelm@23973
   101
wenzelm@23961
   102
end;
wenzelm@23961
   103
wenzelm@23961
   104
(*top-level abbreviation: makes Multithreading.CRITICAL available unqualified*)
val CRITICAL = Multithreading.CRITICAL;