added Par_List in Scala, in accordance to ML version;
authorwenzelm
Thu, 11 Dec 2014 23:31:30 +0100
changeset 59136 c2b23cb8a677
parent 59135 580de802aafd
child 59137 fd748d770770
added Par_List in Scala, in accordance to ML version; system property "isabelle.threads" determines size of Scala thread pool, like system option "threads" for ML; avoid ".par" framework with its hard-wired thread pool, which also has problems with cancellation; tuned;
etc/settings
src/Pure/Concurrent/par_list.ML
src/Pure/Concurrent/par_list.scala
src/Pure/Concurrent/simple_thread.scala
src/Pure/General/exn.scala
src/Pure/ML-Systems/multithreading_polyml.ML
src/Pure/Thy/thy_info.scala
src/Pure/Tools/build.scala
src/Pure/build-jars
--- a/etc/settings	Thu Dec 11 15:24:28 2014 +0100
+++ b/etc/settings	Thu Dec 11 23:31:30 2014 +0100
@@ -14,7 +14,7 @@
 
 ISABELLE_SCALA_BUILD_OPTIONS="-encoding UTF-8 -nowarn -target:jvm-1.7 -Xmax-classfile-name 130"
 
-ISABELLE_JAVA_SYSTEM_OPTIONS="-Dfile.encoding=UTF-8 -server"
+ISABELLE_JAVA_SYSTEM_OPTIONS="-server -Dfile.encoding=UTF-8 -Disabelle.threads=0"
 
 classpath "$ISABELLE_HOME/lib/classes/Pure.jar"
 
--- a/src/Pure/Concurrent/par_list.ML	Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/Concurrent/par_list.ML	Thu Dec 11 23:31:30 2014 +0100
@@ -56,21 +56,20 @@
 
 fun get_some f xs =
   let
-    exception FOUND of 'b option;
-    fun found (Exn.Exn (FOUND some)) = some
-      | found _ = NONE;
+    exception FOUND of 'b;
     val results =
       managed_results "Par_List.get_some"
