added Par_List in Scala, in accordance to ML version;
system property "isabelle.threads" determines size of Scala thread pool, like system option "threads" for ML;
avoid ".par" framework with its hard-wired thread pool, which also has problems with cancellation;
tuned;
--- a/etc/settings Thu Dec 11 15:24:28 2014 +0100
+++ b/etc/settings Thu Dec 11 23:31:30 2014 +0100
@@ -14,7 +14,7 @@
ISABELLE_SCALA_BUILD_OPTIONS="-encoding UTF-8 -nowarn -target:jvm-1.7 -Xmax-classfile-name 130"
-ISABELLE_JAVA_SYSTEM_OPTIONS="-Dfile.encoding=UTF-8 -server"
+ISABELLE_JAVA_SYSTEM_OPTIONS="-server -Dfile.encoding=UTF-8 -Disabelle.threads=0"
classpath "$ISABELLE_HOME/lib/classes/Pure.jar"
--- a/src/Pure/Concurrent/par_list.ML Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/Concurrent/par_list.ML Thu Dec 11 23:31:30 2014 +0100
@@ -56,21 +56,20 @@
fun get_some f xs =
let
- exception FOUND of 'b option;
- fun found (Exn.Exn (FOUND some)) = some
- | found _ = NONE;
+ exception FOUND of 'b;
val results =
managed_results "Par_List.get_some"
- (fn x => (case f x of NONE => () | some => raise FOUND some)) xs;
+ (fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs;
in
- (case get_first found results of
- SOME y => SOME y
- | NONE => (Par_Exn.release_first results; NONE))
+ (case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of
+ NONE => (Par_Exn.release_first results; NONE)
+ | some => some)
end;
fun find_some P = get_some (fn x => if P x then SOME x else NONE);
fun exists P = is_some o get_some (fn x => if P x then SOME () else NONE);
+
fun forall P = not o exists (not o P);
end;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Concurrent/par_list.scala Thu Dec 11 23:31:30 2014 +0100
@@ -0,0 +1,70 @@
+/* Title: Pure/Concurrent/par_list.scala
+ Author: Makarius
+
+Parallel list combinators.
+*/
+
+
+package isabelle
+
+
+import java.util.concurrent.{Future => JFuture, CancellationException}
+
+
+object Par_List
+{
+ def managed_results[A, B](f: A => B, xs: List[A]): List[Exn.Result[B]] =
+ if (xs.isEmpty || xs.tail.isEmpty) xs.map(x => Exn.capture { f(x) })
+ else {
+ val state = Synchronized((List.empty[JFuture[Exn.Result[B]]], false))
+
+ def cancel_other(self: Int = -1): Unit =
+ state.change { case (tasks, canceled) =>
+ if (!canceled) {
+ for ((task, i) <- tasks.iterator.zipWithIndex if i != self)
+ task.cancel(true)
+ }
+ (tasks, true)
+ }
+
+ try {
+ state.change(_ =>
+ (xs.iterator.zipWithIndex.map({ case (x, self) =>
+ Simple_Thread.submit_task {
+ val result = Exn.capture { f(x) }
+ result match { case Exn.Exn(_) => cancel_other(self) case _ => }
+ result
+ }
+ }).toList, false))
+
+ state.value._1.map(future =>
+ try { future.get }
+ catch { case _: CancellationException => Exn.Exn(Exn.Interrupt()): Exn.Result[B] })
+ }
+ finally { cancel_other() }
+ }
+
+ def map[A, B](f: A => B, xs: List[A]): List[B] =
+ Exn.release_first(managed_results(f, xs))
+
+ def get_some[A, B](f: A => Option[B], xs: List[A]): Option[B] =
+ {
+ class Found(val res: B) extends Exception
+ val results =
+ managed_results(
+ (x: A) => f(x) match { case None => () case Some(y) => throw new Found(y) }, xs)
+ results.collectFirst({ case Exn.Exn(found: Found) => found.res }) match {
+ case None => Exn.release_first(results); None
+ case some => some
+ }
+ }
+
+ def find_some[A](P: A => Boolean, xs: List[A]): Option[A] =
+ get_some((x: A) => if (P(x)) Some(x) else None, xs)
+
+ def exists[A](P: A => Boolean, xs: List[A]): Boolean =
+ get_some((x: A) => if (P(x)) Some(()) else None, xs).isDefined
+
+ def forall[A](P: A => Boolean, xs: List[A]): Boolean = !exists((x: A) => !P(x), xs)
+}
+
--- a/src/Pure/Concurrent/simple_thread.scala Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/Concurrent/simple_thread.scala Thu Dec 11 23:31:30 2014 +0100
@@ -9,9 +9,8 @@
import java.lang.Thread
-import java.util.concurrent.{Callable, Future => JFuture}
-
-import scala.collection.parallel.ForkJoinTasks
+import java.util.concurrent.{Callable, Future => JFuture, ThreadPoolExecutor,
+ TimeUnit, LinkedBlockingQueue}
object Simple_Thread
@@ -41,7 +40,12 @@
/* thread pool */
- lazy val default_pool = ForkJoinTasks.defaultForkJoinPool
+ lazy val default_pool =
+ {
+ val m = Properties.Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0
+ val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8
+ new ThreadPoolExecutor(n, n, 2500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
+ }
def submit_task[A](body: => A): JFuture[A] =
default_pool.submit(new Callable[A] { def call = body })
--- a/src/Pure/General/exn.scala Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/General/exn.scala Thu Dec 11 23:31:30 2014 +0100
@@ -26,6 +26,15 @@
case Exn(exn) => throw exn
}
+ def release_first[A](results: List[Result[A]]): List[A] =
+ if (results.forall({ case Res(_) => true case _ => false }))
+ results.map(release(_))
+ else
+ results.find({ case Exn(exn) => !is_interrupt(exn) case _ => false }) match {
+ case Some(Exn(exn)) => throw exn
+ case _ => results match { case Exn(exn) :: _ => throw exn case _ => ??? }
+ }
+
/* interrupts */
--- a/src/Pure/ML-Systems/multithreading_polyml.ML Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML Thu Dec 11 23:31:30 2014 +0100
@@ -80,8 +80,7 @@
val available = true;
fun max_threads_result m =
- if m > 0 then m
- else Int.min (Int.max (Thread.numProcessors (), 1), 8);
+ if m > 0 then m else Int.min (Int.max (Thread.numProcessors (), 1), 8);
val max_threads = ref 1;
--- a/src/Pure/Thy/thy_info.scala Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/Thy/thy_info.scala Thu Dec 11 23:31:30 2014 +0100
@@ -85,13 +85,11 @@
def loaded_files: List[Path] =
{
val dep_files =
- rev_deps.par.map(dep =>
- Exn.capture {
- dep.loaded_files(syntax).map(a => Path.explode(dep.name.master_dir) + Path.explode(a))
- }).toList
- ((Nil: List[Path]) /: dep_files) {
- case (acc_files, files) => Exn.release(files) ::: acc_files
- }
+ Par_List.map(
+ (dep: Dep) =>
+ dep.loaded_files(syntax).map(a => Path.explode(dep.name.master_dir) + Path.explode(a)),
+ rev_deps)
+ ((Nil: List[Path]) /: dep_files) { case (acc_files, files) => files ::: acc_files }
}
}
--- a/src/Pure/Tools/build.scala Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/Tools/build.scala Thu Dec 11 23:31:30 2014 +0100
@@ -345,9 +345,7 @@
val graph = tree.graph
val sessions = graph.keys
- val timings =
- sessions.par.map((name: String) =>
- Exn.capture { (name, load_timings(name)) }).toList.map(Exn.release(_))
+ val timings = Par_List.map((name: String) => (name, load_timings(name)), sessions)
val command_timings =
Map(timings.map({ case (name, (ts, _)) => (name, ts) }): _*).withDefaultValue(Nil)
val session_timing =
--- a/src/Pure/build-jars Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/build-jars Thu Dec 11 23:31:30 2014 +0100
@@ -14,6 +14,7 @@
Concurrent/event_timer.scala
Concurrent/future.scala
Concurrent/mailbox.scala
+ Concurrent/par_list.scala
Concurrent/simple_thread.scala
Concurrent/synchronized.scala
GUI/color_value.scala