src/Pure/Concurrent/consumer_thread.scala
changeset 75393 87ebf5a50283
parent 74254 53e1a14e2ef1
child 78533 dd0501accda8
equal deleted inserted replaced
75388:b3ca4a6ed74b 75393:87ebf5a50283
     9 
     9 
    10 
    10 
    11 import scala.annotation.tailrec
    11 import scala.annotation.tailrec
    12 
    12 
    13 
    13 
    14 object Consumer_Thread
    14 object Consumer_Thread {
    15 {
       
    16   def fork_bulk[A](name: String = "", daemon: Boolean = false)(
    15   def fork_bulk[A](name: String = "", daemon: Boolean = false)(
    17       bulk: A => Boolean,
    16       bulk: A => Boolean,
    18       consume: List[A] => (List[Exn.Result[Unit]], Boolean),
    17       consume: List[A] => (List[Exn.Result[Unit]], Boolean),
    19       finish: () => Unit = () => ()): Consumer_Thread[A] =
    18       finish: () => Unit = () => ()): Consumer_Thread[A] =
    20     new Consumer_Thread[A](name, daemon, bulk, consume, finish)
    19     new Consumer_Thread[A](name, daemon, bulk, consume, finish)
    21 
    20 
    22   def fork[A](name: String = "", daemon: Boolean = false)(
    21   def fork[A](name: String = "", daemon: Boolean = false)(
    23       consume: A => Boolean,
    22       consume: A => Boolean,
    24       finish: () => Unit = () => ()): Consumer_Thread[A] =
    23       finish: () => Unit = () => ()
    25   {
    24     ): Consumer_Thread[A] = {
    26     def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) =
    25     def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
    27     {
       
    28       assert(args.length == 1)
    26       assert(args.length == 1)
    29       Exn.capture { consume(args.head) } match {
    27       Exn.capture { consume(args.head) } match {
    30         case Exn.Res(continue) => (List(Exn.Res(())), continue)
    28         case Exn.Res(continue) => (List(Exn.Res(())), continue)
    31         case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
    29         case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
    32       }
    30       }
    38 
    36 
    39 final class Consumer_Thread[A] private(
    37 final class Consumer_Thread[A] private(
    40   name: String, daemon: Boolean,
    38   name: String, daemon: Boolean,
    41   bulk: A => Boolean,
    39   bulk: A => Boolean,
    42   consume: List[A] => (List[Exn.Result[Unit]], Boolean),
    40   consume: List[A] => (List[Exn.Result[Unit]], Boolean),
    43   finish: () => Unit)
    41   finish: () => Unit
    44 {
    42 ) {
    45   /* thread */
    43   /* thread */
    46 
    44 
    47   private var active = true
    45   private var active = true
    48   private val mailbox = Mailbox[Option[Request]]
    46   private val mailbox = Mailbox[Option[Request]]
    49 
    47 
    59     try { finish() } catch { case exn: Throwable => failure(exn) }
    57     try { finish() } catch { case exn: Throwable => failure(exn) }
    60 
    58 
    61 
    59 
    62   /* requests */
    60   /* requests */
    63 
    61 
    64   private class Request(val arg: A, acknowledge: Boolean = false)
    62   private class Request(val arg: A, acknowledge: Boolean = false) {
    65   {
       
    66     val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
    63     val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
    67       if (acknowledge) Some(Synchronized(None)) else None
    64       if (acknowledge) Some(Synchronized(None)) else None
    68 
    65 
    69     def await(): Unit =
    66     def await(): Unit = {
    70     {
       
    71       for (a <- ack) {
    67       for (a <- ack) {
    72         Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))
    68         Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))
    73       }
    69       }
    74     }
    70     }
    75   }
    71   }
    76 
    72 
    77   private def request(req: Request): Unit =
    73   private def request(req: Request): Unit = {
    78   {
       
    79     synchronized {
    74     synchronized {
    80       if (is_active()) mailbox.send(Some(req))
    75       if (is_active()) mailbox.send(Some(req))
    81       else error("Consumer thread not active: " + quote(thread.getName))
    76       else error("Consumer thread not active: " + quote(thread.getName))
    82     }
    77     }
    83     req.await()
    78     req.await()
    93             .getOrElse(msgs.take(1))
    88             .getOrElse(msgs.take(1))
    94             .map(_.get)
    89             .map(_.get)
    95 
    90 
    96         val (results, continue) = consume(reqs.map(_.arg))
    91         val (results, continue) = consume(reqs.map(_.arg))
    97 
    92 
    98         for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) }
    93         for {
    99         {
    94           (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
       
    95         } {
   100           (req.ack, res) match {
    96           (req.ack, res) match {
   101             case (Some(a), _) => a.change(_ => Some(res))
    97             case (Some(a), _) => a.change(_ => Some(res))
   102             case (None, Exn.Res(_)) =>
    98             case (None, Exn.Res(_)) =>
   103             case (None, Exn.Exn(exn)) => failure(exn)
    99             case (None, Exn.Exn(exn)) => failure(exn)
   104           }
   100           }
   114   assert(is_active())
   110   assert(is_active())
   115 
   111 
   116   def send(arg: A): Unit = request(new Request(arg))
   112   def send(arg: A): Unit = request(new Request(arg))
   117   def send_wait(arg: A): Unit = request(new Request(arg, acknowledge = true))
   113   def send_wait(arg: A): Unit = request(new Request(arg, acknowledge = true))
   118 
   114 
   119   def shutdown(): Unit =
   115   def shutdown(): Unit = {
   120   {
       
   121     synchronized { if (is_active()) { active = false; mailbox.send(None) } }
   116     synchronized { if (is_active()) { active = false; mailbox.send(None) } }
   122     thread.join()
   117     thread.join()
   123   }
   118   }
   124 }
   119 }