-        (fn x => (case f x of NONE => () | some => raise FOUND some)) xs;
+        (fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs;
   in
-    (case get_first found results of
-      SOME y => SOME y
-    | NONE => (Par_Exn.release_first results; NONE))
+    (case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of
+      NONE => (Par_Exn.release_first results; NONE)
+    | some => some)
   end;
 
 fun find_some P = get_some (fn x => if P x then SOME x else NONE);
 
 fun exists P = is_some o get_some (fn x => if P x then SOME () else NONE);
+
 fun forall P = not o exists (not o P);
 
 end;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Concurrent/par_list.scala	Thu Dec 11 23:31:30 2014 +0100
@@ -0,0 +1,70 @@
+/*  Title:      Pure/Concurrent/par_list.scala
+    Author:     Makarius
+
+Parallel list combinators.
+*/
+
+
+package isabelle
+
+
+import java.util.concurrent.{Future => JFuture, CancellationException}
+
+
+object Par_List
+{
+  def managed_results[A, B](f: A => B, xs: List[A]): List[Exn.Result[B]] =
+    if (xs.isEmpty || xs.tail.isEmpty) xs.map(x => Exn.capture { f(x) })
+    else {
+      val state = Synchronized((List.empty[JFuture[Exn.Result[B]]], false))
+
+      def cancel_other(self: Int = -1): Unit =
+        state.change { case (tasks, canceled) =>
+          if (!canceled) {
+            for ((task, i) <- tasks.iterator.zipWithIndex if i != self)
+              task.cancel(true)
+          }
+          (tasks, true)
+        }
+
+      try {
+        state.change(_ =>
+          (xs.iterator.zipWithIndex.map({ case (x, self) =>
+            Simple_Thread.submit_task {
+              val result = Exn.capture { f(x) }
+              result match { case Exn.Exn(_) => cancel_other(self) case _ => }
+              result
+            }
+          }).toList, false))
+
+        state.value._1.map(future =>
+          try { future.get }
+          catch { case _: CancellationException => Exn.Exn(Exn.Interrupt()): Exn.Result[B] })
+      }
+      finally { cancel_other() }
+    }
+
+  def map[A, B](f: A => B, xs: List[A]): List[B] =
+    Exn.release_first(managed_results(f, xs))
+
+  def get_some[A, B](f: A => Option[B], xs: List[A]): Option[B] =
+  {
+    class Found(val res: B) extends Exception
+    val results =
+      managed_results(
+        (x: A) => f(x) match { case None => () case Some(y) => throw new Found(y) }, xs)
+    results.collectFirst({ case Exn.Exn(found: Found) => found.res }) match {
+      case None => Exn.release_first(results); None
+      case some => some
+    }
+  }
+
+  def find_some[A](P: A => Boolean, xs: List[A]): Option[A] =
+    get_some((x: A) => if (P(x)) Some(x) else None, xs)
+
+  def exists[A](P: A => Boolean, xs: List[A]): Boolean =
+    get_some((x: A) => if (P(x)) Some(()) else None, xs).isDefined
+
+  def forall[A](P: A => Boolean, xs: List[A]): Boolean = !exists((x: A) => !P(x), xs)
+}
+
--- a/src/Pure/Concurrent/simple_thread.scala	Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/Concurrent/simple_thread.scala	Thu Dec 11 23:31:30 2014 +0100
@@ -9,9 +9,8 @@
 
 
 import java.lang.Thread
-import java.util.concurrent.{Callable, Future => JFuture}
-
-import scala.collection.parallel.ForkJoinTasks
+import java.util.concurrent.{Callable, Future => JFuture, ThreadPoolExecutor,
+  TimeUnit, LinkedBlockingQueue}
 
 
 object Simple_Thread
@@ -41,7 +40,12 @@
 
   /* thread pool */
 
-  lazy val default_pool = ForkJoinTasks.defaultForkJoinPool
+  lazy val default_pool =
+    {
+      val m = Properties.Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0
+      val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8
+      new ThreadPoolExecutor(n, n, 2500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
+    }
 
   def submit_task[A](body: => A): JFuture[A] =
     default_pool.submit(new Callable[A] { def call = body })
--- a/src/Pure/General/exn.scala	Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/General/exn.scala	Thu Dec 11 23:31:30 2014 +0100
@@ -26,6 +26,15 @@
       case Exn(exn) => throw exn
     }
 
+  def release_first[A](results: List[Result[A]]): List[A] =
+    if (results.forall({ case Res(_) => true case _ => false }))
+      results.map(release(_))
+    else
+      results.find({ case Exn(exn) => !is_interrupt(exn) case _ => false }) match {
+        case Some(Exn(exn)) => throw exn
+        case _ => results match { case Exn(exn) :: _ => throw exn case _ => ??? }
+      }
+
 
   /* interrupts */
 
--- a/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Dec 11 23:31:30 2014 +0100
@@ -80,8 +80,7 @@
 val available = true;
 
 fun max_threads_result m =
-  if m > 0 then m
-  else Int.min (Int.max (Thread.numProcessors (), 1), 8);
+  if m > 0 then m else Int.min (Int.max (Thread.numProcessors (), 1), 8);
 
 val max_threads = ref 1;
 
--- a/src/Pure/Thy/thy_info.scala	Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/Thy/thy_info.scala	Thu Dec 11 23:31:30 2014 +0100
@@ -85,13 +85,11 @@
     def loaded_files: List[Path] =
     {
       val dep_files =
-        rev_deps.par.map(dep =>
-          Exn.capture {
-            dep.loaded_files(syntax).map(a => Path.explode(dep.name.master_dir) + Path.explode(a))
-          }).toList
-      ((Nil: List[Path]) /: dep_files) {
-        case (acc_files, files) => Exn.release(files) ::: acc_files
-      }
+        Par_List.map(
+          (dep: Dep) =>
+            dep.loaded_files(syntax).map(a => Path.explode(dep.name.master_dir) + Path.explode(a)),
+          rev_deps)
+      ((Nil: List[Path]) /: dep_files) { case (acc_files, files) => files ::: acc_files }
     }
   }
 
--- a/src/Pure/Tools/build.scala	Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/Tools/build.scala	Thu Dec 11 23:31:30 2014 +0100
@@ -345,9 +345,7 @@
       val graph = tree.graph
       val sessions = graph.keys
 
-      val timings =
-        sessions.par.map((name: String) =>
-          Exn.capture { (name, load_timings(name)) }).toList.map(Exn.release(_))
+      val timings = Par_List.map((name: String) => (name, load_timings(name)), sessions)
       val command_timings =
         Map(timings.map({ case (name, (ts, _)) => (name, ts) }): _*).withDefaultValue(Nil)
       val session_timing =
--- a/src/Pure/build-jars	Thu Dec 11 15:24:28 2014 +0100
+++ b/src/Pure/build-jars	Thu Dec 11 23:31:30 2014 +0100
@@ -14,6 +14,7 @@
   Concurrent/event_timer.scala
   Concurrent/future.scala
   Concurrent/mailbox.scala
+  Concurrent/par_list.scala
   Concurrent/simple_thread.scala
   Concurrent/synchronized.scala
   GUI/color_value.scala