author | wenzelm |
Sun, 13 Dec 2020 23:11:41 +0100 | |
changeset 72907 | 3883f536d84d |
parent 71692 | f8e52c0152fe |
child 73120 | c3589f2dff31 |
permissions | -rw-r--r-- |
34217 | 1 |
/*  Title:      Pure/Concurrent/future.scala
    Author:     Makarius

Value-oriented parallel execution via futures and promises.
*/
6 |
||
7 |
package isabelle |
|
8 |
||
9 |
||
61563 | 10 |
import java.util.concurrent.Callable |
11 |
||
12 |
||
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
13 |
/* futures and promises */ |
34217 | 14 |
|
15 |
object Future
{
  /* future that is finished from the start and holds the given value */

  def value[A](x: A): Future[A] = new Value_Future[A](x)


  /* future that evaluates body as a task (see Task_Future) */

  def fork[A](body: => A): Future[A] = new Task_Future[A](body)


  /* initially unfinished promise (see Promise_Future) */

  def promise[A]: Promise[A] = new Promise_Future[A]


  /* future that evaluates body in a separate thread (see Thread_Future);
     all thread parameters are handed over unchanged */

  def thread[A](
    name: String = "",
    group: ThreadGroup = Isabelle_Thread.current_thread_group,
    pri: Int = Thread.NORM_PRIORITY,
    daemon: Boolean = false,
    inherit_locals: Boolean = false,
    uninterruptible: Boolean = false)(body: => A): Future[A] =
    new Thread_Future[A](name, group, pri, daemon, inherit_locals, uninterruptible, body)
}
32 |
||
56673
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
wenzelm
parents:
56663
diff
changeset
|
33 |
trait Future[A]
{
  /* current result: None as long as the future is unfinished */

  def peek: Option[Exn.Result[A]]
  def is_finished: Boolean = peek.isDefined


  /* value of a future that is expected to be finished already: an unfinished
     future fails the requirement (IllegalArgumentException) instead of waiting */

  def get_finished: A =
  {
    require(is_finished)
    Exn.release(peek.get)
  }


  /* join_result provides the outcome as Exn.Result; join passes it through Exn.release */

  def join_result: Exn.Result[A]
  def join: A = Exn.release(join_result)


  /* forked future that joins this one and applies f to its value */

  def map[B](f: A => B): Future[B] = Future.fork { f(join) }

  def cancel: Unit

  override def toString: String =
    peek match {
      case None => "<future>"
      case Some(Exn.Exn(_)) => "<failed>"
      // String.valueOf tolerates a null value, where x.toString would throw
      // NullPointerException
      case Some(Exn.Res(x)) => String.valueOf(x)
    }
}
|
50 |
||
56673
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
wenzelm
parents:
56663
diff
changeset
|
51 |
trait Promise[A] extends Future[A]
{
  /* fulfill the promise with an explicit result, which may also be a failure */

  def fulfill_result(res: Exn.Result[A]): Unit


  /* fulfill the promise with a plain value */

  def fulfill(x: A): Unit
}
3274571e45c1
added Future.promise -- essentially a single-assignment variable with signalling, using the Future interface;
wenzelm
parents:
34217
diff
changeset
|
56 |
|
56673
42bf8fffdf6a
modernized Future/Promise implementation, bypassing old actors;
wenzelm
parents:
56663
diff
changeset
|
57 |
|
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
58 |
/* value future */ |
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
59 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
60 |
private class Value_Future[A](x: A) extends Future[A]
{
  /* the result is fixed at construction time: the future is finished from the start */

  private val result: Exn.Result[A] = Exn.Res(x)

  val peek: Option[Exn.Result[A]] = Some(result)
  def join_result: Exn.Result[A] = result

  // nothing to cancel: there is no pending evaluation
  def cancel: Unit = {}
}
66 |
||
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
67 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
68 |
/* task future via thread pool */ |
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
69 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
70 |
private class Task_Future[A](body: => A) extends Future[A]
{
  /* status transitions, as performed below:
       Ready -> Running -> Terminated -> Finished   (try_run)
       Ready -> Finished                            (cancel before the body has started)
  */

