--- a/src/Pure/ML-Systems/multithreading_dummy.ML Tue Jul 24 22:53:46 2007 +0200
+++ b/src/Pure/ML-Systems/multithreading_dummy.ML Tue Jul 24 22:53:48 2007 +0200
@@ -7,19 +7,22 @@
signature MULTITHREADING =
sig
- val number_of_threads: int ref
+ val max_threads: int ref
val self_critical: unit -> bool
val CRITICAL: (unit -> 'a) -> 'a
+ val schedule: int -> ('a -> (unit -> unit) option * 'a) -> 'a -> exn list
end;
structure Multithreading: MULTITHREADING =
struct
-val number_of_threads = ref 0;
+val max_threads = ref 1;
fun self_critical () = false;
fun CRITICAL e = e ();
+fun schedule _ _ _ = raise Fail ("No multithreading support for " ^ ml_system);
+
end;
val CRITICAL = Multithreading.CRITICAL;
--- a/src/Pure/ML-Systems/multithreading_polyml.ML Tue Jul 24 22:53:46 2007 +0200
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML Tue Jul 24 22:53:48 2007 +0200
@@ -10,14 +10,14 @@
structure Multithreading: MULTITHREADING =
struct
-val number_of_threads = ref 0;
-
-
(* FIXME tmp *)
fun message s =
(TextIO.output (TextIO.stdErr, (">>> " ^ s ^ "\n")); TextIO.flushOut TextIO.stdErr);
+val max_threads = ref 1;
+
+
(* critical section -- may be nested within the same thread *)
local
@@ -39,9 +39,9 @@
val _ =
if Mutex.trylock critical_lock then ()
else
- (message "Waiting for critical lock";
+ (message "CRITICAL: waiting for lock";
Mutex.lock critical_lock;
- message "Obtained critical lock");
+ message "CRITICAL: obtained lock");
val _ = critical_thread := SOME (Thread.self ());
val result = Exn.capture e ();
val _ = critical_thread := NONE;
@@ -50,6 +50,55 @@
end;
+
+(* scheduling -- non-interruptible threads working on a queue of tasks *)
+
+fun schedule n next_task tasks =
+ let
+ (*protected execution*)
+ val lock = Mutex.mutex ();
+ fun PROTECTED e =
+ let
+ val _ = Mutex.lock lock;
+ val res = Exn.capture e ();
+ val _ = Mutex.unlock lock;
+ in Exn.release res end;
+
+ (*queue of tasks*)
+ val queue = ref tasks;
+ fun dequeue () = PROTECTED (fn () =>
+ let
+ val (task, tasks') = next_task (! queue);
+ val _ = queue := tasks';
+ in task end);
+
+ (*worker threads*)
+ val running = ref 0;
+ val status = ref ([]: exn list);
+ val finished = ConditionVar.conditionVar ();
+ fun work k () =
+ (message ("WORKER THREAD " ^ Int.toString k);
+ case dequeue () of
+ SOME f =>
+ (case Exn.capture f () of
+ Exn.Result () => work k ()
+ | Exn.Exn exn => (PROTECTED (fn () => status := exn :: ! status); work k ()))
+ | NONE =>
+ (PROTECTED (fn () => running := ! running - 1);
+ ConditionVar.signal finished));
+
+ (*main control: fork and wait*)
+ fun fork 0 = ()
+ | fork k =
+ (running := ! running + 1;
+ Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
+ fork (k - 1));
+ val _ = PROTECTED (fn () =>
+ (fork (Int.max (n, 1));
+ while ! running <> 0 do ConditionVar.wait (finished, lock)));
+
+ in ! status end;
+
end;
val CRITICAL = Multithreading.CRITICAL;