src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Wed Aug 01 16:50:16 2007 +0200 (2007-08-01)
changeset 24109 952efb77cf91
parent 24108 24e5587603b4
child 24119 06965b38c5e9
permissions -rw-r--r--
oops -- fixed syntax;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Multithreading in Poly/ML (version 5.1).
     6 *)
     7 
     8 open Thread;
     9 
    10 structure Multithreading: MULTITHREADING =
    11 struct
    12 
    13 (* options *)
    14 
    15 val trace = ref false;
    16 fun tracing msg =
    17   if ! trace
    18   then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
    19   else ();
    20 
    21 val available = true;
    22 val max_threads = ref 1;
    23 
    24 
    25 (* misc utils *)
    26 
    27 fun show "" = ""
    28   | show name = " " ^ name;
    29 
    30 fun show' "" = ""
    31   | show' name = " [" ^ name ^ "]";
    32 
    33 fun inc i = (i := ! i + 1; ! i);
    34 fun dec i = (i := ! i - 1; ! i);
    35 
    36 
    37 (* critical section -- may be nested within the same thread *)
    38 
    39 local
    40 
    41 val critical_lock = Mutex.mutex ();
    42 val critical_thread = ref (NONE: Thread.thread option);
    43 val critical_name = ref "";
    44 
    45 in
    46 
    47 fun self_critical () =
    48   (case ! critical_thread of
    49     NONE => false
    50   | SOME id => Thread.equal (id, Thread.self ()));
    51 
    52 fun NAMED_CRITICAL name e =
    53   if self_critical () then e ()
    54   else
    55     let
    56       val _ =
    57         if Mutex.trylock critical_lock then ()
    58         else
    59           let
    60             val timer = Timer.startRealTimer ();
    61             val _ = tracing (fn () =>
    62               "CRITICAL" ^ show name ^ show' (! critical_name) ^ ": waiting");
    63             val _ = Mutex.lock critical_lock;
    64             val _ = tracing (fn () =>
    65               "CRITICAL" ^ show name ^ show' (! critical_name) ^ ": passed after " ^
    66               Time.toString (Timer.checkRealTimer timer));
    67           in () end;
    68       val _ = critical_thread := SOME (Thread.self ());
    69       val _ = critical_name := name;
    70       val result = Exn.capture e ();
    71       val _ = critical_name := "";
    72       val _ = critical_thread := NONE;
    73       val _ = Mutex.unlock critical_lock;
    74     in Exn.release result end;
    75 
    76 fun CRITICAL e = NAMED_CRITICAL "" e;
    77 
    78 end;
    79 
    80 
    81 (* scheduling -- non-interruptible threads working on a queue of tasks *)
    82 
    83 local
    84 
    85 val protected_name = ref "";
    86 
    87 in
    88 
    89 fun schedule n next_task tasks =
    90   let
    91     (*protected execution*)
    92     val lock = Mutex.mutex ();
    93     fun PROTECTED name e =
    94       let
    95         val _ =
    96           if Mutex.trylock lock then ()
    97           else
    98            (tracing (fn () => "PROTECTED" ^ show name ^ show' (! protected_name) ^ ": waiting");
    99             Mutex.lock lock;
   100             tracing (fn () => "PROTECTED" ^ show name ^ show' (! protected_name) ^ ": passed"));
   101         val _ = protected_name := name;
   102         val res = Exn.capture e ();
   103         val _ = protected_name := "";
   104         val _ = Mutex.unlock lock;
   105       in Exn.release res end;
   106 
   107     (*wakeup condition*)
   108     val wakeup = ConditionVar.conditionVar ();
   109     fun wakeup_all () = ConditionVar.broadcast wakeup;
   110     fun wait () = ConditionVar.wait (wakeup, lock);
   111 
   112     (*the queue of tasks*)
   113     val queue = ref tasks;
   114     val active = ref 0;
   115     fun trace_active () = tracing (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
   116     fun dequeue () =
   117       let
   118         val (next, tasks') = next_task (! queue);
   119         val _ = queue := tasks';
   120       in
   121         if Task.is_running (#1 next) then
   122          (dec active; trace_active ();
   123           wait ();
   124           inc active; trace_active ();
   125           dequeue ())
   126         else next
   127       end;
   128 
   129     (*worker threads*)
   130     val running = ref 0;
   131     val status = ref ([]: exn list);
   132     fun work () =
   133       (case PROTECTED "dequeue" dequeue of
   134         (Task.Task f, cont) =>
   135           (case Exn.capture f () of
   136             Exn.Result () => continue cont
   137           | Exn.Exn exn =>
   138               (PROTECTED "status" (fn () => status := exn :: ! status); continue cont))
   139       | (Task.Finished, _) =>
   140          (PROTECTED "running" (fn () => (dec active; dec running; wakeup_all ()))))
   141     and continue cont =
   142       (PROTECTED "cont" (fn () => (queue := cont (! queue); wakeup_all ())); work ());
   143 
   144     (*main control: fork and wait*)
   145     fun fork 0 = ()
   146       | fork k =
   147          (inc running; inc active;
   148           Thread.fork (work, [Thread.InterruptState Thread.InterruptDefer]);
   149           fork (k - 1));
   150     val _ = PROTECTED "main" (fn () =>
   151      (fork (Int.max (n, 1));
   152       while ! running <> 0 do (trace_active (); wait ())));
   153 
   154   in ! status end;
   155 
   156 end;
   157 
   158 end;
   159 
   160 val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL;
   161 val CRITICAL = Multithreading.CRITICAL;