src/Pure/Concurrent/simple_thread.scala
changeset 61556 0d4ee4168e41
parent 61555 e27cfd2bf094
child 61557 f6387515f951
equal deleted inserted replaced
61555:e27cfd2bf094 61556:0d4ee4168e41
     1 /*  Title:      Pure/Concurrent/simple_thread.scala
       
     2     Module:     PIDE
       
     3     Author:     Makarius
       
     4 
       
     5 Simplified thread operations.
       
     6 */
       
     7 
       
     8 package isabelle
       
     9 
       
    10 
       
    11 import java.lang.Thread
       
    12 import java.util.concurrent.{Callable, Future => JFuture, ThreadPoolExecutor,
       
    13   TimeUnit, LinkedBlockingQueue}
       
    14 
       
    15 
       
    16 object Simple_Thread
       
    17 {
       
    18   /* plain thread */
       
    19 
       
    20   def fork(name: String = "", daemon: Boolean = false)(body: => Unit): Thread =
       
    21   {
       
    22     val thread =
       
    23       if (name == null || name == "") new Thread() { override def run = body }
       
    24       else new Thread(name) { override def run = body }
       
    25     thread.setDaemon(daemon)
       
    26     thread.start
       
    27     thread
       
    28   }
       
    29 
       
    30 
       
    31   /* future result via thread */
       
    32 
       
    33   def future[A](name: String = "", daemon: Boolean = false)(body: => A): (Thread, Future[A]) =
       
    34   {
       
    35     val result = Future.promise[A]
       
    36     val thread = fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
       
    37     (thread, result)
       
    38   }
       
    39 
       
    40 
       
    41   /* thread pool */
       
    42 
       
    43   lazy val default_pool =
       
    44     {
       
    45       val m = Properties.Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0
       
    46       val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8
       
    47       new ThreadPoolExecutor(n, n, 2500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
       
    48     }
       
    49 
       
    50   def submit_task[A](body: => A): JFuture[A] =
       
    51     default_pool.submit(new Callable[A] { def call = body })
       
    52 
       
    53 
       
    54   /* delayed events */
       
    55 
       
    56   final class Delay private [Simple_Thread](
       
    57     first: Boolean, delay: => Time, cancel: () => Unit, event: => Unit)
       
    58   {
       
    59     private var running: Option[Event_Timer.Request] = None
       
    60 
       
    61     private def run: Unit =
       
    62     {
       
    63       val do_run = synchronized {
       
    64         if (running.isDefined) { running = None; true } else false
       
    65       }
       
    66       if (do_run) event
       
    67     }
       
    68 
       
    69     def invoke(): Unit = synchronized
       
    70     {
       
    71       val new_run =
       
    72         running match {
       
    73           case Some(request) => if (first) false else { request.cancel; cancel(); true }
       
    74           case None => cancel(); true
       
    75         }
       
    76       if (new_run)
       
    77         running = Some(Event_Timer.request(Time.now() + delay)(run))
       
    78     }
       
    79 
       
    80     def revoke(): Unit = synchronized
       
    81     {
       
    82       running match {
       
    83         case Some(request) => request.cancel; cancel(); running = None
       
    84         case None => cancel()
       
    85       }
       
    86     }
       
    87 
       
    88     def postpone(alt_delay: Time): Unit = synchronized
       
    89     {
       
    90       running match {
       
    91         case Some(request) =>
       
    92           val alt_time = Time.now() + alt_delay
       
    93           if (request.time < alt_time && request.cancel) {
       
    94             cancel()
       
    95             running = Some(Event_Timer.request(alt_time)(run))
       
    96           }
       
    97           else cancel()
       
    98         case None => cancel()
       
    99       }
       
   100     }
       
   101   }
       
   102 
       
   103   // delayed event after first invocation
       
   104   def delay_first(delay: => Time, cancel: () => Unit = () => ())(event: => Unit): Delay =
       
   105     new Delay(true, delay, cancel, event)
       
   106 
       
   107   // delayed event after last invocation
       
   108   def delay_last(delay: => Time, cancel: () => Unit = () => ())(event: => Unit): Delay =
       
   109     new Delay(false, delay, cancel, event)
       
   110 }