src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Wed Jul 25 22:20:53 2007 +0200 (2007-07-25)
changeset 23991 d4417ba26706
parent 23981 03b71bf91318
child 24060 b643ee118928
permissions -rw-r--r--
renamed CRITICAL' to NAMED_CRITICAL;
tuned messages;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Multithreading in Poly/ML (version 5.1).
     6 *)
     7 
     8 open Thread;
     9 
    10 structure Multithreading: MULTITHREADING =
    11 struct
    12 
    13 val trace = ref false;
    14 fun tracing msg =
    15   if ! trace
    16   then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
    17   else ();
    18 
    19 val available = true;
    20 val max_threads = ref 1;
    21 
    22 
    23 (* critical section -- may be nested within the same thread *)
    24 
    25 local
    26 
    27 val critical_lock = Mutex.mutex ();
    28 val critical_thread = ref (NONE: Thread.thread option);
    29 
    30 in
    31 
    32 fun self_critical () =
    33   (case ! critical_thread of
    34     NONE => false
    35   | SOME id => Thread.equal (id, Thread.self ()));
    36 
    37 fun NAMED_CRITICAL name e =
    38   if self_critical () then e ()
    39   else
    40     let
    41       val _ =
    42         if Mutex.trylock critical_lock then ()
    43         else
    44          (tracing (fn () =>
    45             "CRITICAL" ^ (if name = "" then "" else " " ^ name) ^ ": waiting for lock");
    46           Mutex.lock critical_lock;
    47           tracing (fn () =>
    48             "CRITICAL" ^ (if name = "" then "" else " " ^ name) ^ ": obtained lock"));
    49       val _ = critical_thread := SOME (Thread.self ());
    50       val result = Exn.capture e ();
    51       val _ = critical_thread := NONE;
    52       val _ = Mutex.unlock critical_lock;
    53     in Exn.release result end;
    54 
    55 fun CRITICAL e = NAMED_CRITICAL "" e;
    56 
    57 end;
    58 
    59 
    60 (* scheduling -- non-interruptible threads working on a queue of tasks *)
    61 
    62 fun schedule n next_task tasks =
    63   let
    64     (*protected execution*)
    65     val lock = Mutex.mutex ();
    66     fun PROTECTED k e =
    67       let
    68         val _ =
    69           if Mutex.trylock lock then ()
    70           else
    71            (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
    72             Mutex.lock lock;
    73             tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
    74         val res = Exn.capture e ();
    75         val _ = Mutex.unlock lock;
    76       in Exn.release res end;
    77 
    78     (*the queue of tasks*)
    79     val queue = ref tasks;
    80     fun dequeue k = PROTECTED k (fn () =>
    81       let
    82         val (next, tasks') = next_task (! queue);
    83         val _ = queue := tasks';
    84       in next end);
    85 
    86     (*worker threads*)
    87     val running = ref 0;
    88     val status = ref ([]: exn list);
    89     val finished = ConditionVar.conditionVar ();
    90     fun wait () = ConditionVar.waitUntil (finished, lock, Time.fromMilliseconds 500);
    91     fun continue cont k =
    92       (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.signal finished; work k ())
    93     and work k () =
    94       (case dequeue k of
    95         (Task.Task f, cont) =>
    96           (tracing (fn () => "TASK " ^ Int.toString k);
    97            case Exn.capture f () of
    98             Exn.Result () => continue cont k
    99           | Exn.Exn exn =>
   100               (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
   101       | (Task.Running, _) =>
   102           (tracing (fn () => "WAITING " ^ Int.toString k);
   103            PROTECTED k wait; work k ())
   104       | (Task.Finished, _) =>
   105          (tracing (fn () => "TERMINATING " ^ Int.toString k);
   106           PROTECTED k (fn () => running := ! running - 1);
   107           ConditionVar.signal finished));
   108 
   109     (*main control: fork and wait*)
   110     fun fork 0 = ()
   111       | fork k =
   112          (running := ! running + 1;
   113           Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
   114           fork (k - 1));
   115     val _ = PROTECTED 0 (fn () =>
   116      (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));
   117 
   118   in ! status end;
   119 
   120 end;
   121 
   122 val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL;
   123 val CRITICAL = Multithreading.CRITICAL;