--- a/src/Pure/Concurrent/consumer_thread.scala Fri Apr 01 11:51:42 2022 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala Fri Apr 01 17:06:10 2022 +0200
@@ -11,8 +11,7 @@
import scala.annotation.tailrec
-object Consumer_Thread
-{
+object Consumer_Thread {
def fork_bulk[A](name: String = "", daemon: Boolean = false)(
bulk: A => Boolean,
consume: List[A] => (List[Exn.Result[Unit]], Boolean),
@@ -21,10 +20,9 @@
def fork[A](name: String = "", daemon: Boolean = false)(
consume: A => Boolean,
- finish: () => Unit = () => ()): Consumer_Thread[A] =
- {
- def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) =
- {
+ finish: () => Unit = () => ()
+ ): Consumer_Thread[A] = {
+ def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
assert(args.length == 1)
Exn.capture { consume(args.head) } match {
case Exn.Res(continue) => (List(Exn.Res(())), continue)
@@ -40,8 +38,8 @@
name: String, daemon: Boolean,
bulk: A => Boolean,
consume: List[A] => (List[Exn.Result[Unit]], Boolean),
- finish: () => Unit)
-{
+ finish: () => Unit
+) {
/* thread */
private var active = true
@@ -61,21 +59,18 @@
/* requests */
- private class Request(val arg: A, acknowledge: Boolean = false)
- {
+ private class Request(val arg: A, acknowledge: Boolean = false) {
val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
if (acknowledge) Some(Synchronized(None)) else None
- def await(): Unit =
- {
+ def await(): Unit = {
for (a <- ack) {
Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))
}
}
}
- private def request(req: Request): Unit =
- {
+ private def request(req: Request): Unit = {
synchronized {
if (is_active()) mailbox.send(Some(req))
else error("Consumer thread not active: " + quote(thread.getName))
@@ -95,8 +90,9 @@
val (results, continue) = consume(reqs.map(_.arg))
- for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) }
- {
+ for {
+ (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
+ } {
(req.ack, res) match {
case (Some(a), _) => a.change(_ => Some(res))
case (None, Exn.Res(_)) =>
@@ -116,8 +112,7 @@
def send(arg: A): Unit = request(new Request(arg))
def send_wait(arg: A): Unit = request(new Request(arg, acknowledge = true))
- def shutdown(): Unit =
- {
+ def shutdown(): Unit = {
synchronized { if (is_active()) { active = false; mailbox.send(None) } }
thread.join()
}