1 /* Title: Pure/Concurrent/future.scala |
1 /* Title: Pure/Concurrent/future.scala |
2 Module: PIDE |
2 Module: PIDE |
3 Author: Makarius |
3 Author: Makarius |
4 |
4 |
5 Value-oriented parallel execution via futures and promises in Scala -- with |
5 Value-oriented parallel execution via futures and promises. |
6 signatures as in Isabelle/ML. |
|
7 */ |
6 */ |
8 |
7 |
9 package isabelle |
8 package isabelle |
10 |
9 |
11 |
10 |
12 import scala.util.{Success, Failure} |
11 /* futures and promises */ |
13 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, |
|
14 Future => Scala_Future, Promise => Scala_Promise, Await} |
|
15 import scala.concurrent.duration.Duration |
|
16 |
|
17 |
12 |
18 object Future |
13 object Future |
19 { |
14 { |
20 lazy val execution_context: ExecutionContextExecutor = |
15 def value[A](x: A): Future[A] = new Value_Future(x) |
21 ExecutionContext.fromExecutorService(Standard_Thread.default_pool) |
16 def fork[A](body: => A): Future[A] = new Task_Future[A](body) |
22 |
17 def promise[A]: Promise[A] = new Promise_Future[A] |
23 def value[A](x: A): Future[A] = new Finished_Future(x) |
18 def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] = |
24 |
19 new Thread_Future[A](name, daemon, body) |
25 def fork[A](body: => A): Future[A] = |
|
26 new Pending_Future(Scala_Future[A](body)(execution_context)) |
|
27 |
|
28 def promise[A]: Promise[A] = |
|
29 new Promise_Future[A](Scala_Promise[A]()) |
|
30 } |
20 } |
31 |
21 |
32 trait Future[A] |
22 trait Future[A] |
33 { |
23 { |
34 def peek: Option[Exn.Result[A]] |
24 def peek: Option[Exn.Result[A]] |
35 def is_finished: Boolean = peek.isDefined |
25 def is_finished: Boolean = peek.isDefined |
36 def get_finished: A = { require(is_finished); Exn.release(peek.get) } |
26 def get_finished: A = { require(is_finished); Exn.release(peek.get) } |
37 def join: A |
27 def join_result: Exn.Result[A] |
|
28 def join: A = Exn.release(join_result) |
38 def map[B](f: A => B): Future[B] = Future.fork { f(join) } |
29 def map[B](f: A => B): Future[B] = Future.fork { f(join) } |
|
30 def cancel: Unit |
39 |
31 |
40 override def toString: String = |
32 override def toString: String = |
41 peek match { |
33 peek match { |
42 case None => "<future>" |
34 case None => "<future>" |
43 case Some(Exn.Exn(_)) => "<failed>" |
35 case Some(Exn.Exn(_)) => "<failed>" |
45 } |
37 } |
46 } |
38 } |
47 |
39 |
48 trait Promise[A] extends Future[A] |
40 trait Promise[A] extends Future[A] |
49 { |
41 { |
50 def cancel: Unit |
|
51 def fulfill_result(res: Exn.Result[A]): Unit |
42 def fulfill_result(res: Exn.Result[A]): Unit |
52 def fulfill(x: A): Unit |
43 def fulfill(x: A): Unit |
53 } |
44 } |
54 |
45 |
55 |
46 |
56 private class Finished_Future[A](x: A) extends Future[A] |
47 /* value future */ |
|
48 |
|
49 private class Value_Future[A](x: A) extends Future[A] |
57 { |
50 { |
58 val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) |
51 val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) |
59 val join: A = x |
52 def join_result: Exn.Result[A] = peek.get |
|
53 def cancel {} |
60 } |
54 } |
61 |
55 |
62 private class Pending_Future[A](future: Scala_Future[A]) extends Future[A] |
56 |
|
57 /* task future via thread pool */ |
|
58 |
|
59 private class Task_Future[A](body: => A) extends Future[A] |
63 { |
60 { |
|
61 private sealed abstract class Status |
|
62 private case object Ready extends Status |
|
63 private case class Running(thread: Thread) extends Status |
|
64 private case object Terminated extends Status |
|
65 private case class Finished(result: Exn.Result[A]) extends Status |
|
66 |
|
67 private val status = Synchronized[Status](Ready) |
|
68 |
64 def peek: Option[Exn.Result[A]] = |
69 def peek: Option[Exn.Result[A]] = |
65 future.value match { |
70 status.value match { |
66 case Some(Success(x)) => Some(Exn.Res(x)) |
71 case Finished(result) => Some(result) |
67 case Some(Failure(e)) => Some(Exn.Exn(e)) |
72 case _ => None |
68 case None => None |
|
69 } |
73 } |
70 override def is_finished: Boolean = future.isCompleted |
|
71 |
74 |
72 def join: A = Await.result(future, Duration.Inf) |
75 private def try_run() |
73 override def map[B](f: A => B): Future[B] = |
76 { |
74 new Pending_Future[B](future.map(f)(Future.execution_context)) |
77 val do_run = |
|
78 status.change_result { |
|
79 case Ready => (true, Running(Thread.currentThread)) |
|
80 case st => (false, st) |
|
81 } |
|
82 if (do_run) { |
|
83 val result = Exn.capture(body) |
|
84 status.change(_ => Terminated) |
|
85 status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result)) |
|
86 } |
|
87 } |
|
88 private val task = Standard_Thread.submit_task { try_run() } |
|
89 |
|
90 def join_result: Exn.Result[A] = |
|
91 { |
|
92 try_run() |
|
93 status.guarded_access { |
|
94 case st @ Finished(result) => Some((result, st)) |
|
95 case _ => None |
|
96 } |
|
97 } |
|
98 |
|
99 def cancel = |
|
100 { |
|
101 status.change { |
|
102 case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt())) |
|
103 case st @ Running(thread) => thread.interrupt; st |
|
104 case st => st |
|
105 } |
|
106 } |
75 } |
107 } |
76 |
108 |
77 private class Promise_Future[A](promise: Scala_Promise[A]) |
109 |
78 extends Pending_Future(promise.future) with Promise[A] |
110 /* promise future */ |
|
111 |
|
112 private class Promise_Future[A] extends Promise[A] |
79 { |
113 { |
80 override def is_finished: Boolean = promise.isCompleted |
114 private val state = Synchronized[Option[Exn.Result[A]]](None) |
|
115 def peek: Option[Exn.Result[A]] = state.value |
|
116 |
|
117 def join_result: Exn.Result[A] = |
|
118 state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st))) |
|
119 |
|
120 def fulfill_result(result: Exn.Result[A]): Unit = |
|
121 state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException) |
|
122 |
|
123 def fulfill(x: A): Unit = fulfill_result(Exn.Res(x)) |
81 |
124 |
82 def cancel: Unit = |
125 def cancel: Unit = |
83 try { fulfill_result(Exn.Exn(Exn.Interrupt())) } |
126 state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st) |
84 catch { case _: IllegalStateException => } |
127 } |
85 |
128 |
86 def fulfill_result(res: Exn.Result[A]): Unit = |
129 |
87 res match { |
130 /* thread future */ |
88 case Exn.Res(x) => promise.success(x) |
131 |
89 case Exn.Exn(e) => promise.failure(e) |
132 private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A] |
90 } |
133 { |
91 def fulfill(x: A): Unit = promise.success(x) |
134 private val result = Future.promise[A] |
|
135 private val thread = |
|
136 Standard_Thread.fork(name, daemon) { result.fulfill_result(Exn.capture(body)) } |
|
137 |
|
138 def peek: Option[Exn.Result[A]] = result.peek |
|
139 def join_result: Exn.Result[A] = result.join_result |
|
140 def cancel: Unit = thread.interrupt |
92 } |
141 } |