src/Pure/Concurrent/consumer_thread.scala
changeset 78533 dd0501accda8
parent 75393 87ebf5a50283
child 78534 879e1ba3868b
equal deleted inserted replaced
78532:62c6164f0fc1 78533:dd0501accda8
    23       finish: () => Unit = () => ()
    23       finish: () => Unit = () => ()
    24     ): Consumer_Thread[A] = {
    24     ): Consumer_Thread[A] = {
    25     def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
    25     def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
    26       assert(args.length == 1)
    26       assert(args.length == 1)
    27       Exn.capture { consume(args.head) } match {
    27       Exn.capture { consume(args.head) } match {
    28         case Exn.Res(continue) => (List(Exn.Res(())), continue)
    28         case Exn.Res(cont) => (List(Exn.Res(())), cont)
    29         case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
    29         case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
    30       }
    30       }
    31     }
    31     }
    32 
    32 
    33     fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish)
    33     fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish)
    86         val reqs =
    86         val reqs =
    87           proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
    87           proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
    88             .getOrElse(msgs.take(1))
    88             .getOrElse(msgs.take(1))
    89             .map(_.get)
    89             .map(_.get)
    90 
    90 
    91         val (results, continue) = consume(reqs.map(_.arg))
    91         val (results, cont) = consume(reqs.map(_.arg))
    92 
    92 
    93         for {
    93         for {
    94           (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
    94           (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
    95         } {
    95         } {
    96           (req.ack, res) match {
    96           (req.ack, res) match {
    98             case (None, Exn.Res(_)) =>
    98             case (None, Exn.Res(_)) =>
    99             case (None, Exn.Exn(exn)) => failure(exn)
    99             case (None, Exn.Exn(exn)) => failure(exn)
   100           }
   100           }
   101         }
   101         }
   102 
   102 
   103         if (continue) main_loop(msgs.drop(reqs.length))
   103         if (cont) main_loop(msgs.drop(reqs.length))
   104         else robust_finish()
   104         else robust_finish()
   105     }
   105     }
   106 
   106 
   107 
   107 
   108   /* main methods */
   108   /* main methods */