| author | wenzelm | 
| Thu, 11 May 2023 12:21:50 +0200 | |
| changeset 78033 | 9c18535a9fcd | 
| parent 77409 | d2711c9ffa51 | 
| child 78612 | f7df1a444dbb | 
| permissions | -rw-r--r-- | 
| 34217 | 1 | /* Title: Pure/Concurrent/future.scala | 
| 2 | Author: Makarius | |
| 3 | ||
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 4 | Value-oriented parallel execution via futures and promises. | 
| 34217 | 5 | */ | 
| 6 | ||
| 7 | package isabelle | |
| 8 | ||
| 9 | ||
| 61563 | 10 | import java.util.concurrent.Callable | 
| 11 | ||
| 12 | ||
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 13 | /* futures and promises */ | 
| 34217 | 14 | |
| 75393 | 15 | object Future {
 | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 16 | def value[A](x: A): Future[A] = new Value_Future(x) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 17 | def fork[A](body: => A): Future[A] = new Task_Future[A](body) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 18 | def promise[A]: Promise[A] = new Promise_Future[A] | 
| 71690 | 19 | |
| 20 | def thread[A]( | |
| 21 | name: String = "", | |
| 71692 | 22 | group: ThreadGroup = Isabelle_Thread.current_thread_group, | 
| 71690 | 23 | pri: Int = Thread.NORM_PRIORITY, | 
| 24 | daemon: Boolean = false, | |
| 25 | inherit_locals: Boolean = false, | |
| 75393 | 26 | uninterruptible: Boolean = false)( | 
| 27 | body: => A | |
| 28 |   ): Future[A] = {
 | |
| 29 | new Thread_Future[A](name, group, pri, daemon, inherit_locals, uninterruptible, body) | |
| 30 | } | |
| 34217 | 31 | } | 
| 32 | ||
| 75393 | 33 | trait Future[A] {
 | 
| 34217 | 34 | def peek: Option[Exn.Result[A]] | 
| 35 | def is_finished: Boolean = peek.isDefined | |
| 73120 
c3589f2dff31
more informative errors: simplify diagnosis of spurious failures reported by users;
 wenzelm parents: 
71692diff
changeset | 36 |   def get_finished: A = { require(is_finished, "future not finished"); Exn.release(peek.get) }
 | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 37 | def join_result: Exn.Result[A] | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 38 | def join: A = Exn.release(join_result) | 
| 34217 | 39 |   def map[B](f: A => B): Future[B] = Future.fork { f(join) }
 | 
| 73367 | 40 | def cancel(): Unit | 
| 34217 | 41 | |
| 57912 | 42 | override def toString: String = | 
| 34217 | 43 |     peek match {
 | 
| 44 | case None => "<future>" | |
| 45 | case Some(Exn.Exn(_)) => "<failed>" | |
| 46 | case Some(Exn.Res(x)) => x.toString | |
| 47 | } | |
| 48 | } | |
| 49 | ||
| 75393 | 50 | trait Promise[A] extends Future[A] {
 | 
| 34262 | 51 | def fulfill_result(res: Exn.Result[A]): Unit | 
| 56674 | 52 | def fulfill(x: A): Unit | 
| 34240 
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
 wenzelm parents: 
34217diff
changeset | 53 | } | 
| 
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
 wenzelm parents: 
34217diff
changeset | 54 | |
| 56673 
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
 wenzelm parents: 
