1 /* Title: Pure/Concurrent/future.scala |
1 /* Title: Pure/Concurrent/future.scala |
2 Module: PIDE |
2 Module: PIDE |
3 Author: Makarius |
3 Author: Makarius |
4 |
4 |
5 Future values. |
5 Value-oriented parallelism via futures and promises in Scala -- with |
6 |
6 signatures as in Isabelle/ML. |
7 Notable differences to scala.actors.Future (as of Scala 2.7.7): |
|
8 |
|
9 * We capture/release exceptions as in the ML variant. |
|
10 |
|
11 * Future.join works for *any* actor, not just the one of the |
|
12 original fork. |
|
13 */ |
7 */ |
14 |
8 |
15 package isabelle |
9 package isabelle |
16 |
10 |
17 |
11 |
18 import scala.actors.Actor._ |
12 import scala.util.{Success, Failure} |
|
13 import scala.concurrent.{Future => Scala_Future, Promise => Scala_Promise, Await} |
|
14 import scala.concurrent.duration.Duration |
|
15 import scala.concurrent.ExecutionContext.Implicits.global |
19 |
16 |
20 |
17 |
21 object Future |
18 object Future |
22 { |
19 { |
23 def value[A](x: A): Future[A] = new Finished_Future(x) |
20 def value[A](x: A): Future[A] = new Finished_Future(x) |
24 def fork[A](body: => A): Future[A] = new Pending_Future(body) |
21 def fork[A](body: => A): Future[A] = new Pending_Future(Scala_Future[A](body)) |
25 def promise[A]: Promise[A] = new Promise_Future |
22 def promise[A]: Promise[A] = new Promise_Future[A](Scala_Promise[A]) |
26 } |
23 } |
27 |
24 |
28 abstract class Future[A] |
25 trait Future[A] |
29 { |
26 { |
30 def peek: Option[Exn.Result[A]] |
27 def peek: Option[Exn.Result[A]] |
31 def is_finished: Boolean = peek.isDefined |
28 def is_finished: Boolean = peek.isDefined |
32 def get_finished: A = { require(is_finished); Exn.release(peek.get) } |
29 def get_finished: A = { require(is_finished); Exn.release(peek.get) } |
33 def join: A |
30 def join: A |
39 case Some(Exn.Exn(_)) => "<failed>" |
36 case Some(Exn.Exn(_)) => "<failed>" |
40 case Some(Exn.Res(x)) => x.toString |
37 case Some(Exn.Res(x)) => x.toString |
41 } |
38 } |
42 } |
39 } |
43 |
40 |
44 abstract class Promise[A] extends Future[A] |
41 trait Promise[A] extends Future[A] |
45 { |
42 { |
46 def fulfill_result(res: Exn.Result[A]): Unit |
43 def fulfill_result(res: Exn.Result[A]): Unit |
47 def fulfill(x: A) { fulfill_result(Exn.Res(x)) } |
44 def fulfill(x: A) { fulfill_result(Exn.Res(x)) } |
48 } |
45 } |
|
46 |
49 |
47 |
50 private class Finished_Future[A](x: A) extends Future[A] |
48 private class Finished_Future[A](x: A) extends Future[A] |
51 { |
49 { |
52 val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) |
50 val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) |
53 val join: A = x |
51 val join: A = x |
54 } |
52 } |
55 |
53 |
56 private class Pending_Future[A](body: => A) extends Future[A] |
54 private class Pending_Future[A](future: Scala_Future[A]) extends Future[A] |
57 { |
55 { |
58 @volatile private var result: Option[Exn.Result[A]] = None |
56 def peek: Option[Exn.Result[A]] = |
|
57 future.value match { |
|
58 case Some(Success(x)) => Some(Exn.Res(x)) |
|
59 case Some(Failure(e)) => Some(Exn.Exn(e)) |
|
60 case None => None |
|
61 } |
|
62 override def is_finished: Boolean = future.isCompleted |
59 |
63 |
60 private val evaluator = actor { |
64 def join: A = Await.result(future, Duration.Inf) |
61 result = Some(Exn.capture(body)) |
|
62 loop { react { case _ => reply(result.get) } } |
|
63 } |
|
64 |
65 |
65 def peek: Option[Exn.Result[A]] = result |
66 override def map[B](f: A => B): Future[B] = new Pending_Future[B](future.map(f)) |
66 |
|
67 def join: A = |
|
68 Exn.release { |
|
69 peek match { |
|
70 case Some(res) => res |
|
71 case None => (evaluator !? (())).asInstanceOf[Exn.Result[A]] |
|
72 } |
|
73 } |
|
74 } |
67 } |
75 |
68 |
76 private class Promise_Future[A] extends Promise[A] |
69 private class Promise_Future[A](promise: Scala_Promise[A]) |
|
70 extends Pending_Future(promise.future) with Promise[A] |
77 { |
71 { |
78 @volatile private var result: Option[Exn.Result[A]] = None |
72 override def is_finished: Boolean = promise.isCompleted |
79 |
73 |
80 private case object Read |
74 override def fulfill_result(res: Exn.Result[A]): Unit = |
81 private case class Write(res: Exn.Result[A]) |
75 res match { |
82 |
76 case Exn.Res(x) => promise.success(x) |
83 private val receiver = actor { |
77 case Exn.Exn(e) => promise.failure(e) |
84 loop { |
|
85 react { |
|
86 case Read if result.isDefined => reply(result.get) |
|
87 case Write(res) => |
|
88 if (result.isDefined) reply(false) |
|
89 else { result = Some(res); reply(true) } |
|
90 } |
|
91 } |
|
92 } |
|
93 |
|
94 def peek: Option[Exn.Result[A]] = result |
|
95 |
|
96 def join: A = |
|
97 Exn.release { |
|
98 result match { |
|
99 case Some(res) => res |
|
100 case None => (receiver !? Read).asInstanceOf[Exn.Result[A]] |
|
101 } |
|
102 } |
78 } |
103 |
79 |
104 def fulfill_result(res: Exn.Result[A]) { |
80 override def fulfill(x: A): Unit = promise.success(x) |
105 receiver !? Write(res) match { |
|
106 case false => error("Duplicate fulfillment of promise") |
|
107 case _ => |
|
108 } |
|
109 } |
|
110 } |
81 } |
111 |
82 |