  private sealed abstract class Status
  private case object Ready extends Status
  private case class Running(thread: Thread) extends Status
  private case object Terminated extends Status
  private case class Finished(result: Exn.Result[A]) extends Status

  private val status = Synchronized[Status](Ready)

  def peek: Option[Exn.Result[A]] =
    status.value match {
      case Finished(result) => Some(result)
      case _ => None
    }


  /* evaluate body on the current thread, but only if the status is still Ready;
     otherwise do nothing */

  private def try_run(): Unit =
  {
    val do_run =
      status.change_result {
        case Ready => (true, Running(Thread.currentThread()))
        case st => (false, st)
      }
    if (do_run) {
      val result = Exn.capture(body)
      // leave Running first: from here on cancel no longer interrupts this thread
      status.change(_ => Terminated)
      // Thread.interrupted() also clears the interrupt flag of the current thread;
      // presumably this keeps a late cancel from leaking into whatever runs next
      // on this thread -- NOTE(review): confirm intent
      status.change(_ => Finished(if (Thread.interrupted()) Exn.Exn(Exn.Interrupt()) else result))
    }
  }

  // submitted only after status is initialized, because try_run depends on it
  private val task =
    Isabelle_Thread.pool.submit(new Callable[Unit] { def call(): Unit = try_run() })


  /* run the body on the calling thread if the task has not started yet,
     then wait until the status is Finished */

  def join_result: Exn.Result[A] =
  {
    try_run()
    status.guarded_access {
      case st @ Finished(result) => Some((result, st))
      case _ => None
    }
  }


  /* cancel: a Ready task is withdrawn and finished as interrupted, a Running
     task gets its thread interrupted, any other status is left unchanged */

  def cancel: Unit =
    status.change {
      case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt()))
      case st @ Running(thread) => thread.interrupt(); st
      case st => st
    }
}
119 |
||
61559
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
120 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
121 |
/* promise future */ |
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
122 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
123 |
private class Promise_Future[A] extends Promise[A]
{
  /* single-assignment state: None until fulfilled or cancelled */

  private val state = Synchronized[Option[Exn.Result[A]]](None)

  def peek: Option[Exn.Result[A]] = state.value


  /* wait until some result is present; the state itself is left unchanged */

  def join_result: Exn.Result[A] =
    state.guarded_access {
      case None => None
      case st @ Some(res) => Some((res, st))
    }


  /* assign the result once: a state that is already defined is rejected */

  def fulfill_result(result: Exn.Result[A]): Unit =
    state.change {
      case None => Some(result)
      case Some(_) => throw new IllegalStateException
    }

  def fulfill(x: A): Unit = fulfill_result(Exn.Res(x))


  /* cancel: an undefined state becomes an interrupt result, a defined one is kept */

  def cancel: Unit =
    state.change {
      case None => Some(Exn.Exn(Exn.Interrupt()))
      case st => st
    }
}
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
139 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
140 |
|
313eca3fa847
more direct task future implementation, with proper cancel operation;
wenzelm
parents:
61556
diff
changeset
|
141 |
/* thread future */ |
59365 | 142 |
|
71690 | 143 |
private class Thread_Future[A](
  name: String,
  group: ThreadGroup,
  pri: Int,
  daemon: Boolean,
  inherit_locals: Boolean,
  uninterruptible: Boolean,
  body: => A) extends Future[A]
{
  /* promise that receives the captured outcome of body from the forked thread */

  private val promise: Promise[A] = Future.promise[A]

  private val worker =
    Isabelle_Thread.fork(
      name = name,
      group = group,
      pri = pri,
      daemon = daemon,
      inherit_locals = inherit_locals,
      uninterruptible = uninterruptible)
    {
      promise.fulfill_result(Exn.capture(body))
    }


  /* results are those of the promise; cancel interrupts the forked thread */

  def peek: Option[Exn.Result[A]] = promise.peek
  def join_result: Exn.Result[A] = promise.join_result
  def cancel: Unit = worker.interrupt()
}