src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Sun Jul 29 17:28:57 2007 +0200 (2007-07-29)
changeset 24060 b643ee118928
parent 23991 d4417ba26706
child 24063 736c03ae92f5
permissions -rw-r--r--
critical: improved diagnostics;
schedule: proper broadcast on wakeup condition;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Multithreading in Poly/ML (version 5.1).
     6 *)
     7 
     8 open Thread;
     9 
    structure Multithreading: MULTITHREADING =
    struct

    (* tracing -- optional diagnostics on stderr *)

    (*global switch: messages are only emitted while this is true*)
    val trace = ref false;

    (*msg is a thunk, so the message text is only built when tracing is
      enabled; stderr is flushed after every message*)
    fun tracing msg =
      if ! trace
      then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
      else ();

    (*exported settings; max_threads is not consulted within this structure
      (schedule receives its thread count as an explicit argument)*)
    val available = true;
    val max_threads = ref 1;


    (* critical section -- may be nested within the same thread *)

    local

    val critical_lock = Mutex.mutex ();
    (*the thread currently inside the critical section, if any*)
    val critical_thread = ref (NONE: Thread.thread option);
    (*name supplied by the current holder -- used for trace messages only*)
    val critical_name = ref "";

    (*trace message fragments: the requested name ...*)
    fun add_name "" = ""
      | add_name name = " " ^ name;

    (*... and the name of the current holder in brackets*)
    fun add_name' "" = ""
      | add_name' name = " [" ^ name ^ "]";

    in

    (*true iff the calling thread is the one recorded in critical_thread*)
    fun self_critical () =
      (case ! critical_thread of
        NONE => false
      | SOME id => Thread.equal (id, Thread.self ()));

    (*run e () inside the global critical section; a nested call from the
      thread that already holds the section runs e () directly.  The result
      of e is captured, so the lock is released before any exception of e
      is re-raised.*)
    fun NAMED_CRITICAL name e =
      if self_critical () then e ()
      else
        let
          val _ =
            if Mutex.trylock critical_lock then ()
            else
              (*contended case: report who holds the lock and how long we waited*)
              let
                val timer = Timer.startRealTimer ();
                val _ = tracing (fn () =>
                  "CRITICAL" ^ add_name name ^ add_name' (! critical_name) ^ ": waiting for lock");
                val _ = Mutex.lock critical_lock;
                val _ = tracing (fn () =>
                  "CRITICAL" ^ add_name name ^ add_name' (! critical_name) ^ ": obtained lock after " ^
                  Time.toString (Timer.checkRealTimer timer));
              in () end;
          val _ = critical_thread := SOME (Thread.self ());
          val _ = critical_name := name;
          val result = Exn.capture e ();
          val _ = critical_name := "";
          val _ = critical_thread := NONE;
          val _ = Mutex.unlock critical_lock;
        in Exn.release result end;

    (*anonymous variant of NAMED_CRITICAL*)
    fun CRITICAL e = NAMED_CRITICAL "" e;

    end;


    (* scheduling -- non-interruptible threads working on a queue of tasks *)

    (*schedule n next_task tasks: fork max (n, 1) worker threads that
      repeatedly obtain work from the shared queue via next_task, and block
      until all workers have terminated.  The result is the list of
      exceptions raised by tasks, most recent first.*)
    fun schedule n next_task tasks =
      let
        (*protected execution: e () runs with lock held; its result is
          captured so that the lock is released before any exception of e
          is re-raised*)
        val lock = Mutex.mutex ();
        fun PROTECTED k e =
          let
            val _ =
              if Mutex.trylock lock then ()
              else
               (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
                Mutex.lock lock;
                tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
            val res = Exn.capture e ();
            val _ = Mutex.unlock lock;
          in Exn.release res end;

        (*wakeup condition -- wait must only be called with lock held*)
        val wakeup = ConditionVar.conditionVar ();
        fun wait () = ConditionVar.wait (wakeup, lock);

        (*the queue of tasks*)
        val queue = ref tasks;

        (*Obtain the next piece of work.  While next_task reports
          Task.Running, the worker waits for the wakeup condition WITHOUT
          leaving the protected region in between: inspecting the queue and
          going to sleep are a single atomic step with respect to lock.
          Every queue update happens under lock and is followed by a
          broadcast, so no wakeup can be missed between the inspection and
          the wait.*)
        fun dequeue k = PROTECTED k (fn () =>
          let
            fun next () =
              let
                val (res, tasks') = next_task (! queue);
                val _ = queue := tasks';
              in
                (case res of
                  (Task.Running, _) =>
                    (tracing (fn () => "WAITING " ^ Int.toString k); wait (); next ())
                | _ => res)
              end;
          in next () end);

        (*worker threads*)
        val running = ref 0;              (*number of workers not yet terminated*)
        val status = ref ([]: exn list);  (*exceptions raised by tasks*)

        (*after a task: apply its continuation to the queue, notify waiting
          threads, and proceed with the next task*)
        fun continue cont k =
          (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.broadcast wakeup; work k ())
        and work k () =
          (case dequeue k of
            (Task.Task f, cont) =>
              (tracing (fn () => "TASK " ^ Int.toString k);
               case Exn.capture f () of
                Exn.Result () => continue cont k
              | Exn.Exn exn =>
                  (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
          | (Task.Running, _) =>
              (*not produced by dequeue, which waits internally; retry*)
              work k ()
          | (Task.Finished, _) =>
             (tracing (fn () => "TERMINATING " ^ Int.toString k);
              PROTECTED k (fn () => running := ! running - 1);
              ConditionVar.broadcast wakeup));

        (*main control: fork and wait.  The workers are forked while lock is
          held, so none of them can touch the queue before the main thread
          has started waiting on the wakeup condition.*)
        fun fork 0 = ()
          | fork k =
             (running := ! running + 1;
              Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
              fork (k - 1));
        val _ = PROTECTED 0 (fn () =>
         (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));

      in ! status end;

    end;
   134 
   (*unqualified top-level names for the critical section combinators of
     structure Multithreading*)
   val (NAMED_CRITICAL, CRITICAL) =
     (Multithreading.NAMED_CRITICAL, Multithreading.CRITICAL);