parallel scheduling of jobs;
author wenzelm
Sun, 22 Jul 2012 23:31:57 +0200
changeset 48425 0d95980e9aae
parent 48424 e6b0c14f04c8
child 48429 4b7f4482c552
parallel scheduling of jobs; misc tuning;
lib/Tools/build
src/Pure/System/build.scala
src/Pure/library.scala
--- a/lib/Tools/build	Sun Jul 22 21:59:14 2012 +0200
+++ b/lib/Tools/build	Sun Jul 22 23:31:57 2012 +0200
@@ -18,8 +18,10 @@
   echo "    -a           all sessions"
   echo "    -b           build target images"
   echo "    -d DIR       additional session directory with ROOT file"
+  echo "    -j INT       maximum number of jobs (default 1)"
   echo "    -l           list sessions only"
   echo "    -o OPTION    override session configuration OPTION (via NAME=VAL or NAME)"
+  echo "    -v           verbose"
   echo
   echo "  Build and manage Isabelle sessions, depending on implicit"
   echo "  ISABELLE_BUILD_OPTIONS=\"$ISABELLE_BUILD_OPTIONS\""
@@ -38,17 +40,24 @@
   exit 2
 }
 
+function check_number()  # calls fail unless $1 is a non-empty string of decimal digits
+{
+  [ -n "$1" -a -z "$(printf '%s' "$1" | tr -d '0-9')" ] || fail "Bad number: \"$1\""
+}
+
 
 ## process command line
 
 ALL_SESSIONS=false
 BUILD_IMAGES=false
+MAX_JOBS=1
 LIST_ONLY=false
+VERBOSE=false
 
 declare -a MORE_DIRS=()
 eval "declare -a BUILD_OPTIONS=($ISABELLE_BUILD_OPTIONS)"
 
-while getopts "abd:lo:" OPT
+while getopts "abd:j:lo:v" OPT
 do
   case "$OPT" in
     a)
@@ -60,12 +69,19 @@
     d)
       MORE_DIRS["${#MORE_DIRS[@]}"]="$OPTARG"
       ;;
+    j)
+      check_number "$OPTARG"
+      MAX_JOBS="$OPTARG"
+      ;;
     l)
       LIST_ONLY="true"
       ;;
     o)
       BUILD_OPTIONS["${#BUILD_OPTIONS[@]}"]="$OPTARG"
       ;;
+    v)
+      VERBOSE="true"
+      ;;
     \?)
       usage
       ;;
@@ -80,5 +96,5 @@
 [ -e "$ISABELLE_HOME/Admin/build" ] && { "$ISABELLE_HOME/Admin/build" jars || exit $?; }
 
 exec "$ISABELLE_TOOL" java isabelle.Build \
-  "$ALL_SESSIONS" "$BUILD_IMAGES" "$LIST_ONLY" \
+  "$ALL_SESSIONS" "$BUILD_IMAGES" "$MAX_JOBS" "$LIST_ONLY" "$VERBOSE" \
   "${MORE_DIRS[@]}" $'\n' "${BUILD_OPTIONS[@]}" $'\n' "$@"
--- a/src/Pure/System/build.scala	Sun Jul 22 21:59:14 2012 +0200
+++ b/src/Pure/System/build.scala	Sun Jul 22 23:31:57 2012 +0200
@@ -62,8 +62,10 @@
       keys: Map[String, Key] = Map.empty,
       graph: Graph[Key, Info] = Graph.empty(Key.Ordering))
     {
+      def is_empty: Boolean = graph.is_empty
+
+      def apply(name: String): Info = graph.get_node(keys(name))
       def defined(name: String): Boolean = keys.isDefinedAt(name)
-
       def is_inner(name: String): Boolean = !graph.is_maximal(keys(name))
 
       def + (key: Key, info: Info): Queue =
@@ -85,6 +87,8 @@
         new Queue(keys1, graph1)
       }
 
+      def - (name: String): Queue = new Queue(keys - name, graph.del_node(keys(name)))
+
       def required(names: List[String]): Queue =
       {
         val req = graph.all_preds(names.map(keys(_))).map(_.name).toSet
@@ -93,6 +97,14 @@
         new Queue(keys1, graph1)
       }
 
+      /* first of graph.entries that has no remaining deps and is not skipped */
+      def dequeue(skip: String => Boolean): Option[(String, Info)] =
+        graph.entries.collectFirst(
+          {
+            case (key, (info, (deps, _))) if deps.isEmpty && !skip(key.name) =>
+              (key.name, info)
+          })
+
       def topological_order: List[(String, Info)] =
         graph.topological_order.map(key => (key.name, graph.get_node(key)))
     }
@@ -325,7 +337,7 @@
 
     def terminate: Unit = thread.interrupt
     def is_finished: Boolean = result.is_finished
-    def join: (String, String, Int) = { val rc = result.join; args_file.delete; rc }
+    def join: (String, String, Int) = { val res = result.join; args_file.delete; res }
   }
 
   private def start_job(save: Boolean, name: String, info: Session.Info): Job =
@@ -366,12 +378,13 @@
   }
 
 
