# HG changeset patch # User wenzelm # Date 1220537027 -7200 # Node ID 10a1f1f4c6ae00975deb9198924c50397263d0f2 # Parent 53cd972d7db9797ee048f9ef59abe015ffa01450 moved Multithreading.task/schedule to Concurrent/schedule.ML; diff -r 53cd972d7db9 -r 10a1f1f4c6ae src/Pure/ML-Systems/multithreading_polyml.ML --- a/src/Pure/ML-Systems/multithreading_polyml.ML Thu Sep 04 16:03:46 2008 +0200 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Thu Sep 04 16:03:47 2008 +0200 @@ -2,11 +2,9 @@ ID: $Id$ Author: Makarius -Multithreading in Poly/ML 5.1 (cf. polyml/basis/Thread.sml). +Multithreading in Poly/ML 5.1, 5.2 (cf. polyml/basis/Thread.sml). *) -open Thread; - signature MULTITHREADING_POLYML = sig val interruptible: ('a -> 'b) -> 'a -> 'b @@ -50,13 +48,6 @@ (* misc utils *) -fun cons x xs = x :: xs; - -fun change r f = r := f (! r); - -fun inc i = (i := ! i + 1; ! i); -fun dec i = (i := ! i - 1; ! i); - fun show "" = "" | show name = " " ^ name; fun show' "" = "" | show' name = " [" ^ name ^ "]"; @@ -238,93 +229,6 @@ end; -(* scheduling -- multiple threads working on a queue of tasks *) - -datatype 'a task = - Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate; - -fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks => - let - (*protected execution*) - val lock = Mutex.mutex (); - val protected_name = ref ""; - fun PROTECTED name e = - let - val name' = ! protected_name; - val _ = - if Mutex.trylock lock then () - else - let - val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": waiting"); - val _ = Mutex.lock lock; - val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": passed"); - in () end; - val _ = protected_name := name; - val res = Exn.capture e (); - val _ = protected_name := ""; - val _ = Mutex.unlock lock; - in Exn.release res end; - - (*wakeup condition*) - val wakeup = ConditionVar.conditionVar (); - fun wakeup_all () = ConditionVar.broadcast wakeup; - fun wait () = ConditionVar.wait (wakeup, lock); - fun wait_timeout () = ConditionVar.waitUntil (wakeup, lock, Time.now () + Time.fromSeconds 1); - - (*queue of tasks*) - val queue = ref tasks; - val active = ref 0; - fun trace_active () = tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active"); - fun dequeue () = - let - val (next, tasks') = next_task (! queue); - val _ = queue := tasks'; - in - (case next of Wait => - (dec active; trace_active (); - wait (); - inc active; trace_active (); - dequeue ()) - | _ => next) - end; - - (*pool of running threads*) - val status = ref ([]: exn list); - val running = ref ([]: Thread.thread list); - fun start f = - (inc active; - change running (cons (Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer])))); - fun stop () = - (dec active; - change running (List.filter (fn t => not (Thread.equal (t, Thread.self ()))))); - - (*worker thread*) - fun worker () = - (case PROTECTED "dequeue" dequeue of - Task {body, cont, fail} => - (case Exn.capture (restore_attributes body) () of - Exn.Result () => - (PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ()) - | Exn.Exn exn => - PROTECTED "fail" (fn () => - (change status (cons exn); change queue fail; stop (); wakeup_all ()))) - | Terminate => PROTECTED "terminate" (fn () => (stop (); wakeup_all ()))); - - (*main control: fork and wait*) - fun fork 0 = () - | fork k = (start worker; fork (k - 1)); - val _ = PROTECTED "main" (fn () => - (fork (Int.max (n, 1)); - while not (List.null (! running)) do - (trace_active (); - if not (List.null (! status)) - then (List.app (fn t => Thread.interrupt t handle Thread _ => ()) (! running)) - else (); - wait_timeout ()))); - - in ! status end); - - (* profiling *) local val profile_orig = profile in @@ -347,18 +251,13 @@ val serial = uninterruptible (fn _ => fn () => let val _ = Mutex.lock serial_lock; - val res = inc serial_count; + val _ = serial_count := ! serial_count + 1; + val res = ! serial_count; val _ = Mutex.unlock serial_lock; in res end); end; - -(* thread data *) - -val get_data = Thread.getLocal; -val put_data = Thread.setLocal; - end; structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;