src/Pure/Concurrent/future.scala
author wenzelm
Tue Nov 03 16:35:38 2015 +0100 (2015-11-03)
changeset 61559 313eca3fa847
parent 61556 0d4ee4168e41
child 61563 91c3aedbfc5e
permissions -rw-r--r--
more direct task future implementation, with proper cancel operation;
more uniform Future.thread;
wenzelm@34217
     1
/*  Title:      Pure/Concurrent/future.scala
wenzelm@45673
     2
    Module:     PIDE
wenzelm@34217
     3
    Author:     Makarius
wenzelm@34217
     4
wenzelm@61559
     5
Value-oriented parallel execution via futures and promises.
wenzelm@34217
     6
*/
wenzelm@34217
     7
wenzelm@34217
     8
package isabelle
wenzelm@34217
     9
wenzelm@34217
    10
wenzelm@61559
    11
/* futures and promises */
wenzelm@34217
    12
wenzelm@34217
    13
object Future
{
  /* future that already holds the given value */
  def value[A](x: A): Future[A] = new Value_Future[A](x)

  /* future implemented by Task_Future, evaluating the by-name body */
  def fork[A](body: => A): Future[A] = new Task_Future[A](body)

  /* fresh promise, to be fulfilled (or cancelled) by the caller */
  def promise[A]: Promise[A] = new Promise_Future[A]

  /* future implemented by Thread_Future, with the given thread name and daemon flag */
  def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] =
    new Thread_Future[A](name, daemon, body)
}
wenzelm@34217
    21
wenzelm@56673
    22
trait Future[A]
{
  /* current result, if there is one */
  def peek: Option[Exn.Result[A]]

  /* finished means: peek yields some result */
  def is_finished: Boolean = peek.nonEmpty

  /* released result of a future that is required to be finished already */
  def get_finished: A =
  {
    require(is_finished)
    val res = peek.get
    Exn.release(res)
  }

  /* result as Exn.Result, without releasing it */
  def join_result: Exn.Result[A]

  /* join_result passed through Exn.release */
  def join: A = Exn.release(join_result)

  /* forked future that joins this one and applies f to its value */
  def map[B](f: A => B): Future[B] = Future.fork(f(join))

  def cancel: Unit

  /* "<future>" without result, "<failed>" for Exn.Exn, otherwise the value's own toString */
  override def toString: String =
    peek.fold("<future>") {
      case Exn.Exn(_) => "<failed>"
      case Exn.Res(x) => x.toString
    }
}
wenzelm@34217
    39
wenzelm@56673
    40
trait Promise[A] extends Future[A]
{
  /* provide the outcome as Exn.Result (value or exception) */
  def fulfill_result(res: Exn.Result[A]): Unit

  /* provide the outcome as plain value */
  def fulfill(x: A): Unit
}
wenzelm@34240
    45
wenzelm@56673
    46
wenzelm@61559
    47
/* value future */
wenzelm@61559
    48
wenzelm@61559
    49
private class Value_Future[A](x: A) extends Future[A]
{
  /* the result is fixed at construction: always Exn.Res(x) */
  private val result: Exn.Result[A] = Exn.Res(x)

  val peek: Option[Exn.Result[A]] = Some(result)

  def join_result: Exn.Result[A] = result

  /* nothing to cancel: explicit Unit result instead of deprecated procedure syntax */
  def cancel: Unit = ()
}
wenzelm@34217
    55
wenzelm@61559
    56
wenzelm@61559
    57
/* task future via thread pool */
wenzelm@61559
    58
wenzelm@61559
    59
private class Task_Future[A](body: => A) extends Future[A]
{
  /*
    Status transitions, as performed below:
      Ready -> Running -> Terminated -> Finished   (try_run)
      Ready -> Finished                            (cancel before the body has started)
  */
  private sealed abstract class Status
  private case object Ready extends Status
  private case class Running(thread: Thread) extends Status
  private case object Terminated extends Status
  private case class Finished(result: Exn.Result[A]) extends Status

  // NOTE: must be initialized before `task`, because try_run() accesses it
  private val status = Synchronized[Status](Ready)

  /* result is only visible in status Finished */
  def peek: Option[Exn.Result[A]] =
    status.value match {
      case Finished(result) => Some(result)
      case _ => None
    }

  /* evaluate body on the current thread, but only if the status is still Ready */
  private def try_run(): Unit =
  {
    val do_run =
      status.change_result {
        case Ready => (true, Running(Thread.currentThread()))
        case st => (false, st)
      }
    if (do_run) {
      val result = Exn.capture(body)
      // after Terminated, cancel no longer interrupts this thread (see cancel below)
      status.change(_ => Terminated)
      // Thread.interrupted() tests and clears a pending interrupt of this thread:
      // in that case the outcome is Exn.Interrupt instead of the captured result
      status.change(_ =>
        Finished(if (Thread.interrupted()) Exn.Exn(Exn.Interrupt()) else result))
    }
  }

  /* submitted task that attempts to run the body via try_run() */
  private val task = Standard_Thread.submit_task { try_run() }

  /* attempt to run the body on the calling thread, then access the Finished result */
  def join_result: Exn.Result[A] =
  {
    try_run()
    status.guarded_access {
      case st @ Finished(result) => Some((result, st))
      case _ => None
    }
  }

  /*
    Ready: cancel the submitted task and finish with Exn.Interrupt;
    Running: interrupt the thread that evaluates the body;
    Terminated / Finished: unchanged.
  */
  def cancel: Unit =
    status.change {
      case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt()))
      case st @ Running(thread) => thread.interrupt(); st
      case st => st
    }
}
wenzelm@34217
   108
wenzelm@61559
   109
wenzelm@61559
   110
/* promise future */
wenzelm@61559
   111
wenzelm@61559
   112
private class Promise_Future[A] extends Promise[A]
{
  /* None: still open; Some(result): fulfilled or cancelled */
  private val state = Synchronized[Option[Exn.Result[A]]](None)

  def peek: Option[Exn.Result[A]] = state.value

  /* access the result once the state is defined; the state itself stays unchanged */
  def join_result: Exn.Result[A] =
    state.guarded_access {
      case st @ Some(res) => Some((res, st))
      case None => None
    }

  /* store the result; IllegalStateException if the state is already defined */
  def fulfill_result(result: Exn.Result[A]): Unit =
    state.change {
      case None => Some(result)
      case Some(_) => throw new IllegalStateException
    }

  def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))

  /* an open promise becomes Exn.Interrupt; a defined state is left as it is */
  def cancel: Unit =
    state.change {
      case None => Some(Exn.Exn(Exn.Interrupt()))
      case defined => defined
    }
}
wenzelm@61559
   128
wenzelm@61559
   129
wenzelm@61559
   130
/* thread future */
wenzelm@59365
   131
wenzelm@61559
   132
private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A]
{
  /* promise that receives the captured outcome of body */
  private val result = Future.promise[A]

  /* forked thread: capture the outcome of body and fulfill the promise with it */
  private val thread =
    Standard_Thread.fork(name, daemon) {
      val outcome = Exn.capture(body)
      result.fulfill_result(outcome)
    }

  /* peek and join_result are delegated to the promise */
  def peek: Option[Exn.Result[A]] = result.peek
  def join_result: Exn.Result[A] = result.join_result

  /* cancel interrupts the forked thread; the promise itself is not touched here */
  def cancel: Unit = thread.interrupt()
}