src/Pure/Concurrent/consumer_thread.scala
changeset 75393 87ebf5a50283
parent 74254 53e1a14e2ef1
child 78533 dd0501accda8
--- a/src/Pure/Concurrent/consumer_thread.scala	Fri Apr 01 11:51:42 2022 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Fri Apr 01 17:06:10 2022 +0200
@@ -11,8 +11,7 @@
 import scala.annotation.tailrec
 
 
-object Consumer_Thread
-{
+object Consumer_Thread {
   def fork_bulk[A](name: String = "", daemon: Boolean = false)(
       bulk: A => Boolean,
       consume: List[A] => (List[Exn.Result[Unit]], Boolean),
@@ -21,10 +20,9 @@
 
   def fork[A](name: String = "", daemon: Boolean = false)(
       consume: A => Boolean,
-      finish: () => Unit = () => ()): Consumer_Thread[A] =
-  {
-    def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) =
-    {
+      finish: () => Unit = () => ()
+    ): Consumer_Thread[A] = {
+    def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
       assert(args.length == 1)
       Exn.capture { consume(args.head) } match {
         case Exn.Res(continue) => (List(Exn.Res(())), continue)
@@ -40,8 +38,8 @@
   name: String, daemon: Boolean,
   bulk: A => Boolean,
   consume: List[A] => (List[Exn.Result[Unit]], Boolean),
-  finish: () => Unit)
-{
+  finish: () => Unit
+) {
   /* thread */
 
   private var active = true
@@ -61,21 +59,18 @@
 
   /* requests */
 
-  private class Request(val arg: A, acknowledge: Boolean = false)
-  {
+  private class Request(val arg: A, acknowledge: Boolean = false) {
     val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
       if (acknowledge) Some(Synchronized(None)) else None
 
-    def await(): Unit =
-    {
+    def await(): Unit = {
       for (a <- ack) {
         Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))
       }
     }
   }
 
-  private def request(req: Request): Unit =
-  {
+  private def request(req: Request): Unit = {
     synchronized {
       if (is_active()) mailbox.send(Some(req))
       else error("Consumer thread not active: " + quote(thread.getName))
@@ -95,8 +90,9 @@
 
         val (results, continue) = consume(reqs.map(_.arg))
 
-        for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) }
-        {
+        for {
+          (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
+        } {
           (req.ack, res) match {
             case (Some(a), _) => a.change(_ => Some(res))
             case (None, Exn.Res(_)) =>
@@ -116,8 +112,7 @@
   def send(arg: A): Unit = request(new Request(arg))
   def send_wait(arg: A): Unit = request(new Request(arg, acknowledge = true))
 
-  def shutdown(): Unit =
-  {
+  def shutdown(): Unit = {
     synchronized { if (is_active()) { active = false; mailbox.send(None) } }
     thread.join()
   }