added trace flag, official tracing operation;
authorwenzelm
Wed Jul 25 17:05:49 2007 +0200 (2007-07-25)
changeset 2398103b71bf91318
parent 23980 d35dc9344515
child 23982 e3c4c0b9ae05
added trace flag, official tracing operation;
added named CRITICAL';
schedule: tuned signature, actually observe dependencies on running tasks;
src/Pure/ML-Systems/multithreading_polyml.ML
     1.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML	Wed Jul 25 17:05:48 2007 +0200
     1.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Wed Jul 25 17:05:49 2007 +0200
     1.3 @@ -10,11 +10,13 @@
     1.4  structure Multithreading: MULTITHREADING =
     1.5  struct
     1.6  
     1.7 -(* FIXME tmp *)
     1.8 -fun message s =
     1.9 -  (TextIO.output (TextIO.stdErr, (">>> " ^ s ^ "\n")); TextIO.flushOut TextIO.stdErr);
    1.10 +val trace = ref false;
    1.11 +fun tracing msg =
    1.12 +  if ! trace
    1.13 +  then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
    1.14 +  else ();
    1.15  
    1.16 -
    1.17 +val available = true;
    1.18  val max_threads = ref 1;
    1.19  
    1.20  
    1.21 @@ -32,22 +34,24 @@
    1.22      NONE => false
    1.23    | SOME id => Thread.equal (id, Thread.self ()));
    1.24  
    1.25 -fun CRITICAL e =
    1.26 +fun CRITICAL' name e =
    1.27    if self_critical () then e ()
    1.28    else
    1.29      let
    1.30        val _ =
    1.31          if Mutex.trylock critical_lock then ()
    1.32          else
    1.33 -          (message "CRITICAL: waiting for lock";
    1.34 -           Mutex.lock critical_lock;
    1.35 -           message "CRITICAL: obtained lock");
    1.36 +         (tracing (fn () => "CRITICAL " ^ name ^ ": waiting for lock");
    1.37 +          Mutex.lock critical_lock;
    1.38 +          tracing (fn () => "CRITICAL " ^ name ^ ": obtained lock"));
    1.39        val _ = critical_thread := SOME (Thread.self ());
    1.40        val result = Exn.capture e ();
    1.41        val _ = critical_thread := NONE;
    1.42        val _ = Mutex.unlock critical_lock;
    1.43      in Exn.release result end;
    1.44  
    1.45 +fun CRITICAL e = CRITICAL' "" e;
    1.46 +
    1.47  end;
    1.48  
    1.49  
    1.50 @@ -57,34 +61,47 @@
    1.51    let
    1.52      (*protected execution*)
    1.53      val lock = Mutex.mutex ();
    1.54 -    fun PROTECTED e =
    1.55 +    fun PROTECTED k e =
    1.56        let
    1.57 -        val _ = Mutex.lock lock;
    1.58 +        val _ =
    1.59 +          if Mutex.trylock lock then ()
    1.60 +          else
    1.61 +           (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
    1.62 +            Mutex.lock lock;
    1.63 +            tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
    1.64          val res = Exn.capture e ();
    1.65          val _ = Mutex.unlock lock;
    1.66        in Exn.release res end;
    1.67  
    1.68 -    (*queue of tasks*)
    1.69 +    (*the queue of tasks*)
    1.70      val queue = ref tasks;
    1.71 -    fun dequeue () = PROTECTED (fn () =>
    1.72 +    fun dequeue k = PROTECTED k (fn () =>
    1.73        let
    1.74 -        val (task, tasks') = next_task (! queue);
    1.75 +        val (next, tasks') = next_task (! queue);
    1.76          val _ = queue := tasks';
    1.77 -      in task end);
    1.78 +      in next end);
    1.79  
    1.80      (*worker threads*)
    1.81      val running = ref 0;
    1.82      val status = ref ([]: exn list);
    1.83      val finished = ConditionVar.conditionVar ();
    1.84 -    fun work k () =
    1.85 -      (message ("WORKER THREAD " ^ Int.toString k);
    1.86 -       case dequeue () of
    1.87 -        SOME f =>
    1.88 -          (case Exn.capture f () of
    1.89 -            Exn.Result () => work k ()
    1.90 -          | Exn.Exn exn => (PROTECTED (fn () => status := exn :: ! status); work k ()))
    1.91 -      | NONE =>
    1.92 -         (PROTECTED (fn () => running := ! running - 1);
    1.93 +    fun wait () = ConditionVar.waitUntil (finished, lock, Time.fromMilliseconds 500);
    1.94 +    fun continue cont k =
    1.95 +      (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.signal finished; work k ())
    1.96 +    and work k () =
    1.97 +      (case dequeue k of
    1.98 +        (Task.Task f, cont) =>
    1.99 +          (tracing (fn () => "TASK " ^ Int.toString k);
   1.100 +           case Exn.capture f () of
   1.101 +            Exn.Result () => continue cont k
   1.102 +          | Exn.Exn exn =>
   1.103 +              (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
   1.104 +      | (Task.Running, _) =>
   1.105 +          (tracing (fn () => "WAITING " ^ Int.toString k);
   1.106 +           PROTECTED k wait; work k ())
   1.107 +      | (Task.Finished, _) =>
   1.108 +         (tracing (fn () => "TERMINATING " ^ Int.toString k);
   1.109 +          PROTECTED k (fn () => running := ! running - 1);
   1.110            ConditionVar.signal finished));
   1.111  
   1.112      (*main control: fork and wait*)
   1.113 @@ -93,12 +110,12 @@
   1.114           (running := ! running + 1;
   1.115            Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
   1.116            fork (k - 1));
   1.117 -    val _ = PROTECTED (fn () =>
   1.118 -     (fork (Int.max (n, 1));
   1.119 -      while ! running <> 0 do ConditionVar.wait (finished, lock)));
   1.120 +    val _ = PROTECTED 0 (fn () =>
   1.121 +     (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));
   1.122  
   1.123    in ! status end;
   1.124  
   1.125  end;
   1.126  
   1.127 +val CRITICAL' = Multithreading.CRITICAL';
   1.128  val CRITICAL = Multithreading.CRITICAL;