added Par_List in Scala, in accordance to ML version;
authorwenzelm
Thu Dec 11 23:31:30 2014 +0100 (2014-12-11)
changeset 59136c2b23cb8a677
parent 59135 580de802aafd
child 59137 fd748d770770
added Par_List in Scala, in accordance to ML version;
system property "isabelle.threads" determines size of Scala thread pool, like system option "threads" for ML;
avoid ".par" framework with its hard-wired thread pool, which also has problems with cancellation;
tuned;
etc/settings
src/Pure/Concurrent/par_list.ML
src/Pure/Concurrent/par_list.scala
src/Pure/Concurrent/simple_thread.scala
src/Pure/General/exn.scala
src/Pure/ML-Systems/multithreading_polyml.ML
src/Pure/Thy/thy_info.scala
src/Pure/Tools/build.scala
src/Pure/build-jars
     1.1 --- a/etc/settings	Thu Dec 11 15:24:28 2014 +0100
     1.2 +++ b/etc/settings	Thu Dec 11 23:31:30 2014 +0100
     1.3 @@ -14,7 +14,7 @@
     1.4  
     1.5  ISABELLE_SCALA_BUILD_OPTIONS="-encoding UTF-8 -nowarn -target:jvm-1.7 -Xmax-classfile-name 130"
     1.6  
     1.7 -ISABELLE_JAVA_SYSTEM_OPTIONS="-Dfile.encoding=UTF-8 -server"
     1.8 +ISABELLE_JAVA_SYSTEM_OPTIONS="-server -Dfile.encoding=UTF-8 -Disabelle.threads=0"
     1.9  
    1.10  classpath "$ISABELLE_HOME/lib/classes/Pure.jar"
    1.11  
     2.1 --- a/src/Pure/Concurrent/par_list.ML	Thu Dec 11 15:24:28 2014 +0100
     2.2 +++ b/src/Pure/Concurrent/par_list.ML	Thu Dec 11 23:31:30 2014 +0100
     2.3 @@ -56,21 +56,20 @@
     2.4  
     2.5  fun get_some f xs =
     2.6    let
     2.7 -    exception FOUND of 'b option;
     2.8 -    fun found (Exn.Exn (FOUND some)) = some
     2.9 -      | found _ = NONE;
    2.10 +    exception FOUND of 'b;
    2.11      val results =
    2.12        managed_results "Par_List.get_some"
    2.13 -        (fn x => (case f x of NONE => () | some => raise FOUND some)) xs;
    2.14 +        (fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs;
    2.15    in
    2.16 -    (case get_first found results of
    2.17 -      SOME y => SOME y
    2.18 -    | NONE => (Par_Exn.release_first results; NONE))
    2.19 +    (case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of
    2.20 +      NONE => (Par_Exn.release_first results; NONE)
    2.21 +    | some => some)
    2.22    end;
    2.23  
    2.24  fun find_some P = get_some (fn x => if P x then SOME x else NONE);
    2.25  
    2.26  fun exists P = is_some o get_some (fn x => if P x then SOME () else NONE);
    2.27 +
    2.28  fun forall P = not o exists (not o P);
    2.29  
    2.30  end;
     3.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     3.2 +++ b/src/Pure/Concurrent/par_list.scala	Thu Dec 11 23:31:30 2014 +0100
     3.3 @@ -0,0 +1,70 @@
     3.4 +/*  Title:      Pure/Concurrent/par_list.scala
     3.5 +    Author:     Makarius
     3.6 +
     3.7 +Parallel list combinators.
     3.8 +*/
     3.9 +
    3.10 +
    3.11 +package isabelle
    3.12 +
    3.13 +
    3.14 +import java.util.concurrent.{Future => JFuture, CancellationException}
    3.15 +
    3.16 +
    3.17 +object Par_List
    3.18 +{
    3.19 +  def managed_results[A, B](f: A => B, xs: List[A]): List[Exn.Result[B]] =
    3.20 +    if (xs.isEmpty || xs.tail.isEmpty) xs.map(x => Exn.capture { f(x) })
    3.21 +    else {
    3.22 +      val state = Synchronized((List.empty[JFuture[Exn.Result[B]]], false))
    3.23 +
    3.24 +      def cancel_other(self: Int = -1): Unit =
    3.25 +        state.change { case (tasks, canceled) =>
    3.26 +          if (!canceled) {
    3.27 +            for ((task, i) <- tasks.iterator.zipWithIndex if i != self)
    3.28 +              task.cancel(true)
    3.29 +          }
    3.30 +          (tasks, true)
    3.31 +        }
    3.32 +
    3.33 +      try {
    3.34 +        state.change(_ =>
    3.35 +          (xs.iterator.zipWithIndex.map({ case (x, self) =>
    3.36 +            Simple_Thread.submit_task {
    3.37 +              val result = Exn.capture { f(x) }
    3.38 +              result match { case Exn.Exn(_) => cancel_other(self) case _ => }
    3.39 +              result
    3.40 +            }
    3.41 +          }).toList, false))
    3.42 +
    3.43 +        state.value._1.map(future =>
    3.44 +          try { future.get }
    3.45 +          catch { case _: CancellationException => Exn.Exn(Exn.Interrupt()): Exn.Result[B] })
    3.46 +      }
    3.47 +      finally { cancel_other() }
    3.48 +    }
    3.49 +
    3.50 +  def map[A, B](f: A => B, xs: List[A]): List[B] =
    3.51 +    Exn.release_first(managed_results(f, xs))
    3.52 +
    3.53 +  def get_some[A, B](f: A => Option[B], xs: List[A]): Option[B] =
    3.54 +  {
    3.55 +    class Found(val res: B) extends Exception
    3.56 +    val results =
    3.57 +      managed_results(
    3.58 +        (x: A) => f(x) match { case None => () case Some(y) => throw new Found(y) }, xs)
    3.59 +    results.collectFirst({ case Exn.Exn(found: Found) => found.res }) match {
    3.60 +      case None => Exn.release_first(results); None
    3.61 +      case some => some
    3.62 +    }
    3.63 +  }
    3.64 +
    3.65 +  def find_some[A](P: A => Boolean, xs: List[A]): Option[A] =
    3.66 +    get_some((x: A) => if (P(x)) Some(x) else None, xs)
    3.67 +
    3.68 +  def exists[A](P: A => Boolean, xs: List[A]): Boolean =
    3.69 +    get_some((x: A) => if (P(x)) Some(()) else None, xs).isDefined
    3.70 +
    3.71 +  def forall[A](P: A => Boolean, xs: List[A]): Boolean = !exists((x: A) => !P(x), xs)
    3.72 +}
    3.73 +
     4.1 --- a/src/Pure/Concurrent/simple_thread.scala	Thu Dec 11 15:24:28 2014 +0100
     4.2 +++ b/src/Pure/Concurrent/simple_thread.scala	Thu Dec 11 23:31:30 2014 +0100
     4.3 @@ -9,9 +9,8 @@
     4.4  
     4.5  
     4.6  import java.lang.Thread
     4.7 -import java.util.concurrent.{Callable, Future => JFuture}
     4.8 -
     4.9 -import scala.collection.parallel.ForkJoinTasks
    4.10 +import java.util.concurrent.{Callable, Future => JFuture, ThreadPoolExecutor,
    4.11 +  TimeUnit, LinkedBlockingQueue}
    4.12  
    4.13  
    4.14  object Simple_Thread
    4.15 @@ -41,7 +40,12 @@
    4.16  
    4.17    /* thread pool */
    4.18  
    4.19 -  lazy val default_pool = ForkJoinTasks.defaultForkJoinPool
    4.20 +  lazy val default_pool =
    4.21 +    {
    4.22 +      val m = Properties.Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0
    4.23 +      val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8
    4.24 +      new ThreadPoolExecutor(n, n, 2500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
    4.25 +    }
    4.26  
    4.27    def submit_task[A](body: => A): JFuture[A] =
    4.28      default_pool.submit(new Callable[A] { def call = body })
     5.1 --- a/src/Pure/General/exn.scala	Thu Dec 11 15:24:28 2014 +0100
     5.2 +++ b/src/Pure/General/exn.scala	Thu Dec 11 23:31:30 2014 +0100
     5.3 @@ -26,6 +26,15 @@
     5.4        case Exn(exn) => throw exn
     5.5      }
     5.6  
     5.7 +  def release_first[A](results: List[Result[A]]): List[A] =
     5.8 +    if (results.forall({ case Res(_) => true case _ => false }))
     5.9 +      results.map(release(_))
    5.10 +    else
    5.11 +      results.find({ case Exn(exn) => !is_interrupt(exn) case _ => false }) match {
    5.12 +        case Some(Exn(exn)) => throw exn
    5.13 +        case _ => results match { case Exn(exn) :: _ => throw exn case _ => ??? }
    5.14 +      }
    5.15 +
    5.16  
    5.17    /* interrupts */
    5.18  
     6.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Dec 11 15:24:28 2014 +0100
     6.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Dec 11 23:31:30 2014 +0100
     6.3 @@ -80,8 +80,7 @@
     6.4  val available = true;
     6.5  
     6.6  fun max_threads_result m =
     6.7 -  if m > 0 then m
     6.8 -  else Int.min (Int.max (Thread.numProcessors (), 1), 8);
     6.9 +  if m > 0 then m else Int.min (Int.max (Thread.numProcessors (), 1), 8);
    6.10  
    6.11  val max_threads = ref 1;
    6.12  
     7.1 --- a/src/Pure/Thy/thy_info.scala	Thu Dec 11 15:24:28 2014 +0100
     7.2 +++ b/src/Pure/Thy/thy_info.scala	Thu Dec 11 23:31:30 2014 +0100
     7.3 @@ -85,13 +85,11 @@
     7.4      def loaded_files: List[Path] =
     7.5      {
     7.6        val dep_files =
     7.7 -        rev_deps.par.map(dep =>
     7.8 -          Exn.capture {
     7.9 -            dep.loaded_files(syntax).map(a => Path.explode(dep.name.master_dir) + Path.explode(a))
    7.10 -          }).toList
    7.11 -      ((Nil: List[Path]) /: dep_files) {
    7.12 -        case (acc_files, files) => Exn.release(files) ::: acc_files
    7.13 -      }
    7.14 +        Par_List.map(
    7.15 +          (dep: Dep) =>
    7.16 +            dep.loaded_files(syntax).map(a => Path.explode(dep.name.master_dir) + Path.explode(a)),
    7.17 +          rev_deps)
    7.18 +      ((Nil: List[Path]) /: dep_files) { case (acc_files, files) => files ::: acc_files }
    7.19      }
    7.20    }
    7.21  
     8.1 --- a/src/Pure/Tools/build.scala	Thu Dec 11 15:24:28 2014 +0100
     8.2 +++ b/src/Pure/Tools/build.scala	Thu Dec 11 23:31:30 2014 +0100
     8.3 @@ -345,9 +345,7 @@
     8.4        val graph = tree.graph
     8.5        val sessions = graph.keys
     8.6  
     8.7 -      val timings =
     8.8 -        sessions.par.map((name: String) =>
     8.9 -          Exn.capture { (name, load_timings(name)) }).toList.map(Exn.release(_))
    8.10 +      val timings = Par_List.map((name: String) => (name, load_timings(name)), sessions)
    8.11        val command_timings =
    8.12          Map(timings.map({ case (name, (ts, _)) => (name, ts) }): _*).withDefaultValue(Nil)
    8.13        val session_timing =
     9.1 --- a/src/Pure/build-jars	Thu Dec 11 15:24:28 2014 +0100
     9.2 +++ b/src/Pure/build-jars	Thu Dec 11 23:31:30 2014 +0100
     9.3 @@ -14,6 +14,7 @@
     9.4    Concurrent/event_timer.scala
     9.5    Concurrent/future.scala
     9.6    Concurrent/mailbox.scala
     9.7 +  Concurrent/par_list.scala
     9.8    Concurrent/simple_thread.scala
     9.9    Concurrent/synchronized.scala
    9.10    GUI/color_value.scala