author | wenzelm |
Thu, 21 Apr 2022 11:28:50 +0200 | |
changeset 75442 | d5041b68a237 |
parent 75393 | 87ebf5a50283 |
child 77409 | d2711c9ffa51 |
permissions | -rw-r--r-- |
34217 | 1 |
/* Title: Pure/Concurrent/future.scala |
2 |
Author: Makarius |
|
3 |
||
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
4 |
Value-oriented parallel execution via futures and promises. |
34217 | 5 |
*/ |
6 |
||
7 |
package isabelle |
|
8 |
||
9 |
||
61563 | 10 |
import java.util.concurrent.Callable |
11 |
||
12 |
||
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
13 |
/* futures and promises */ |
34217 | 14 |
|
75393 | 15 |
object Future {
  // Future that is finished from the start and holds the given value.
  def value[A](x: A): Future[A] = new Value_Future(x)

  // Future that evaluates body as a task; Task_Future submits itself to
  // Isabelle_Thread.pool when it is constructed.
  def fork[A](body: => A): Future[A] = new Task_Future[A](body)

  // Fresh promise without a result; it is finished later via fulfill,
  // fulfill_result or cancel.
  def promise[A]: Promise[A] = new Promise_Future[A]

  // Future that evaluates body via Isabelle_Thread.fork; all thread options
  // are passed through unchanged to Thread_Future.
  def thread[A](
    name: String = "",
    group: ThreadGroup = Isabelle_Thread.current_thread_group,
    pri: Int = Thread.NORM_PRIORITY,
    daemon: Boolean = false,
    inherit_locals: Boolean = false,
    uninterruptible: Boolean = false
  )(
    body: => A
  ): Future[A] =
    new Thread_Future[A](name, group, pri, daemon, inherit_locals, uninterruptible, body)
}
32 |
||
75393 | 33 |
trait Future[A] {
  /* abstract operations */

  // Result so far: None as long as the future has no result.
  def peek: Option[Exn.Result[A]]

  // Result in wrapped form (value or exception).
  def join_result: Exn.Result[A]

  // Cancellation request; the precise effect depends on the implementation.
  def cancel(): Unit


  /* derived operations */

  def is_finished: Boolean = peek.isDefined

  // Result of a future that is required to be finished already; an
  // unfinished future is rejected by require.
  def get_finished: A = {
    require(is_finished, "future not finished")
    Exn.release(peek.get)
  }

  // Unwrapped result via Exn.release (presumably re-throwing a captured
  // exception -- see Exn).
  def join: A = Exn.release(join_result)

  // New forked future that joins this one and applies f to its value.
  def map[B](f: A => B): Future[B] = Future.fork(f(join))

  override def toString: String =
    peek.fold("<future>") {
      case Exn.Exn(_) => "<failed>"
      case Exn.Res(x) => x.toString
    }
}
|
49 |
||
75393 | 50 |
trait Promise[A] extends Future[A] {
  // Finish the promise with a wrapped result (value or exception).
  def fulfill_result(res: Exn.Result[A]): Unit

  // Finish the promise with a plain value.
  def fulfill(x: A): Unit
}
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
wenzelm
parents:
34217
diff
changeset
|
54 |
|
56673
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
wenzelm
parents:
56663
diff
changeset
|
55 |
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
56 |
/* value future */ |
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
57 |
|
75393 | 58 |
private class Value_Future[A](x: A) extends Future[A] {
  // The wrapped value is built once; peek and join_result share the same object.
  private val result: Exn.Result[A] = Exn.Res(x)

  val peek: Option[Exn.Result[A]] = Some(result)
  def join_result: Exn.Result[A] = result

  // Nothing to cancel: the result exists from the start.
  def cancel(): Unit = ()
}
63 |
||
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
64 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
65 |
/* task future via thread pool */ |
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
66 |
|
75393 | 67 |
private class Task_Future[A](body: => A) extends Future[A] {
  /* status: Ready -> Running -> Terminated -> Finished, or Ready -> Finished via cancel */

