src/Pure/Concurrent/future.scala
changeset 73367 77ef8bef0593
parent 73340 0ffcad1f6130
child 75393 87ebf5a50283
equal deleted inserted replaced
73366:5f388e514ab8 73367:77ef8bef0593
    36   def is_finished: Boolean = peek.isDefined
    36   def is_finished: Boolean = peek.isDefined
    37   def get_finished: A = { require(is_finished, "future not finished"); Exn.release(peek.get) }
    37   def get_finished: A = { require(is_finished, "future not finished"); Exn.release(peek.get) }
    38   def join_result: Exn.Result[A]
    38   def join_result: Exn.Result[A]
    39   def join: A = Exn.release(join_result)
    39   def join: A = Exn.release(join_result)
    40   def map[B](f: A => B): Future[B] = Future.fork { f(join) }
    40   def map[B](f: A => B): Future[B] = Future.fork { f(join) }
    41   def cancel: Unit
    41   def cancel(): Unit
    42 
    42 
    43   override def toString: String =
    43   override def toString: String =
    44     peek match {
    44     peek match {
    45       case None => "<future>"
    45       case None => "<future>"
    46       case Some(Exn.Exn(_)) => "<failed>"
    46       case Some(Exn.Exn(_)) => "<failed>"
    59 
    59 
    60 private class Value_Future[A](x: A) extends Future[A]
    60 private class Value_Future[A](x: A) extends Future[A]
    61 {
    61 {
    62   val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
    62   val peek: Option[Exn.Result[A]] = Some(Exn.Res(x))
    63   def join_result: Exn.Result[A] = peek.get
    63   def join_result: Exn.Result[A] = peek.get
    64   def cancel: Unit = {}
    64   def cancel(): Unit = {}
    65 }
    65 }
    66 
    66 
    67 
    67 
    68 /* task future via thread pool */
    68 /* task future via thread pool */
    69 
    69 
    91         case st => (false, st)
    91         case st => (false, st)
    92       }
    92       }
    93     if (do_run) {
    93     if (do_run) {
    94       val result = Exn.capture(body)
    94       val result = Exn.capture(body)
    95       status.change(_ => Terminated)
    95       status.change(_ => Terminated)
    96       status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result))
    96       status.change(_ => Finished(if (Thread.interrupted()) Exn.Exn(Exn.Interrupt()) else result))
    97     }
    97     }
    98   }
    98   }
    99   private val task = Isabelle_Thread.pool.submit(new Callable[Unit] { def call = try_run() })
    99   private val task = Isabelle_Thread.pool.submit(new Callable[Unit] { def call = try_run() })
   100 
   100 
   101   def join_result: Exn.Result[A] =
   101   def join_result: Exn.Result[A] =
   105       case st @ Finished(result) => Some((result, st))
   105       case st @ Finished(result) => Some((result, st))
   106       case _ => None
   106       case _ => None
   107     }
   107     }
   108   }
   108   }
   109 
   109 
   110   def cancel =
   110   def cancel(): Unit =
   111   {
   111   {
   112     status.change {
   112     status.change {
   113       case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt()))
   113       case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt()))
   114       case st @ Running(thread) => thread.interrupt; st
   114       case st @ Running(thread) => thread.interrupt(); st
   115       case st => st
   115       case st => st
   116     }
   116     }
   117   }
   117   }
   118 }
   118 }
   119 
   119 
   131   def fulfill_result(result: Exn.Result[A]): Unit =
   131   def fulfill_result(result: Exn.Result[A]): Unit =
   132     state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException)
   132     state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException)
   133 
   133 
   134   def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))
   134   def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))
   135 
   135 
   136   def cancel: Unit =
   136   def cancel(): Unit =
   137     state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st)
   137     state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st)
   138 }
   138 }
   139 
   139 
   140 
   140 
   141 /* thread future */
   141 /* thread future */
   155       inherit_locals = inherit_locals, uninterruptible = uninterruptible)
   155       inherit_locals = inherit_locals, uninterruptible = uninterruptible)
   156     { result.fulfill_result(Exn.capture(body)) }
   156     { result.fulfill_result(Exn.capture(body)) }
   157 
   157 
   158   def peek: Option[Exn.Result[A]] = result.peek
   158   def peek: Option[Exn.Result[A]] = result.peek
   159   def join_result: Exn.Result[A] = result.join_result
   159   def join_result: Exn.Result[A] = result.join_result
   160   def cancel: Unit = thread.interrupt
   160   def cancel(): Unit = thread.interrupt()
   161 }
   161 }