56663diff
changeset | 55 | |
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 56 | /* value future */ | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 57 | |
| 75393 | 58 | private class Value_Future[A](x: A) extends Future[A] {
 | 
| 34217 | 59 | val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 60 | def join_result: Exn.Result[A] = peek.get | 
| 73367 | 61 |   def cancel(): Unit = {}
 | 
| 34217 | 62 | } | 
| 63 | ||
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 64 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 65 | /* task future via thread pool */ | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 66 | |
| 75393 | 67 | private class Task_Future[A](body: => A) extends Future[A] {
 | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 68 | private sealed abstract class Status | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 69 | private case object Ready extends Status | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 70 | private case class Running(thread: Thread) extends Status | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 71 | private case object Terminated extends Status | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 72 | private case class Finished(result: Exn.Result[A]) extends Status | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 73 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 74 | private val status = Synchronized[Status](Ready) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 75 | |
| 56673 
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
 wenzelm parents: 
56663diff
changeset | 76 | def peek: Option[Exn.Result[A]] = | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 77 |     status.value match {
 | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 78 | case Finished(result) => Some(result) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 79 | case _ => None | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 80 | } | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 81 | |
| 75393 | 82 |   private def try_run(): Unit = {
 | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 83 | val do_run = | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 84 |       status.change_result {
 | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 85 | case Ready => (true, Running(Thread.currentThread)) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 86 | case st => (false, st) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 87 | } | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 88 |     if (do_run) {
 | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 89 | val result = Exn.capture(body) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 90 | status.change(_ => Terminated) | 
| 73367 | 91 | status.change(_ => Finished(if (Thread.interrupted()) Exn.Exn(Exn.Interrupt()) else result)) | 
| 56673 
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
 wenzelm parents: 
56663diff
changeset | 92 | } | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 93 | } | 
| 71692 | 94 |   private val task = Isabelle_Thread.pool.submit(new Callable[Unit] { def call = try_run() })
 | 
| 34217 | 95 | |
| 75393 | 96 |   def join_result: Exn.Result[A] = {
 | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 97 | try_run() | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 98 |     status.guarded_access {
 | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 99 | case st @ Finished(result) => Some((result, st)) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 100 | case _ => None | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 101 | } | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 102 | } | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 103 | |
| 75393 | 104 |   def cancel(): Unit = {
 | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 105 |     status.change {
 | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 106 | case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt())) | 
| 73367 | 107 | case st @ Running(thread) => thread.interrupt(); st | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 108 | case st => st | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 109 | } | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 110 | } | 
| 34217 | 111 | } | 
| 112 | ||
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 113 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 114 | /* promise future */ | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 115 | |
| 75393 | 116 | private class Promise_Future[A] extends Promise[A] {
 | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 117 | private val state = Synchronized[Option[Exn.Result[A]]](None) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 118 | def peek: Option[Exn.Result[A]] = state.value | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 119 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 120 | def join_result: Exn.Result[A] = | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 121 | state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st))) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 122 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 123 | def fulfill_result(result: Exn.Result[A]): Unit = | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 124 | state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 125 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 126 | def fulfill(x: A): Unit = fulfill_result(Exn.Res(x)) | 
| 34240 
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
 wenzelm parents: 
34217diff
changeset | 127 | |
| 73367 | 128 | def cancel(): Unit = | 
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 129 | state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st) | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 130 | } | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 131 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 132 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 133 | /* thread future */ | 
| 59365 | 134 | |
| 71690 | 135 | private class Thread_Future[A]( | 
| 136 | name: String, | |
| 137 | group: ThreadGroup, | |
| 138 | pri: Int, | |
| 139 | daemon: Boolean, | |
| 140 | inherit_locals: Boolean, | |
| 141 | uninterruptible: Boolean, | |
| 77409 | 142 | body: => A | 
| 143 | ) extends Future[A] {
 | |
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 144 | private val result = Future.promise[A] | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 145 | private val thread = | 
| 71692 | 146 | Isabelle_Thread.fork(name = name, group = group, pri = pri, daemon = daemon, | 
| 75393 | 147 |       inherit_locals = inherit_locals, uninterruptible = uninterruptible) {
 | 
| 148 | result.fulfill_result(Exn.capture(body)) } | |
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 149 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 150 | def peek: Option[Exn.Result[A]] = result.peek | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 151 | def join_result: Exn.Result[A] = result.join_result | 
| 73367 | 152 | def cancel(): Unit = thread.interrupt() | 
| 34240 
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
 wenzelm parents: 
34217diff
changeset | 153 | } |