  private sealed abstract class Status
  private case object Ready extends Status
  private case class Running(thread: Thread) extends Status
  private case object Terminated extends Status
  private case class Finished(result: Exn.Result[A]) extends Status

  private val status = Synchronized[Status](Ready)

  def peek: Option[Exn.Result[A]] =
    status.value match {
      case Finished(res) => Some(res)
      case _ => None
    }


  /* execution */

  // Evaluate body on the current thread, provided that the status is still
  // Ready; any other status means the body is already claimed or cancelled.
  private def try_run(): Unit = {
    val claimed =
      status.change_result {
        case Ready => (true, Running(Thread.currentThread))
        case other => (false, other)
      }
    if (claimed) {
      val outcome = Exn.capture(body)
      // Leave Running first: cancel() only interrupts a Running thread, so
      // no further interrupt is sent to this thread after this point
      // (assuming Synchronized.change is atomic).
      status.change(_ => Terminated)
      // Thread.interrupted() tests and clears the interrupt flag; a pending
      // interrupt replaces the captured outcome by Exn.Interrupt.
      status.change { _ =>
        Finished(if (Thread.interrupted()) Exn.Exn(Exn.Interrupt()) else outcome)
      }
    }
  }

  // Submitted after status is initialized, because try_run() depends on it.
  private val task =
    Isabelle_Thread.pool.submit(new Callable[Unit] { def call: Unit = try_run() })


  /* join and cancel */

  // The joining thread attempts to run the body itself, then retrieves the
  // Finished result via guarded_access.
  def join_result: Exn.Result[A] = {
    try_run()
    status.guarded_access {
      case fin @ Finished(res) => Some((res, fin))
      case _ => None
    }
  }

  def cancel(): Unit = {
    status.change {
      case Ready =>
        // Not started yet: withdraw the pool task and finish as interrupted.
        task.cancel(false)
        Finished(Exn.Exn(Exn.Interrupt()))
      case running @ Running(thread) =>
        // Body in progress: interrupt its thread, status stays Running.
        thread.interrupt()
        running
      case other => other
    }
  }
}
112 |
||
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
113 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
114 |
/* promise future */ |
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
115 |
|
75393 | 116 |
private class Promise_Future[A] extends Promise[A] {
  // None while unfinished; set at most once, by fulfill_result or cancel.
  private val state = Synchronized[Option[Exn.Result[A]]](None)

  def peek: Option[Exn.Result[A]] = state.value

  // Retrieve the result via guarded_access, leaving the state unchanged.
  def join_result: Exn.Result[A] =
    state.guarded_access(st => st.map(res => (res, st)))

  // Finish the promise with the given result.
  // Throws IllegalStateException if the promise already has a result,
  // either from an earlier fulfilment or from cancel().
  def fulfill_result(result: Exn.Result[A]): Unit =
    state.change {
      case None => Some(result)
      case Some(_) => throw new IllegalStateException("promise already finished")
    }

  def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))

  // Finish an unfinished promise with Exn.Interrupt; a promise that already
  // has a result is left as it is.
  def cancel(): Unit =
    state.change {
      case None => Some(Exn.Exn(Exn.Interrupt()))
      case finished => finished
    }
}
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
131 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
132 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
133 |
/* thread future */ |
59365 | 134 |
|
71690 | 135 |
private class Thread_Future[A](
  name: String,
  group: ThreadGroup,
  pri: Int,
  daemon: Boolean,
  inherit_locals: Boolean,
  uninterruptible: Boolean,
  body: => A
) extends Future[A] {
  // The promise receives the captured outcome of body; it is created before
  // the thread is forked, because the forked body refers to it.
  private val result = Future.promise[A]

  private val thread =
    Isabelle_Thread.fork(
      name = name,
      group = group,
      pri = pri,
      daemon = daemon,
      inherit_locals = inherit_locals,
      uninterruptible = uninterruptible
    ) {
      result.fulfill_result(Exn.capture(body))
    }

  def peek: Option[Exn.Result[A]] = result.peek
  def join_result: Exn.Result[A] = result.join_result

  // Cancellation interrupts the forked thread; the promise itself is not touched.
  def cancel(): Unit = thread.interrupt()
}