src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Tue Jul 24 22:53:48 2007 +0200 (2007-07-24)
changeset 23973 b6ce6de5b700
parent 23961 9e7e1e309ebd
child 23981 03b71bf91318
permissions -rw-r--r--
renamed number_of_threads to max_threads;
added schedule operator;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Multithreading in Poly/ML (version 5.1).
     6 *)
     7 
     8 open Thread;
     9 
    10 structure Multithreading: MULTITHREADING =
    11 struct
    12 
    (* FIXME tmp *)
    (*diagnostic output: writes s to stderr as one line prefixed by ">>> ",
      then flushes the stream*)
    fun message s =
      let val err = TextIO.stdErr
      in TextIO.output (err, ">>> " ^ s ^ "\n"); TextIO.flushOut err end;
    16 
    17 
    (*mutable thread limit, initially 1; not consulted by the code in this
      structure -- presumably read by clients before calling schedule (verify)*)
    val max_threads: int ref = ref 1;
    19 
    20 
    (* critical section -- may be nested within the same thread *)

    local

    (*global lock, together with the thread currently holding it (if any)*)
    val owner_lock = Mutex.mutex ();
    val owner = ref (NONE: Thread.thread option);

    (*take the global lock; report on stderr only if we actually have to wait*)
    fun acquire () =
      if Mutex.trylock owner_lock then ()
      else
       (message "CRITICAL: waiting for lock";
        Mutex.lock owner_lock;
        message "CRITICAL: obtained lock");

    in

    (*true iff the calling thread is the one recorded as being inside CRITICAL*)
    fun self_critical () =
      (case ! owner of
        SOME id => Thread.equal (id, Thread.self ())
      | NONE => false);

    (*CRITICAL e: evaluate e () while holding the global lock.  A nested call
      from the owning thread runs e () directly.  The lock is released again
      before the result or exception of e () is passed on to the caller.*)
    fun CRITICAL e =
      if self_critical () then e ()
      else
       (acquire ();
        owner := SOME (Thread.self ());
        let val result = Exn.capture e () in
          owner := NONE;
          Mutex.unlock owner_lock;
          Exn.release result
        end);

    end;
    52 
    53 
    (* scheduling -- non-interruptible threads working on a queue of tasks *)

    (*
      schedule n next_task tasks

      Forks Int.max (n, 1) worker threads (with interrupts deferred) and blocks
      the calling thread until all of them have terminated.

      Each worker repeatedly applies next_task to the shared queue (initially
      tasks) to obtain an optional task together with the remaining queue, and
      runs the task; a worker stops as soon as next_task yields NONE.
      next_task is always invoked with the scheduler lock held.

      Result: the exceptions raised during the run, most recent first.  A
      failing task does not stop the remaining work.  An exception raised by
      next_task (or by the diagnostic output) ends only the affected worker and
      is recorded as well -- previously such a worker died without being
      counted as finished, and the caller waited forever.
    *)
    fun schedule n next_task tasks =
      let
        (*protected execution*)
        val lock = Mutex.mutex ();
        fun PROTECTED e =
          let
            val _ = Mutex.lock lock;
            val res = Exn.capture e ();
            val _ = Mutex.unlock lock;
          in Exn.release res end;

        (*queue of tasks*)
        val queue = ref tasks;
        fun dequeue () = PROTECTED (fn () =>
          let
            val (task, tasks') = next_task (! queue);
            val _ = queue := tasks';
          in task end);

        (*worker threads*)
        val running = ref 0;
        val status = ref ([]: exn list);
        val finished = ConditionVar.conditionVar ();

        (*record a failure in the shared status list*)
        fun failed exn = PROTECTED (fn () => status := exn :: ! status);

        (*account for a terminating worker and wake up the main thread*)
        fun terminate () =
          (PROTECTED (fn () => running := ! running - 1);
           ConditionVar.signal finished);

        (*process tasks until the queue is exhausted; task failures are
          recorded, anything else propagates to the caller*)
        fun loop k =
          (message ("WORKER THREAD " ^ Int.toString k);
           case dequeue () of
            SOME f =>
              ((case Exn.capture f () of
                 Exn.Result () => ()
               | Exn.Exn exn => failed exn);
               loop k)
          | NONE => ());

        (*thread body: whatever happens inside the loop, the worker must
          decrement the counter and signal, otherwise the main thread hangs*)
        fun work k () =
          ((case Exn.capture loop k of
             Exn.Result () => ()
           | Exn.Exn exn => failed exn);
           terminate ());

        (*main control: fork and wait*)
        fun fork 0 = ()
          | fork k =
             (running := ! running + 1;
              Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
              fork (k - 1));
        (*the lock is held while forking, so no worker can touch the queue or
          the counter before the main thread starts waiting*)
        val _ = PROTECTED (fn () =>
         (fork (Int.max (n, 1));
          while ! running <> 0 do ConditionVar.wait (finished, lock)));

      in ! status end;
   101 
   102 end;
   103 
   (*top-level shortcut for Multithreading.CRITICAL*)
   fun CRITICAL e = Multithreading.CRITICAL e;