| author | wenzelm | 
| Thu, 23 Nov 2023 11:36:38 +0100 | |
| changeset 79043 | 22c41ee13939 | 
| parent 78612 | f7df1a444dbb | 
| permissions | -rw-r--r-- | 
| 34217 | 1 | /* Title: Pure/Concurrent/future.scala | 
| 2 | Author: Makarius | |
| 3 | ||
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 4 | Value-oriented parallel execution via futures and promises. | 
| 34217 | 5 | */ | 
| 6 | ||
| 7 | package isabelle | |
| 8 | ||
| 9 | ||
| 61563 | 10 | import java.util.concurrent.Callable | 
| 11 | ||
| 12 | ||
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 13 | /* futures and promises */ | 
| 34217 | 14 | |
object Future {
  // already finished future that holds the given value (see Value_Future)
  def value[A](x: A): Future[A] =
    new Value_Future[A](x)

  // evaluate body as a task future (see Task_Future)
  def fork[A](body: => A): Future[A] =
    new Task_Future[A](body)

  // fresh promise, to be fulfilled or cancelled explicitly (see Promise_Future)
  def promise[A]: Promise[A] =
    new Promise_Future[A]

  // evaluate body via Thread_Future, passing the thread parameters through unchanged
  def thread[A](
    name: String = "",
    group: ThreadGroup = Isabelle_Thread.current_thread_group,
    pri: Int = Thread.NORM_PRIORITY,
    daemon: Boolean = false,
    inherit_locals: Boolean = false,
    uninterruptible: Boolean = false
  )(
    body: => A
  ): Future[A] =
    new Thread_Future[A](name, group, pri, daemon, inherit_locals, uninterruptible, body)
}
| 32 | ||
trait Future[A] {
  // current result, if there is one already
  def peek: Option[Exn.Result[A]]

  // finished means that peek yields some result
  def is_finished: Boolean = peek.nonEmpty

  // result of a future that is required to be finished already, passed through Exn.release
  def get_finished: A = {
    require(is_finished, "future not finished")
    Exn.release(peek.get)
  }

  // final result as Exn.Result (implementations presumably wait for it -- see subclasses)
  def join_result: Exn.Result[A]

  // final result, passed through Exn.release
  def join: A = Exn.release(join_result)

  // forked future that joins this one and applies f to its value
  def map[B](f: A => B): Future[B] = Future.fork(f(join))

  def cancel(): Unit

  // snapshot of the current state: pending, failed, or the value itself
  override def toString: String =
    peek match {
      case Some(Exn.Res(x)) => x.toString
      case Some(Exn.Exn(_)) => "<failed>"
      case None => "<future>"
    }
}
| 49 | ||
// future whose result is supplied explicitly by the holder of the promise
trait Promise[A] extends Future[A] {
  // supply the final result: regular value or exception
  def fulfill_result(res: Exn.Result[A]): Unit

  // supply a regular value as final result
  def fulfill(x: A): Unit
}
| 
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
 wenzelm parents: 
34217diff
changeset | 54 | |
| 56673 
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
 wenzelm parents: 
56663diff
changeset | 55 | |
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 56 | /* value future */ | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 57 | |
// future that is finished from the start: the result is the given value
private class Value_Future[A](x: A) extends Future[A] {
  private val result: Exn.Result[A] = Exn.Res(x)

  val peek: Option[Exn.Result[A]] = Some(result)
  def join_result: Exn.Result[A] = result

  // nothing to cancel: the result is already fixed
  def cancel(): Unit = ()
}
| 63 | ||
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 64 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 65 | /* task future via thread pool */ | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 66 | |
private class Task_Future[A](body: => A) extends Future[A] {
  // life cycle: Ready -> Running -> Terminated -> Finished, or Ready -> Finished via cancel()
  private enum Status {
    case Ready extends Status
    case Running(thread: Thread) extends Status
    case Terminated extends Status
    case Finished(result: Exn.Result[A]) extends Status
  }

  private val status = Synchronized[Status](Status.Ready)

