--- a/src/Pure/ML-Systems/multithreading_polyml.ML Thu Sep 04 16:03:46 2008 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML Thu Sep 04 16:03:47 2008 +0200
@@ -2,11 +2,9 @@
ID: $Id$
Author: Makarius
-Multithreading in Poly/ML 5.1 (cf. polyml/basis/Thread.sml).
+Multithreading in Poly/ML 5.1, 5.2 (cf. polyml/basis/Thread.sml).
*)
-open Thread;
-
signature MULTITHREADING_POLYML =
sig
val interruptible: ('a -> 'b) -> 'a -> 'b
@@ -50,13 +48,6 @@
(* misc utils *)
-fun cons x xs = x :: xs;
-
-fun change r f = r := f (! r);
-
-fun inc i = (i := ! i + 1; ! i);
-fun dec i = (i := ! i - 1; ! i);
-
fun show "" = "" | show name = " " ^ name;
fun show' "" = "" | show' name = " [" ^ name ^ "]";
@@ -238,93 +229,6 @@
end;
-(* scheduling -- multiple threads working on a queue of tasks *)
-
-datatype 'a task =
- Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
-
-fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
- let
- (*protected execution*)
- val lock = Mutex.mutex ();
- val protected_name = ref "";
- fun PROTECTED name e =
- let
- val name' = ! protected_name;
- val _ =
- if Mutex.trylock lock then ()
- else
- let
- val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": waiting");
- val _ = Mutex.lock lock;
- val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": passed");
- in () end;
- val _ = protected_name := name;
- val res = Exn.capture e ();
- val _ = protected_name := "";
- val _ = Mutex.unlock lock;
- in Exn.release res end;
-
- (*wakeup condition*)
- val wakeup = ConditionVar.conditionVar ();
- fun wakeup_all () = ConditionVar.broadcast wakeup;
- fun wait () = ConditionVar.wait (wakeup, lock);
- fun wait_timeout () = ConditionVar.waitUntil (wakeup, lock, Time.now () + Time.fromSeconds 1);
-
- (*queue of tasks*)
- val queue = ref tasks;
- val active = ref 0;
- fun trace_active () = tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
- fun dequeue () =
- let
- val (next, tasks') = next_task (! queue);
- val _ = queue := tasks';
- in
- (case next of Wait =>
- (dec active; trace_active ();
- wait ();
- inc active; trace_active ();
- dequeue ())
- | _ => next)
- end;
-
- (*pool of running threads*)
- val status = ref ([]: exn list);
- val running = ref ([]: Thread.thread list);
- fun start f =
- (inc active;
- change running (cons (Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer]))));
- fun stop () =
- (dec active;
- change running (List.filter (fn t => not (Thread.equal (t, Thread.self ())))));
-
- (*worker thread*)
- fun worker () =
- (case PROTECTED "dequeue" dequeue of
- Task {body, cont, fail} =>
- (case Exn.capture (restore_attributes body) () of
- Exn.Result () =>
- (PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ())
- | Exn.Exn exn =>
- PROTECTED "fail" (fn () =>
- (change status (cons exn); change queue fail; stop (); wakeup_all ())))
- | Terminate => PROTECTED "terminate" (fn () => (stop (); wakeup_all ())));
-
- (*main control: fork and wait*)
- fun fork 0 = ()
- | fork k = (start worker; fork (k - 1));
- val _ = PROTECTED "main" (fn () =>
- (fork (Int.max (n, 1));
- while not (List.null (! running)) do
- (trace_active ();
- if not (List.null (! status))
- then (List.app (fn t => Thread.interrupt t handle Thread _ => ()) (! running))
- else ();
- wait_timeout ())));
-
- in ! status end);
-
-
(* profiling *)
local val profile_orig = profile in
@@ -347,18 +251,13 @@
val serial = uninterruptible (fn _ => fn () =>
let
val _ = Mutex.lock serial_lock;
- val res = inc serial_count;
+ val _ = serial_count := ! serial_count + 1;
+ val res = ! serial_count;
val _ = Mutex.unlock serial_lock;
in res end);
end;
-
-(* thread data *)
-
-val get_data = Thread.getLocal;
-val put_data = Thread.setLocal;
-
end;
structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;