| author | paulson <lp15@cam.ac.uk> |
| Sat, 15 Jul 2023 23:34:42 +0100 | |
| changeset 78336 | 6bae28577994 |
| parent 77409 | d2711c9ffa51 |
| child 78612 | f7df1a444dbb |
| permissions | -rw-r--r-- |
| 34217 | 1 |
/* Title: Pure/Concurrent/future.scala |
2 |
Author: Makarius |
|
3 |
||
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
4 |
Value-oriented parallel execution via futures and promises. |
| 34217 | 5 |
*/ |
6 |
||
7 |
package isabelle |
|
8 |
||
9 |
||
| 61563 | 10 |
import java.util.concurrent.Callable |
11 |
||
12 |
||
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
13 |
/* futures and promises */ |
| 34217 | 14 |
|
| 75393 | 15 |
object Future {
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
16 |
def value[A](x: A): Future[A] = new Value_Future(x) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
17 |
def fork[A](body: => A): Future[A] = new Task_Future[A](body) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
18 |
def promise[A]: Promise[A] = new Promise_Future[A] |
| 71690 | 19 |
|
20 |
def thread[A]( |
|
21 |
name: String = "", |
|
| 71692 | 22 |
group: ThreadGroup = Isabelle_Thread.current_thread_group, |
| 71690 | 23 |
pri: Int = Thread.NORM_PRIORITY, |
24 |
daemon: Boolean = false, |
|
25 |
inherit_locals: Boolean = false, |
|
| 75393 | 26 |
uninterruptible: Boolean = false)( |
27 |
body: => A |
|
28 |
): Future[A] = {
|
|
29 |
new Thread_Future[A](name, group, pri, daemon, inherit_locals, uninterruptible, body) |
|
30 |
} |
|
| 34217 | 31 |
} |
32 |
||
| 75393 | 33 |
trait Future[A] {
|
| 34217 | 34 |
def peek: Option[Exn.Result[A]] |
35 |
def is_finished: Boolean = peek.isDefined |
|
|
73120
c3589f2dff31
more informative errors: simplify diagnosis of spurious failures reported by users;
wenzelm
parents:
71692
diff
changeset
|
36 |
def get_finished: A = { require(is_finished, "future not finished"); Exn.release(peek.get) }
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
37 |
def join_result: Exn.Result[A] |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
38 |
def join: A = Exn.release(join_result) |
| 34217 | 39 |
def map[B](f: A => B): Future[B] = Future.fork { f(join) }
|
| 73367 | 40 |
def cancel(): Unit |
| 34217 | 41 |
|
| 57912 | 42 |
override def toString: String = |
| 34217 | 43 |
peek match {
|
44 |
case None => "<future>" |
|
45 |
case Some(Exn.Exn(_)) => "<failed>" |
|
46 |
case Some(Exn.Res(x)) => x.toString |
|
47 |
} |
|
48 |
} |
|
49 |
||
| 75393 | 50 |
trait Promise[A] extends Future[A] {
|
| 34262 | 51 |
def fulfill_result(res: Exn.Result[A]): Unit |
| 56674 | 52 |
def fulfill(x: A): Unit |
|
34240
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
wenzelm
parents:
34217
diff
changeset
|
53 |
} |
|
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
wenzelm
parents:
34217
diff
changeset
|
54 |
|
|
56673
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
wenzelm
parents:
56663
diff
changeset
|
55 |
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
56 |
/* value future */ |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
57 |
|
| 75393 | 58 |
private class Value_Future[A](x: A) extends Future[A] {
|
| 34217 | 59 |
val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) |
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
60 |
def join_result: Exn.Result[A] = peek.get |
| 73367 | 61 |
def cancel(): Unit = {}
|
| 34217 | 62 |
} |
63 |
||
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
64 |
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
65 |
/* task future via thread pool */ |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
66 |
|
| 75393 | 67 |
private class Task_Future[A](body: => A) extends Future[A] {
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
68 |
private sealed abstract class Status |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
69 |
private case object Ready extends Status |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
70 |
private case class Running(thread: Thread) extends Status |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
71 |
private case object Terminated extends Status |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
72 |
private case class Finished(result: Exn.Result[A]) extends Status |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
73 |
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
74 |
private val status = Synchronized[Status](Ready) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
75 |
|
|
56673
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
wenzelm
parents:
56663
diff
changeset
|
76 |
def peek: Option[Exn.Result[A]] = |
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
77 |
status.value match {
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
78 |
case Finished(result) => Some(result) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
79 |
case _ => None |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
80 |
} |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
81 |
|
| 75393 | 82 |
private def try_run(): Unit = {
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
83 |
val do_run = |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
84 |
status.change_result {
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
85 |
case Ready => (true, Running(Thread.currentThread)) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
86 |
case st => (false, st) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
87 |
} |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
88 |
if (do_run) {
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
89 |
val result = Exn.capture(body) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
90 |
status.change(_ => Terminated) |
| 73367 | 91 |
status.change(_ => Finished(if (Thread.interrupted()) Exn.Exn(Exn.Interrupt()) else result)) |
|
56673
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
wenzelm
parents:
56663
diff
changeset
|
92 |
} |
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
93 |
} |
| 71692 | 94 |
private val task = Isabelle_Thread.pool.submit(new Callable[Unit] { def call = try_run() })
|
| 34217 | 95 |
|
| 75393 | 96 |
def join_result: Exn.Result[A] = {
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
97 |
try_run() |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
98 |
status.guarded_access {
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
99 |
case st @ Finished(result) => Some((result, st)) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
100 |
case _ => None |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
101 |
} |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
102 |
} |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
103 |
|
| 75393 | 104 |
def cancel(): Unit = {
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
105 |
status.change {
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
106 |
case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt())) |
| 73367 | 107 |
case st @ Running(thread) => thread.interrupt(); st |
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
108 |
case st => st |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
109 |
} |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
110 |
} |
| 34217 | 111 |
} |
112 |
||
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
113 |
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
114 |
/* promise future */ |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
115 |
|
| 75393 | 116 |
private class Promise_Future[A] extends Promise[A] {
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
117 |
private val state = Synchronized[Option[Exn.Result[A]]](None) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
118 |
def peek: Option[Exn.Result[A]] = state.value |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
119 |
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
120 |
def join_result: Exn.Result[A] = |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
121 |
state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st))) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
122 |
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
123 |
def fulfill_result(result: Exn.Result[A]): Unit = |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
124 |
state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
125 |
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
126 |
def fulfill(x: A): Unit = fulfill_result(Exn.Res(x)) |
|
34240
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
wenzelm
parents:
34217
diff
changeset
|
127 |
|
| 73367 | 128 |
def cancel(): Unit = |
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
129 |
state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st) |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
130 |
} |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
131 |
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
132 |
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
133 |
/* thread future */ |
| 59365 | 134 |
|
| 71690 | 135 |
private class Thread_Future[A]( |
136 |
name: String, |
|
137 |
group: ThreadGroup, |
|
138 |
pri: Int, |
|
139 |
daemon: Boolean, |
|
140 |
inherit_locals: Boolean, |
|
141 |
uninterruptible: Boolean, |
|
| 77409 | 142 |
body: => A |
143 |
) extends Future[A] {
|
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
144 |
private val result = Future.promise[A] |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
145 |
private val thread = |
| 71692 | 146 |
Isabelle_Thread.fork(name = name, group = group, pri = pri, daemon = daemon, |
| 75393 | 147 |
inherit_locals = inherit_locals, uninterruptible = uninterruptible) {
|
148 |
result.fulfill_result(Exn.capture(body)) } |
|
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
149 |
|
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
150 |
def peek: Option[Exn.Result[A]] = result.peek |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
151 |
def join_result: Exn.Result[A] = result.join_result |
| 73367 | 152 |
def cancel(): Unit = thread.interrupt() |
|
34240
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
wenzelm
parents:
34217
diff
changeset
|
153 |
} |