-  /* Scala entry point */
+  /* build */
 
   private def echo(msg: String) { java.lang.System.out.println(msg) }
-  private def echo_n(msg: String) { java.lang.System.out.print(msg) }
+  private def sleep(): Unit = Thread.sleep(500)
 
-  def build(all_sessions: Boolean, build_images: Boolean, list_only: Boolean,
+  def build(all_sessions: Boolean, build_images: Boolean, max_jobs: Int,
+    list_only: Boolean, verbose: Boolean,
     more_dirs: List[Path], more_options: List[String], sessions: List[String]): Int =
   {
     val options = (Options.init() /: more_options)(_.define_simple(_))
@@ -396,37 +409,60 @@
     val log_dir = Path.explode("$ISABELLE_OUTPUT/log")
     log_dir.file.mkdirs()
 
+    // scheduler loop
+    @tailrec def loop(
+      pending: Session.Queue,
+      running: Map[String, Job],
+      results: Map[String, Int]): Map[String, Int] =
+    {
+      if (pending.is_empty) results
+      else if (running.exists({ case (_, job) => job.is_finished })) {
+        val (name, job) = running.find({ case (_, job) => job.is_finished }).get
 
-    val rcs =
-      for ((name, info) <- queue.topological_order) yield
-      {
-        if (list_only) { echo(name + " in " + info.dir); 0 }
-        else {
-          val save = build_images || queue.is_inner(name)
-          echo((if (save) "Building " else "Running ") + name + " ...")
-
-          val (out, err, rc) = start_job(save, name, info).join
-          echo_n(err)
+        val (out, err, rc) = job.join
+        echo(Library.trim_line(err))
 
-          val log = log_dir + Path.basic(name)
-          if (rc == 0) {
-            val sources =
-              (info.digest :: deps.sources(name).map(_._2)).map(_.toString).sorted
-                .mkString("sources: ", " ", "\n")
-            File.write_zip(log.ext("gz"), sources + out)
-          }
-          else {
-            File.write(log, out)
-            echo(name + " FAILED")
-            echo("(see also " + log.file + ")")
-            val lines = split_lines(out)
-            val tail = lines.drop(lines.length - 20 max 0)
-            echo("\n" + cat_lines(tail))
-          }
-          rc
+        val log = log_dir + Path.basic(name)
+        if (rc == 0) {
+          val sources =
+            (queue(name).digest :: deps.sources(name).map(_._2)).map(_.toString).sorted
+              .mkString("sources: ", " ", "\n")
+          File.write_zip(log.ext("gz"), sources + out)
+        }
+        else {
+          File.write(log, out)
+          echo(name + " FAILED")
+          echo("(see also " + log.file + ")")
+          val lines = split_lines(out)
+          val tail = lines.drop(lines.length - 20 max 0)
+          echo("\n" + cat_lines(tail))
+        }
+        loop(pending - name, running - name, results + (name -> rc))
+      }
+      else if (running.size < (max_jobs max 1)) {
+        pending.dequeue(running.isDefinedAt(_)) match {
+          case Some((name, info)) =>
+            if (list_only) {
+              echo(name + " in " + info.dir)
+              loop(pending - name, running, results + (name -> 0))
+            }
+            else if (info.parent.map(results(_)).forall(_ == 0)) {
+              val save = build_images || queue.is_inner(name)
+              echo((if (save) "Building " else "Running ") + name + " ...")
+              val job = start_job(save, name, info)
+              loop(pending, running + (name -> job), results)
+            }
+            else {
+              echo(name + " CANCELLED")
+              loop(pending - name, running, results + (name -> 1))
+            }
+          case None => sleep(); loop(pending, running, results)
         }
       }
-    (0 /: rcs)(_ max _)
+      else { sleep(); loop(pending, running, results) }
+    }
+
+    (0 /: loop(queue, Map.empty, Map.empty))({ case (rc1, (_, rc2)) => rc1 max rc2 })
   }
 
 
@@ -439,9 +475,11 @@
         case
           Properties.Value.Boolean(all_sessions) ::
           Properties.Value.Boolean(build_images) ::
+          Properties.Value.Int(max_jobs) ::
           Properties.Value.Boolean(list_only) ::
+          Properties.Value.Boolean(verbose) ::
           Command_Line.Chunks(more_dirs, options, sessions) =>
-            build(all_sessions, build_images, list_only,
+            build(all_sessions, build_images, max_jobs, list_only, verbose,
               more_dirs.map(Path.explode), options, sessions)
         case _ => error("Bad arguments:\n" + cat_lines(args))
       }
--- a/src/Pure/library.scala	Sun Jul 22 21:59:14 2012 +0200
+++ b/src/Pure/library.scala	Sun Jul 22 23:31:57 2012 +0200
@@ -62,6 +62,10 @@
 
   def split_lines(str: String): List[String] = space_explode('\n', str)
 
+  /* remove a single trailing newline, if present */
+  def trim_line(str: String): String =
+    str.stripSuffix("\n")
+
 
   /* iterate over chunks (cf. space_explode) */