moved Multithreading.task/schedule to Concurrent/schedule.ML;
authorwenzelm
Thu Sep 04 16:03:47 2008 +0200 (2008-09-04 ago)
changeset 2812410a1f1f4c6ae
parent 28123 53cd972d7db9
child 28125 e99427bcf7f3
moved Multithreading.task/schedule to Concurrent/schedule.ML;
src/Pure/ML-Systems/multithreading_polyml.ML
     1.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Sep 04 16:03:46 2008 +0200
     1.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Sep 04 16:03:47 2008 +0200
     1.3 @@ -2,11 +2,9 @@
     1.4      ID:         $Id$
     1.5      Author:     Makarius
     1.6  
     1.7 -Multithreading in Poly/ML 5.1 (cf. polyml/basis/Thread.sml).
     1.8 +Multithreading in Poly/ML 5.1, 5.2 (cf. polyml/basis/Thread.sml).
     1.9  *)
    1.10  
    1.11 -open Thread;
    1.12 -
    1.13  signature MULTITHREADING_POLYML =
    1.14  sig
    1.15    val interruptible: ('a -> 'b) -> 'a -> 'b
    1.16 @@ -50,13 +48,6 @@
    1.17  
    1.18  (* misc utils *)
    1.19  
    1.20 -fun cons x xs = x :: xs;
    1.21 -
    1.22 -fun change r f = r := f (! r);
    1.23 -
    1.24 -fun inc i = (i := ! i + 1; ! i);
    1.25 -fun dec i = (i := ! i - 1; ! i);
    1.26 -
    1.27  fun show "" = "" | show name = " " ^ name;
    1.28  fun show' "" = "" | show' name = " [" ^ name ^ "]";
    1.29  
    1.30 @@ -238,93 +229,6 @@
    1.31  end;
    1.32  
    1.33  
    1.34 -(* scheduling -- multiple threads working on a queue of tasks *)
    1.35 -
    1.36 -datatype 'a task =
    1.37 -  Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
    1.38 -
    1.39 -fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
    1.40 -  let
    1.41 -    (*protected execution*)
    1.42 -    val lock = Mutex.mutex ();
    1.43 -    val protected_name = ref "";
    1.44 -    fun PROTECTED name e =
    1.45 -      let
    1.46 -        val name' = ! protected_name;
    1.47 -        val _ =
    1.48 -          if Mutex.trylock lock then ()
    1.49 -          else
    1.50 -            let
    1.51 -              val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": waiting");
    1.52 -              val _ = Mutex.lock lock;
    1.53 -              val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": passed");
    1.54 -            in () end;
    1.55 -        val _ = protected_name := name;
    1.56 -        val res = Exn.capture e ();
    1.57 -        val _ = protected_name := "";
    1.58 -        val _ = Mutex.unlock lock;
    1.59 -      in Exn.release res end;
    1.60 -
    1.61 -    (*wakeup condition*)
    1.62 -    val wakeup = ConditionVar.conditionVar ();
    1.63 -    fun wakeup_all () = ConditionVar.broadcast wakeup;
    1.64 -    fun wait () = ConditionVar.wait (wakeup, lock);
    1.65 -    fun wait_timeout () = ConditionVar.waitUntil (wakeup, lock, Time.now () + Time.fromSeconds 1);
    1.66 -
    1.67 -    (*queue of tasks*)
    1.68 -    val queue = ref tasks;
    1.69 -    val active = ref 0;
    1.70 -    fun trace_active () = tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
    1.71 -    fun dequeue () =
    1.72 -      let
    1.73 -        val (next, tasks') = next_task (! queue);
    1.74 -        val _ = queue := tasks';
    1.75 -      in
    1.76 -        (case next of Wait =>
    1.77 -          (dec active; trace_active ();
    1.78 -            wait ();
    1.79 -            inc active; trace_active ();
    1.80 -            dequeue ())
    1.81 -        | _ => next)
    1.82 -      end;
    1.83 -
    1.84 -    (*pool of running threads*)
    1.85 -    val status = ref ([]: exn list);
    1.86 -    val running = ref ([]: Thread.thread list);
    1.87 -    fun start f =
    1.88 -      (inc active;
    1.89 -       change running (cons (Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer]))));
    1.90 -    fun stop () =
    1.91 -      (dec active;
    1.92 -       change running (List.filter (fn t => not (Thread.equal (t, Thread.self ())))));
    1.93 -
    1.94 -   (*worker thread*)
    1.95 -    fun worker () =
    1.96 -      (case PROTECTED "dequeue" dequeue of
    1.97 -        Task {body, cont, fail} =>
    1.98 -          (case Exn.capture (restore_attributes body) () of
    1.99 -            Exn.Result () =>
   1.100 -              (PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ())
   1.101 -          | Exn.Exn exn =>
   1.102 -              PROTECTED "fail" (fn () =>
   1.103 -                (change status (cons exn); change queue fail; stop (); wakeup_all ())))
   1.104 -      | Terminate => PROTECTED "terminate" (fn () => (stop (); wakeup_all ())));
   1.105 -
   1.106 -    (*main control: fork and wait*)
   1.107 -    fun fork 0 = ()
   1.108 -      | fork k = (start worker; fork (k - 1));
   1.109 -    val _ = PROTECTED "main" (fn () =>
   1.110 -     (fork (Int.max (n, 1));
   1.111 -      while not (List.null (! running)) do
   1.112 -      (trace_active ();
   1.113 -       if not (List.null (! status))
   1.114 -       then (List.app (fn t => Thread.interrupt t handle Thread _ => ()) (! running))
   1.115 -       else ();
   1.116 -       wait_timeout ())));
   1.117 -
   1.118 -  in ! status end);
   1.119 -
   1.120 -
   1.121  (* profiling *)
   1.122  
   1.123  local val profile_orig = profile in
   1.124 @@ -347,18 +251,13 @@
   1.125  val serial = uninterruptible (fn _ => fn () =>
   1.126    let
   1.127      val _ = Mutex.lock serial_lock;
   1.128 -    val res = inc serial_count;
   1.129 +    val _ = serial_count := ! serial_count + 1;
   1.130 +    val res = ! serial_count;
   1.131      val _ = Mutex.unlock serial_lock;
   1.132    in res end);
   1.133  
   1.134  end;
   1.135  
   1.136 -
   1.137 -(* thread data *)
   1.138 -
   1.139 -val get_data = Thread.getLocal;
   1.140 -val put_data = Thread.setLocal;
   1.141 -
   1.142  end;
   1.143  
   1.144  structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;