Futures: add Promise.timeout
It is easy to hand-roll, but frequently-enough asked for, to be able to do like this:
There’s some hairy detail attached to how to clean up the scheduled timeout action in case it is not needed anymore, though. The question is whether this should actually be very easy, because it encourages a rather costly solution mechanism (but sometimes it’s needed to stay non-blocking).
val f = Future.firstCompletedOf(Seq(Future { doSomething }, Promise.timeout(10 seconds){ new MyScaryException }))
There’s some hairy detail attached to how to clean up the scheduled timeout action in case it is not needed anymore, though. The question is whether this should actually be very easy, because it encourages a rather costly solution mechanism (but sometimes it’s needed to stay non-blocking).
Leave a comment
on 2012-02-21 11:21 *
By viktorklang
Not so sure about this ticket.
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index d7b4f17..6fbdc79 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ ExecutionException, Callable, TimeoutException }
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReferenceFieldUpdater }
import akka.pattern.AskTimeoutException
import util.DynamicVariable
+import akka.actor.Scheduler
object Await {
@@ -713,6 +714,13 @@ object Promise {
def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Right(result))
+
+ def withTimeout[T](at: Duration, scheduler: akka.actor.Scheduler)(implicit executor: ExecutionContext): Promise[T] = {
+ val p = Promise[T]()
+ val cancellable = scheduler.scheduleOnce(at) { p.tryComplete(Left(new TimeoutException("Scheduled timeout"))) }
+ p.onComplete(_ ⇒ cancellable.cancel)
+ p
+ }
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index d7b4f17..6fbdc79 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ ExecutionException, Callable, TimeoutException }
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReferenceFieldUpdater }
import akka.pattern.AskTimeoutException
import util.DynamicVariable
+import akka.actor.Scheduler
object Await {
@@ -713,6 +714,13 @@ object Promise {
- Creates an already completed Promise with the specified result
def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Right(result))
+
+ def withTimeout[T](at: Duration, scheduler: akka.actor.Scheduler)(implicit executor: ExecutionContext): Promise[T] = {
+ val p = Promise[T]()
+ val cancellable = scheduler.scheduleOnce(at) { p.tryComplete(Left(new TimeoutException("Scheduled timeout"))) }
+ p.onComplete(_ ⇒ cancellable.cancel)
+ p
+ }
}
Updating tickets (#520, #852, #857, #874, #935, #950, #1364, #1508, #1542, #1559, #1734, #1744, #1755, #1782, #1812, #1824, #1831, #1858, #1871, #1880, #1886, #1892, #1896, #1899, #1929, #1930, #1950, #1952, #1953, #1962, #1966, #1969, #1972, #1973, #1977, #1978, #1986, #1988, #1993, #1999, #2000, #2003, #2005, #2006, #2015, #2016, #2019, #2021, #2022, #2023, #2024, #2025, #2029, #2031, #2032, #2036, #2046, #2048, #2051, #2055, #2059, #2061, #2062, #2064, #2065, #2068, #2072, #2074, #2076, #2078, #2079, #2085, #2087, #2088, #2089, #2090, #2091, #2092, #2093, #2095, #2098, #2099, #2100, #2101, #2102, #2119, #2129, #2134, #2135, #2136, #2144, #2147, #2148, #2156, #2166, #2168, #2172, #2174, #2178, #2183)