src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Fri Aug 03 16:28:23 2007 +0200 (2007-08-03)
changeset 24144 ec51a0f7eefe
parent 24119 06965b38c5e9
child 24208 f4cafbaa05e4
permissions -rw-r--r--
tuned tracing;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Multithreading in Poly/ML (version 5.1).
     6 *)
     7 
     8 open Thread;
     9 
    10 structure Multithreading: MULTITHREADING =
    11 struct
    12 
    13 (* options *)
    14 
    15 val trace = ref 0;
    16 fun tracing level msg =
    17   if level <= ! trace
    18   then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
    19   else ();
    20 
    21 val available = true;
    22 val max_threads = ref 1;
    23 
    24 
    25 (* misc utils *)
    26 
    27 fun show "" = ""
    28   | show name = " " ^ name;
    29 
    30 fun show' "" = ""
    31   | show' name = " [" ^ name ^ "]";
    32 
    33 fun inc i = (i := ! i + 1; ! i);
    34 fun dec i = (i := ! i - 1; ! i);
    35 
    36 
    37 (* critical section -- may be nested within the same thread *)
    38 
    39 local
    40 
    41 val critical_lock = Mutex.mutex ();
    42 val critical_thread = ref (NONE: Thread.thread option);
    43 val critical_name = ref "";
    44 
    45 in
    46 
    47 fun self_critical () =
    48   (case ! critical_thread of
    49     NONE => false
    50   | SOME id => Thread.equal (id, Thread.self ()));
    51 
    52 fun NAMED_CRITICAL name e =
    53   if self_critical () then e ()
    54   else
    55     let
    56       val name' = ! critical_name;
    57       val _ =
    58         if Mutex.trylock critical_lock then ()
    59         else
    60           let
    61             val timer = Timer.startRealTimer ();
    62             val _ = tracing 4 (fn () => "CRITICAL" ^ show name ^ show' name' ^ ": waiting");
    63             val _ = Mutex.lock critical_lock;
    64             val time = Timer.checkRealTimer timer;
    65             val _ = tracing (if Time.> (time, Time.fromMilliseconds 10) then 3 else 4) (fn () =>
    66               "CRITICAL" ^ show name ^ show' name' ^ ": passed after " ^ Time.toString time);
    67           in () end;
    68       val _ = critical_thread := SOME (Thread.self ());
    69       val _ = critical_name := name;
    70       val result = Exn.capture e ();
    71       val _ = critical_name := "";
    72       val _ = critical_thread := NONE;
    73       val _ = Mutex.unlock critical_lock;
    74     in Exn.release result end;
    75 
    76 fun CRITICAL e = NAMED_CRITICAL "" e;
    77 
    78 end;
    79 
    80 
    81 (* scheduling -- non-interruptible threads working on a queue of tasks *)
    82 
    83 local
    84 
    85 val protected_name = ref "";
    86 
    87 in
    88 
    89 fun schedule n next_task tasks =
    90   let
    91     (*protected execution*)
    92     val lock = Mutex.mutex ();
    93     fun PROTECTED name e =
    94       let
    95         val name' = ! protected_name;
    96         val _ =
    97           if Mutex.trylock lock then ()
    98           else
    99             let
   100               val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": waiting");
   101               val _ = Mutex.lock lock;
   102               val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": passed");
   103             in () end;
   104         val _ = protected_name := name;
   105         val res = Exn.capture e ();
   106         val _ = protected_name := "";
   107         val _ = Mutex.unlock lock;
   108       in Exn.release res end;
   109 
   110     (*wakeup condition*)
   111     val wakeup = ConditionVar.conditionVar ();
   112     fun wakeup_all () = ConditionVar.broadcast wakeup;
   113     fun wait () = ConditionVar.wait (wakeup, lock);
   114 
   115     (*the queue of tasks*)
   116     val queue = ref tasks;
   117     val active = ref 0;
   118     fun trace_active () = tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
   119     fun dequeue () =
   120       let
   121         val (next, tasks') = next_task (! queue);
   122         val _ = queue := tasks';
   123       in
   124         if Task.is_running (#1 next) then
   125          (dec active; trace_active ();
   126           wait ();
   127           inc active; trace_active ();
   128           dequeue ())
   129         else next
   130       end;
   131 
   132     (*worker threads*)
   133     val running = ref 0;
   134     val status = ref ([]: exn list);
   135     fun work () =
   136       (case PROTECTED "dequeue" dequeue of
   137         (Task.Task f, cont) =>
   138           (case Exn.capture f () of
   139             Exn.Result () => continue cont
   140           | Exn.Exn exn =>
   141               (PROTECTED "status" (fn () => status := exn :: ! status); continue cont))
   142       | (Task.Finished, _) =>
   143          (PROTECTED "running" (fn () => (dec active; dec running; wakeup_all ()))))
   144     and continue cont =
   145       (PROTECTED "cont" (fn () => (queue := cont (! queue); wakeup_all ())); work ());
   146 
   147     (*main control: fork and wait*)
   148     fun fork 0 = ()
   149       | fork k =
   150          (inc running; inc active;
   151           Thread.fork (work, [Thread.InterruptState Thread.InterruptDefer]);
   152           fork (k - 1));
   153     val _ = PROTECTED "main" (fn () =>
   154      (fork (Int.max (n, 1));
   155       while ! running <> 0 do (trace_active (); wait ())));
   156 
   157   in ! status end;
   158 
   159 end;
   160 
   161 end;
   162 
   163 val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL;
   164 val CRITICAL = Multithreading.CRITICAL;