# HG changeset patch # User wenzelm # Date 1418337090 -3600 # Node ID c2b23cb8a6778d58783251c265dc746111aa47b1 # Parent 580de802aafd1a7766c0314f768f790a0cb04f48 added Par_List in Scala, in accordance to ML version; system property "isabelle.threads" determines size of Scala thread pool, like system option "threads" for ML; avoid ".par" framework with its hard-wired thread pool, which also has problems with cancellation; tuned; diff -r 580de802aafd -r c2b23cb8a677 etc/settings --- a/etc/settings Thu Dec 11 15:24:28 2014 +0100 +++ b/etc/settings Thu Dec 11 23:31:30 2014 +0100 @@ -14,7 +14,7 @@ ISABELLE_SCALA_BUILD_OPTIONS="-encoding UTF-8 -nowarn -target:jvm-1.7 -Xmax-classfile-name 130" -ISABELLE_JAVA_SYSTEM_OPTIONS="-Dfile.encoding=UTF-8 -server" +ISABELLE_JAVA_SYSTEM_OPTIONS="-server -Dfile.encoding=UTF-8 -Disabelle.threads=0" classpath "$ISABELLE_HOME/lib/classes/Pure.jar" diff -r 580de802aafd -r c2b23cb8a677 src/Pure/Concurrent/par_list.ML --- a/src/Pure/Concurrent/par_list.ML Thu Dec 11 15:24:28 2014 +0100 +++ b/src/Pure/Concurrent/par_list.ML Thu Dec 11 23:31:30 2014 +0100 @@ -56,21 +56,20 @@ fun get_some f xs = let - exception FOUND of 'b option; - fun found (Exn.Exn (FOUND some)) = some - | found _ = NONE; + exception FOUND of 'b; val results = managed_results "Par_List.get_some" - (fn x => (case f x of NONE => () | some => raise FOUND some)) xs; + (fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs; in - (case get_first found results of - SOME y => SOME y - | NONE => (Par_Exn.release_first results; NONE)) + (case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of + NONE => (Par_Exn.release_first results; NONE) + | some => some) end; fun find_some P = get_some (fn x => if P x then SOME x else NONE); fun exists P = is_some o get_some (fn x => if P x then SOME () else NONE); + fun forall P = not o exists (not o P); end; diff -r 580de802aafd -r c2b23cb8a677 src/Pure/Concurrent/par_list.scala --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/Pure/Concurrent/par_list.scala Thu Dec 11 23:31:30 2014 +0100 @@ -0,0 +1,70 @@ +/* Title: Pure/Concurrent/par_list.scala + Author: Makarius + +Parallel list combinators. +*/ + + +package isabelle + + +import java.util.concurrent.{Future => JFuture, CancellationException} + + +object Par_List +{ + def managed_results[A, B](f: A => B, xs: List[A]): List[Exn.Result[B]] = + if (xs.isEmpty || xs.tail.isEmpty) xs.map(x => Exn.capture { f(x) }) + else { + val state = Synchronized((List.empty[JFuture[Exn.Result[B]]], false)) + + def cancel_other(self: Int = -1): Unit = + state.change { case (tasks, canceled) => + if (!canceled) { + for ((task, i) <- tasks.iterator.zipWithIndex if i != self) + task.cancel(true) + } + (tasks, true) + } + + try { + state.change(_ => + (xs.iterator.zipWithIndex.map({ case (x, self) => + Simple_Thread.submit_task { + val result = Exn.capture { f(x) } + result match { case Exn.Exn(_) => cancel_other(self) case _ => } + result + } + }).toList, false)) + + state.value._1.map(future => + try { future.get } + catch { case _: CancellationException => Exn.Exn(Exn.Interrupt()): Exn.Result[B] }) + } + finally { cancel_other() } + } + + def map[A, B](f: A => B, xs: List[A]): List[B] = + Exn.release_first(managed_results(f, xs)) + + def get_some[A, B](f: A => Option[B], xs: List[A]): Option[B] = + { + class Found(val res: B) extends Exception + val results = + managed_results( + (x: A) => f(x) match { case None => () case Some(y) => throw new Found(y) }, xs) + results.collectFirst({ case Exn.Exn(found: Found) => found.res }) match { + case None => Exn.release_first(results); None + case some => some + } + } + + def find_some[A](P: A => Boolean, xs: List[A]): Option[A] = + get_some((x: A) => if (P(x)) Some(x) else None, xs) + + def exists[A](P: A => Boolean, xs: List[A]): Boolean = + get_some((x: A) => if (P(x)) Some(()) else None, xs).isDefined + + def forall[A](P: A => Boolean, xs: List[A]): Boolean = !exists((x: A) => !P(x), xs) +} + diff -r 580de802aafd -r c2b23cb8a677 src/Pure/Concurrent/simple_thread.scala --- a/src/Pure/Concurrent/simple_thread.scala Thu Dec 11 15:24:28 2014 +0100 +++ b/src/Pure/Concurrent/simple_thread.scala Thu Dec 11 23:31:30 2014 +0100 @@ -9,9 +9,8 @@ import java.lang.Thread -import java.util.concurrent.{Callable, Future => JFuture} - -import scala.collection.parallel.ForkJoinTasks +import java.util.concurrent.{Callable, Future => JFuture, ThreadPoolExecutor, + TimeUnit, LinkedBlockingQueue} object Simple_Thread @@ -41,7 +40,12 @@ /* thread pool */ - lazy val default_pool = ForkJoinTasks.defaultForkJoinPool + lazy val default_pool = + { + val m = Properties.Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0 + val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8 + new ThreadPoolExecutor(n, n, 2500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable]) + } def submit_task[A](body: => A): JFuture[A] = default_pool.submit(new Callable[A] { def call = body }) diff -r 580de802aafd -r c2b23cb8a677 src/Pure/General/exn.scala --- a/src/Pure/General/exn.scala Thu Dec 11 15:24:28 2014 +0100 +++ b/src/Pure/General/exn.scala Thu Dec 11 23:31:30 2014 +0100 @@ -26,6 +26,15 @@ case Exn(exn) => throw exn } + def release_first[A](results: List[Result[A]]): List[A] = + if (results.forall({ case Res(_) => true case _ => false })) + results.map(release(_)) + else + results.find({ case Exn(exn) => !is_interrupt(exn) case _ => false }) match { + case Some(Exn(exn)) => throw exn + case _ => results match { case Exn(exn) :: _ => throw exn case _ => ??? } + } + /* interrupts */ diff -r 580de802aafd -r c2b23cb8a677 src/Pure/ML-Systems/multithreading_polyml.ML --- a/src/Pure/ML-Systems/multithreading_polyml.ML Thu Dec 11 15:24:28 2014 +0100 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Thu Dec 11 23:31:30 2014 +0100 @@ -80,8 +80,7 @@ val available = true; fun max_threads_result m = - if m > 0 then m - else Int.min (Int.max (Thread.numProcessors (), 1), 8); + if m > 0 then m else Int.min (Int.max (Thread.numProcessors (), 1), 8); val max_threads = ref 1; diff -r 580de802aafd -r c2b23cb8a677 src/Pure/Thy/thy_info.scala --- a/src/Pure/Thy/thy_info.scala Thu Dec 11 15:24:28 2014 +0100 +++ b/src/Pure/Thy/thy_info.scala Thu Dec 11 23:31:30 2014 +0100 @@ -85,13 +85,11 @@ def loaded_files: List[Path] = { val dep_files = - rev_deps.par.map(dep => - Exn.capture { - dep.loaded_files(syntax).map(a => Path.explode(dep.name.master_dir) + Path.explode(a)) - }).toList - ((Nil: List[Path]) /: dep_files) { - case (acc_files, files) => Exn.release(files) ::: acc_files - } + Par_List.map( + (dep: Dep) => + dep.loaded_files(syntax).map(a => Path.explode(dep.name.master_dir) + Path.explode(a)), + rev_deps) + ((Nil: List[Path]) /: dep_files) { case (acc_files, files) => files ::: acc_files } } } diff -r 580de802aafd -r c2b23cb8a677 src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala Thu Dec 11 15:24:28 2014 +0100 +++ b/src/Pure/Tools/build.scala Thu Dec 11 23:31:30 2014 +0100 @@ -345,9 +345,7 @@ val graph = tree.graph val sessions = graph.keys - val timings = - sessions.par.map((name: String) => - Exn.capture { (name, load_timings(name)) }).toList.map(Exn.release(_)) + val timings = Par_List.map((name: String) => (name, load_timings(name)), sessions) val command_timings = Map(timings.map({ case (name, (ts, _)) => (name, ts) }): _*).withDefaultValue(Nil) val session_timing = diff -r 580de802aafd -r c2b23cb8a677 src/Pure/build-jars --- a/src/Pure/build-jars Thu Dec 11 15:24:28 2014 +0100 +++ b/src/Pure/build-jars Thu Dec 11 23:31:30 2014 +0100 @@ -14,6 +14,7 @@ Concurrent/event_timer.scala Concurrent/future.scala Concurrent/mailbox.scala + Concurrent/par_list.scala Concurrent/simple_thread.scala Concurrent/synchronized.scala GUI/color_value.scala