# HG changeset patch # User wenzelm # Date 1185310428 -7200 # Node ID b6ce6de5b700ca319f650cd957f5acb45fe5b057 # Parent 9c418fa38f7e90c1b4514f8cfd28a181dfc31365 renamed number_of_threads to max_threads; added schedule operator; diff -r 9c418fa38f7e -r b6ce6de5b700 src/Pure/ML-Systems/multithreading_dummy.ML --- a/src/Pure/ML-Systems/multithreading_dummy.ML Tue Jul 24 22:53:46 2007 +0200 +++ b/src/Pure/ML-Systems/multithreading_dummy.ML Tue Jul 24 22:53:48 2007 +0200 @@ -7,19 +7,22 @@ signature MULTITHREADING = sig - val number_of_threads: int ref + val max_threads: int ref val self_critical: unit -> bool val CRITICAL: (unit -> 'a) -> 'a + val schedule: int -> ('a -> (unit -> unit) option * 'a) -> 'a -> exn list end; structure Multithreading: MULTITHREADING = struct -val number_of_threads = ref 0; +val max_threads = ref 1; fun self_critical () = false; fun CRITICAL e = e (); +fun schedule _ _ _ = raise Fail ("No multithreading support for " ^ ml_system); + end; val CRITICAL = Multithreading.CRITICAL; diff -r 9c418fa38f7e -r b6ce6de5b700 src/Pure/ML-Systems/multithreading_polyml.ML --- a/src/Pure/ML-Systems/multithreading_polyml.ML Tue Jul 24 22:53:46 2007 +0200 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Tue Jul 24 22:53:48 2007 +0200 @@ -10,14 +10,14 @@ structure Multithreading: MULTITHREADING = struct -val number_of_threads = ref 0; - - (* FIXME tmp *) fun message s = (TextIO.output (TextIO.stdErr, (">>> " ^ s ^ "\n")); TextIO.flushOut TextIO.stdErr); +val max_threads = ref 1; + + (* critical section -- may be nested within the same thread *) local @@ -39,9 +39,9 @@ val _ = if Mutex.trylock critical_lock then () else - (message "Waiting for critical lock"; + (message "CRITICAL: waiting for lock"; Mutex.lock critical_lock; - message "Obtained critical lock"); + message "CRITICAL: obtained lock"); val _ = critical_thread := SOME (Thread.self ()); val result = Exn.capture e (); val _ = critical_thread := NONE; @@ -50,6 +50,55 @@ end; + +(* scheduling -- non-interruptible threads working on a queue of tasks *) + +fun schedule n next_task tasks = + let + (*protected execution*) + val lock = Mutex.mutex (); + fun PROTECTED e = + let + val _ = Mutex.lock lock; + val res = Exn.capture e (); + val _ = Mutex.unlock lock; + in Exn.release res end; + + (*queue of tasks*) + val queue = ref tasks; + fun dequeue () = PROTECTED (fn () => + let + val (task, tasks') = next_task (! queue); + val _ = queue := tasks'; + in task end); + + (*worker threads*) + val running = ref 0; + val status = ref ([]: exn list); + val finished = ConditionVar.conditionVar (); + fun work k () = + (message ("WORKER THREAD " ^ Int.toString k); + case dequeue () of + SOME f => + (case Exn.capture f () of + Exn.Result () => work k () + | Exn.Exn exn => (PROTECTED (fn () => status := exn :: ! status); work k ())) + | NONE => + (PROTECTED (fn () => running := ! running - 1); + ConditionVar.signal finished)); + + (*main control: fork and wait*) + fun fork 0 = () + | fork k = + (running := ! running + 1; + Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]); + fork (k - 1)); + val _ = PROTECTED (fn () => + (fork (Int.max (n, 1)); + while ! running <> 0 do ConditionVar.wait (finished, lock))); + + in ! status end; + end; val CRITICAL = Multithreading.CRITICAL;