src/Pure/Tools/build_process.scala
changeset 77258 ca8eda3c1808
parent 77257 68a7ad1385bc
child 77259 61fc2afe4c8b
equal deleted inserted replaced
77257:68a7ad1385bc 77258:ca8eda3c1808
   132     def build_heap(session: String): Boolean =
   132     def build_heap(session: String): Boolean =
   133       Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session)
   133       Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session)
   134   }
   134   }
   135 
   135 
   136 
   136 
   137   /* queue with scheduling information */
       
   138 
       
   139   private object Queue {
       
   140     def apply(build_context: Build_Process.Context): Queue = {
       
   141       val build_graph = build_context.sessions_structure.build_graph
       
   142       val build_order = SortedSet.from(build_graph.keys)(build_context.ordering)
       
   143       new Queue(build_graph, build_order)
       
   144     }
       
   145   }
       
   146 
       
   147   private class Queue(
       
   148     build_graph: Graph[String, Sessions.Info],
       
   149     build_order: SortedSet[String]
       
   150   ) {
       
   151     def is_empty: Boolean = build_graph.is_empty
       
   152 
       
   153     def - (name: String): Queue =
       
   154       new Queue(build_graph.del_node(name), build_order - name)
       
   155 
       
   156     def dequeue(skip: String => Boolean): Option[String] =
       
   157       build_order.iterator.dropWhile(name => skip(name) || !build_graph.is_minimal(name))
       
   158         .nextOption()
       
   159   }
       
   160 
       
   161 
       
   162   /* main */
   137   /* main */
   163 
   138 
   164   private def session_finished(session_name: String, process_result: Process_Result): String =
   139   private def session_finished(session_name: String, process_result: Process_Result): String =
   165     "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
   140     "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
   166 
   141 
   204         case "" => No_Logger
   179         case "" => No_Logger
   205         case "-" => Logger.make(progress)
   180         case "-" => Logger.make(progress)
   206         case log_file => Logger.make(Some(Path.explode(log_file)))
   181         case log_file => Logger.make(Some(Path.explode(log_file)))
   207       }
   182       }
   208 
   183 
       
   184     // global state
   209     val numa_nodes = new NUMA.Nodes(numa_shuffling)
   185     val numa_nodes = new NUMA.Nodes(numa_shuffling)
   210 
   186     var build_graph = build_context.sessions_structure.build_graph
   211     @tailrec def loop(
   187     var build_order = SortedSet.from(build_graph.keys)(build_context.ordering)
   212       pending: Queue,
   188     var running = Map.empty[String, (SHA1.Shasum, Build_Job)]
   213       running: Map[String, (SHA1.Shasum, Build_Job)],
   189     var results = Map.empty[String, Result]
   214       results: Map[String, Result]
   190 
   215     ): Map[String, Result] = {
   191     def remove_pending(name: String): Unit = {
   216       def used_node(i: Int): Boolean =
   192       build_graph = build_graph.del_node(name)
   217         running.iterator.exists(
   193       build_order = build_order - name
   218           { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i })
   194     }
   219 
   195 
   220       if (pending.is_empty) results
   196     def next_pending(): Option[String] =
   221       else {
   197       build_order.iterator
       
   198         .dropWhile(name => running.isDefinedAt(name) || !build_graph.is_minimal(name))
       
   199         .nextOption()
       
   200 
       
   201     def used_node(i: Int): Boolean =
       
   202       running.iterator.exists(
       
   203         { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i })
       
   204 
       
   205     @tailrec def loop(): Unit = {
       
   206       if (!build_graph.is_empty) {
   222         if (progress.stopped) {
   207         if (progress.stopped) {
   223           for ((_, (_, job)) <- running) job.terminate()
   208           for ((_, (_, job)) <- running) job.terminate()
   224         }
   209         }
   225 
   210 
   226         running.find({ case (_, (_, job)) => job.is_finished }) match {
   211         running.find({ case (_, (_, job)) => job.is_finished }) match {
   277             else {
   262             else {
   278               progress.echo(session_name + " FAILED")
   263               progress.echo(session_name + " FAILED")
   279               if (!process_result.interrupted) progress.echo(process_result_tail.out)
   264               if (!process_result.interrupted) progress.echo(process_result_tail.out)
   280             }
   265             }
   281 
   266 
   282             loop(pending - session_name, running - session_name,
   267             remove_pending(session_name)
   283               results + (session_name -> Result(false, output_heap, process_result_tail)))
   268             running -= session_name
       
   269             results += (session_name -> Result(false, output_heap, process_result_tail))
       
   270             loop()
   284             //}}}
   271             //}}}
   285           case None if running.size < (max_jobs max 1) =>
   272           case None if running.size < (max_jobs max 1) =>
   286             //{{{ check/start next job
   273             //{{{ check/start next job
   287             pending.dequeue(running.isDefinedAt) match {
   274             next_pending() match {
   288               case Some(session_name) =>
   275               case Some(session_name) =>
   289                 val ancestor_results =
   276                 val ancestor_results =
   290                   build_deps.sessions_structure.build_requirements(List(session_name)).
   277                   build_deps.sessions_structure.build_requirements(List(session_name)).
   291                     filterNot(_ == session_name).map(results(_))
   278                     filterNot(_ == session_name).map(results(_))
   292                 val input_heaps =
   279                 val input_heaps =
   316                   }
   303                   }
   317                 }
   304                 }
   318                 val all_current = current && ancestor_results.forall(_.current)
   305                 val all_current = current && ancestor_results.forall(_.current)
   319 
   306 
   320                 if (all_current) {
   307                 if (all_current) {
   321                   loop(pending - session_name, running,
   308                   remove_pending(session_name)
   322                     results + (session_name -> Result(true, output_heap, Process_Result.ok)))
   309                   results += (session_name -> Result(true, output_heap, Process_Result.ok))
       
   310                   loop()
   323                 }
   311                 }
   324                 else if (no_build) {
   312                 else if (no_build) {
   325                   progress.echo_if(verbose, "Skipping " + session_name + " ...")
   313                   progress.echo_if(verbose, "Skipping " + session_name + " ...")
   326                   loop(pending - session_name, running,
   314                   remove_pending(session_name)
   327                     results + (session_name -> Result(false, output_heap, Process_Result.error)))
   315                   results += (session_name -> Result(false, output_heap, Process_Result.error))
       
   316                   loop()
   328                 }
   317                 }
   329                 else if (ancestor_results.forall(_.ok) && !progress.stopped) {
   318                 else if (ancestor_results.forall(_.ok) && !progress.stopped) {
   330                   progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...")
   319                   progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...")
   331 
   320 
   332                   store.clean_output(session_name)
   321                   store.clean_output(session_name)
   340 
   329 
   341                   val numa_node = numa_nodes.next(used_node)
   330                   val numa_node = numa_nodes.next(used_node)
   342                   val job =
   331                   val job =
   343                     new Build_Job(progress, session_background, store, do_store,
   332                     new Build_Job(progress, session_background, store, do_store,
   344                       resources, session_setup, numa_node)
   333                       resources, session_setup, numa_node)
   345                   loop(pending, running + (session_name -> (input_heaps, job)), results)
   334                   running += (session_name -> (input_heaps, job))
       
   335                   loop()
   346                 }
   336                 }
   347                 else {
   337                 else {
   348                   progress.echo(session_name + " CANCELLED")
   338                   progress.echo(session_name + " CANCELLED")
   349                   loop(pending - session_name, running,
   339                   remove_pending(session_name)
   350                     results + (session_name -> Result(false, output_heap, Process_Result.undefined)))
   340                   results += (session_name -> Result(false, output_heap, Process_Result.undefined))
   351                 }
   341                   loop()
   352               case None => sleep(); loop(pending, running, results)
   342                 }
       
   343               case None => sleep(); loop()
   353             }
   344             }
   354             ///}}}
   345             ///}}}
   355           case None => sleep(); loop(pending, running, results)
   346           case None => sleep(); loop()
   356         }
   347         }
   357       }
   348       }
   358     }
   349     }
   359 
   350 
   360     loop(Queue(build_context), Map.empty, Map.empty)
   351     loop()
       
   352     results
   361   }
   353   }
   362 }
   354 }