9 |
9 |
10 |
10 |
11 import scala.annotation.tailrec |
11 import scala.annotation.tailrec |
12 |
12 |
13 |
13 |
14 object Consumer_Thread |
14 object Consumer_Thread { |
15 { |
|
16 def fork_bulk[A](name: String = "", daemon: Boolean = false)( |
15 def fork_bulk[A](name: String = "", daemon: Boolean = false)( |
17 bulk: A => Boolean, |
16 bulk: A => Boolean, |
18 consume: List[A] => (List[Exn.Result[Unit]], Boolean), |
17 consume: List[A] => (List[Exn.Result[Unit]], Boolean), |
19 finish: () => Unit = () => ()): Consumer_Thread[A] = |
18 finish: () => Unit = () => ()): Consumer_Thread[A] = |
20 new Consumer_Thread[A](name, daemon, bulk, consume, finish) |
19 new Consumer_Thread[A](name, daemon, bulk, consume, finish) |
21 |
20 |
22 def fork[A](name: String = "", daemon: Boolean = false)( |
21 def fork[A](name: String = "", daemon: Boolean = false)( |
23 consume: A => Boolean, |
22 consume: A => Boolean, |
24 finish: () => Unit = () => ()): Consumer_Thread[A] = |
23 finish: () => Unit = () => () |
25 { |
24 ): Consumer_Thread[A] = { |
26 def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = |
25 def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = { |
27 { |
|
28 assert(args.length == 1) |
26 assert(args.length == 1) |
29 Exn.capture { consume(args.head) } match { |
27 Exn.capture { consume(args.head) } match { |
30 case Exn.Res(continue) => (List(Exn.Res(())), continue) |
28 case Exn.Res(continue) => (List(Exn.Res(())), continue) |
31 case Exn.Exn(exn) => (List(Exn.Exn(exn)), true) |
29 case Exn.Exn(exn) => (List(Exn.Exn(exn)), true) |
32 } |
30 } |
38 |
36 |
39 final class Consumer_Thread[A] private( |
37 final class Consumer_Thread[A] private( |
40 name: String, daemon: Boolean, |
38 name: String, daemon: Boolean, |
41 bulk: A => Boolean, |
39 bulk: A => Boolean, |
42 consume: List[A] => (List[Exn.Result[Unit]], Boolean), |
40 consume: List[A] => (List[Exn.Result[Unit]], Boolean), |
43 finish: () => Unit) |
41 finish: () => Unit |
44 { |
42 ) { |
45 /* thread */ |
43 /* thread */ |
46 |
44 |
47 private var active = true |
45 private var active = true |
48 private val mailbox = Mailbox[Option[Request]] |
46 private val mailbox = Mailbox[Option[Request]] |
49 |
47 |
59 try { finish() } catch { case exn: Throwable => failure(exn) } |
57 try { finish() } catch { case exn: Throwable => failure(exn) } |
60 |
58 |
61 |
59 |
62 /* requests */ |
60 /* requests */ |
63 |
61 |
64 private class Request(val arg: A, acknowledge: Boolean = false) |
62 private class Request(val arg: A, acknowledge: Boolean = false) { |
65 { |
|
66 val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] = |
63 val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] = |
67 if (acknowledge) Some(Synchronized(None)) else None |
64 if (acknowledge) Some(Synchronized(None)) else None |
68 |
65 |
69 def await(): Unit = |
66 def await(): Unit = { |
70 { |
|
71 for (a <- ack) { |
67 for (a <- ack) { |
72 Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })) |
68 Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })) |
73 } |
69 } |
74 } |
70 } |
75 } |
71 } |
76 |
72 |
77 private def request(req: Request): Unit = |
73 private def request(req: Request): Unit = { |
78 { |
|
79 synchronized { |
74 synchronized { |
80 if (is_active()) mailbox.send(Some(req)) |
75 if (is_active()) mailbox.send(Some(req)) |
81 else error("Consumer thread not active: " + quote(thread.getName)) |
76 else error("Consumer thread not active: " + quote(thread.getName)) |
82 } |
77 } |
83 req.await() |
78 req.await() |
93 .getOrElse(msgs.take(1)) |
88 .getOrElse(msgs.take(1)) |
94 .map(_.get) |
89 .map(_.get) |
95 |
90 |
96 val (results, continue) = consume(reqs.map(_.arg)) |
91 val (results, continue) = consume(reqs.map(_.arg)) |
97 |
92 |
98 for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) } |
93 for { |
99 { |
94 (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) |
|
95 } { |
100 (req.ack, res) match { |
96 (req.ack, res) match { |
101 case (Some(a), _) => a.change(_ => Some(res)) |
97 case (Some(a), _) => a.change(_ => Some(res)) |
102 case (None, Exn.Res(_)) => |
98 case (None, Exn.Res(_)) => |
103 case (None, Exn.Exn(exn)) => failure(exn) |
99 case (None, Exn.Exn(exn)) => failure(exn) |
104 } |
100 } |
114 assert(is_active()) |
110 assert(is_active()) |
115 |
111 |
116 def send(arg: A): Unit = request(new Request(arg)) |
112 def send(arg: A): Unit = request(new Request(arg)) |
117 def send_wait(arg: A): Unit = request(new Request(arg, acknowledge = true)) |
113 def send_wait(arg: A): Unit = request(new Request(arg, acknowledge = true)) |
118 |
114 |
119 def shutdown(): Unit = |
115 def shutdown(): Unit = { |
120 { |
|
121 synchronized { if (is_active()) { active = false; mailbox.send(None) } } |
116 synchronized { if (is_active()) { active = false; mailbox.send(None) } } |
122 thread.join() |
117 thread.join() |
123 } |
118 } |
124 } |
119 } |