  // only the Finished state carries a result
  def peek: Option[Exn.Result[A]] =
    status.value match {
      case Status.Finished(res) => Some(res)
      case _ => None
    }

  // evaluate body on the current thread, unless some other caller has taken over already
  private def try_run(): Unit = {
    val acquired =
      status.change_result {
        case Status.Ready => (true, Status.Running(Thread.currentThread))
        case other => (false, other)
      }
    if (acquired) {
      val outcome = Exn.capture(body)

      // in state Terminated, cancel() no longer interrupts the thread
      status.change(_ => Status.Terminated)

      // Thread.interrupted() tests and clears the interrupt flag of the current thread:
      // a pending interrupt takes precedence over the captured outcome
      status.change { _ =>
        val res: Exn.Result[A] =
          if (Thread.interrupted()) Exn.Exn(Exn.Interrupt()) else outcome
        Status.Finished(res)
      }
    }
  }

  // submitted on construction, after status is initialized
  private val task =
    Isabelle_Thread.pool.submit(new Callable[Unit] { def call(): Unit = try_run() })

  // run body right here if the pool has not started it yet, then obtain the Finished result
  // (guarded_access presumably waits until the function yields Some -- confirm in Synchronized)
  def join_result: Exn.Result[A] = {
    try_run()
    status.guarded_access {
      case finished @ Status.Finished(res) => Some((res, finished))
      case _ => None
    }
  }

  // Ready: cancel pool task and finish as interrupted; Running: interrupt the thread;
  // Terminated or Finished: no change
  def cancel(): Unit =
    status.change {
      case Status.Ready =>
        task.cancel(false)
        Status.Finished(Exn.Exn(Exn.Interrupt()))
      case running @ Status.Running(thread) =>
        thread.interrupt()
        running
      case other => other
    }
}
| 114 | ||
| 61559 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 115 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 116 | /* promise future */ | 
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 117 | |
// promise as single-assignment variable: None while pending, Some(result) once finished
private class Promise_Future[A] extends Promise[A] {
  private val state = Synchronized[Option[Exn.Result[A]]](None)

  def peek: Option[Exn.Result[A]] = state.value

  // obtain the result once the state is defined, leaving the state unchanged
  // (guarded_access presumably waits until the function yields Some -- confirm in Synchronized)
  def join_result: Exn.Result[A] =
    state.guarded_access(st => st.map(res => (res, st)))

  // assign the result exactly once: a second assignment (or one after cancel) is an error,
  // reported as IllegalStateException with an explicit message to simplify diagnosis
  def fulfill_result(result: Exn.Result[A]): Unit =
    state.change {
      case None => Some(result)
      case Some(_) => throw new IllegalStateException("promise already finished")
    }

  def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))

  // a pending promise finishes as interrupted; a finished one is left unchanged
  def cancel(): Unit =
    state.change {
      case None => Some(Exn.Exn(Exn.Interrupt()))
      case finished => finished
    }
}
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 133 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 134 | |
| 
313eca3fa847
more direct task future implementation, with proper cancel operation;
 wenzelm parents: 
61556diff
changeset | 135 | /* thread future */ | 
| 59365 | 136 | |
// future that evaluates body on a thread forked via Isabelle_Thread.fork
private class Thread_Future[A](
  name: String,
  group: ThreadGroup,
  pri: Int,
  daemon: Boolean,
  inherit_locals: Boolean,
  uninterruptible: Boolean,
  body: => A
) extends Future[A] {
  // the forked thread stores the captured outcome of body here
  private val outcome: Promise[A] = Future.promise[A]

  private val worker =
    Isabelle_Thread.fork(
      name = name,
      group = group,
      pri = pri,
      daemon = daemon,
      inherit_locals = inherit_locals,
      uninterruptible = uninterruptible
    ) {
      outcome.fulfill_result(Exn.capture(body))
    }

  def peek: Option[Exn.Result[A]] = outcome.peek
  def join_result: Exn.Result[A] = outcome.join_result

  // cancellation interrupts the forked thread; the promise itself is not touched here
  def cancel(): Unit = worker.interrupt()
}