equal
deleted
inserted
replaced
23 finish: () => Unit = () => () |
23 finish: () => Unit = () => () |
24 ): Consumer_Thread[A] = { |
24 ): Consumer_Thread[A] = { |
25 def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = { |
25 def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = { |
26 assert(args.length == 1) |
26 assert(args.length == 1) |
27 Exn.capture { consume(args.head) } match { |
27 Exn.capture { consume(args.head) } match { |
28 case Exn.Res(continue) => (List(Exn.Res(())), continue) |
28 case Exn.Res(cont) => (List(Exn.Res(())), cont) |
29 case Exn.Exn(exn) => (List(Exn.Exn(exn)), true) |
29 case Exn.Exn(exn) => (List(Exn.Exn(exn)), true) |
30 } |
30 } |
31 } |
31 } |
32 |
32 |
33 fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish) |
33 fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish) |
86 val reqs = |
86 val reqs = |
87 proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg))) |
87 proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg))) |
88 .getOrElse(msgs.take(1)) |
88 .getOrElse(msgs.take(1)) |
89 .map(_.get) |
89 .map(_.get) |
90 |
90 |
91 val (results, continue) = consume(reqs.map(_.arg)) |
91 val (results, cont) = consume(reqs.map(_.arg)) |
92 |
92 |
93 for { |
93 for { |
94 (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) |
94 (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) |
95 } { |
95 } { |
96 (req.ack, res) match { |
96 (req.ack, res) match { |
98 case (None, Exn.Res(_)) => |
98 case (None, Exn.Res(_)) => |
99 case (None, Exn.Exn(exn)) => failure(exn) |
99 case (None, Exn.Exn(exn)) => failure(exn) |
100 } |
100 } |
101 } |
101 } |
102 |
102 |
103 if (continue) main_loop(msgs.drop(reqs.length)) |
103 if (cont) main_loop(msgs.drop(reqs.length)) |
104 else robust_finish() |
104 else robust_finish() |
105 } |
105 } |
106 |
106 |
107 |
107 |
108 /* main methods */ |
108 /* main methods */ |