Complete futures asynchronously when replying through a channel
Since "self.channel ! ReplyMessage" invokes the callbacks on a future directly, they will be invoked while the replying actor's receive() is still on the stack. Deadlocks can result if the callbacks attempt to use the replying actor, because the replying actor can't receive more messages until its current receive() returns.
My take is that the API contract of ! should specify asynchronous execution so that replying to a future is truly identical to replying to an actor, from the perspective of the one doing the replying. Invoking callbacks synchronously creates some of the same deadlock/concurrency problems that actors otherwise tend to solve. Also, sync vs. async execution are not semantically the same, so a single method should be documented to do one or the other.
This came up in a blog post I wrote, which has a bit more context:
http://blog.ometer.com/2011/07/24/callbacks-synchronous-and-asynchronous/
fwiw, I don't mean to imply that completeWithResult and the rest of the API on Future should all be async; the issue is really that the ! method on Channel specifically has different semantics for actors and for futures. When working with a Future directly, the semantics are clear (always synchronous), which is fine, I think.
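Here is a small sketch that makes the stack issue concrete. It rests on a few assumptions: a single-thread executor stands in for the replying actor's mailbox, and a local `reply` function stands in for `self.channel ! ReplyMessage` when the channel is a future. The names `ReplyDemo` and `callbackSawReceiveOnStack` are hypothetical, not Akka API, and the code assumes Scala 2.12+ so that lambdas convert to `Runnable`. With direct invocation, the callback runs while the simulated receive() is still on the stack. With dispatch, it runs only after receive() returns, and only then could it safely use the same actor.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ReplyDemo {
  // Returns true if the reply callback ran while the simulated receive() was on the stack.
  def callbackSawReceiveOnStack(asyncReply: Boolean): Boolean = {
    val mailbox = Executors.newSingleThreadExecutor() // stands in for the actor's mailbox thread
    val inReceive = new AtomicBoolean(false)
    val observed = new AtomicBoolean(false)
    val done = new CountDownLatch(1)

    // The callback a waiting caller registered on the future.
    val callback: String => Unit = { _ =>
      observed.set(inReceive.get)
      done.countDown()
    }

    // Stand-in for `self.channel ! msg` when the channel is a future.
    def reply(msg: String): Unit =
      if (asyncReply) mailbox.execute(() => callback(msg)) // dispatched: queued behind receive()
      else callback(msg)                                    // direct: runs inside receive()

    mailbox.execute { () =>
      inReceive.set(true) // receive() is now on the stack
      reply("ReplyMessage")
      inReceive.set(false) // receive() returns
    }

    done.await(5, TimeUnit.SECONDS)
    mailbox.shutdown()
    observed.get
  }
}
```

Calling it with `asyncReply = false` returns `true`. Calling it with `asyncReply = true` returns `false`, because the dispatched callback is queued on the mailbox thread behind the running receive().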
on 2011-07-26 12:47
By viktorklang
Perhaps we could move the following section into notifyCompleted: https://github.com/jboner/akka/blob/wip-derekjw/akka-actor/src/main/scala/akka/dispatch/Future.scala#L917
And then we just need to execute the following segment in a dispatcher: https://github.com/jboner/akka/blob/wip-derekjw/akka-actor/src/main/scala/akka/dispatch/Future.scala#L934
So the logic would be:
Adding of onComplete-callback:
1) If not already completed, add to listeners
2) If completed, and callbacksPendingExecution isEmpty, fork off the execution to a dispatcher and set callbacksPendingExecution prior to executing the callbacks (as before)
3) If completed, and callbacksPendingExecution isDefined, add to the callbacksPendingExecution
Completing a Promise:
1) If not already completed, fork off the execution of the listeners to a dispatcher, and set the callbacksPendingExecution prior to starting to execute the callbacks.
Would that work?
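Here is a rough sketch of the steps above. It is a hypothetical, simplified promise (`SketchPromise`), not the code in Akka's `Future.scala`. It reuses the names `listeners` and `callbacksPendingExecution` from the comment, uses a plain `java.util.concurrent.Executor` as the dispatcher, and guards everything with one lock for brevity. A callback added while a batch is draining joins that batch rather than forking a new task.

```scala
import java.util.concurrent.Executor
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical promise following the steps above (not Akka's implementation).
final class SketchPromise[T](dispatcher: Executor) {
  private var result: Option[T] = None
  private var listeners: List[T => Unit] = Nil // callbacks added before completion
  private var callbacksPendingExecution: Option[mutable.Queue[T => Unit]] = None

  def onComplete(cb: T => Unit): Unit = {
    val startBatchWith: Option[T] = synchronized {
      result match {
        case None =>                             // 1) not completed: add to listeners
          listeners ::= cb
          None
        case Some(v) => callbacksPendingExecution match {
          case Some(pending) =>                  // 3) batch already pending: join it
            pending.enqueue(cb)
            None
          case None =>                           // 2) nothing pending: set it, then fork
            callbacksPendingExecution = Some(mutable.Queue(cb))
            Some(v)
        }
      }
    }
    startBatchWith.foreach(runBatch)
  }

  def complete(v: T): Unit = {
    val first = synchronized {
      if (result.isDefined) false                // already completed: ignore
      else {
        result = Some(v)
        // Set the pending queue before any callback can start executing.
        callbacksPendingExecution = Some(mutable.Queue(listeners.reverse: _*))
        listeners = Nil
        true
      }
    }
    if (first) runBatch(v)
  }

  // Forks one task that drains the pending queue, including callbacks that join meanwhile.
  private def runBatch(v: T): Unit = dispatcher.execute { () =>
    var next = nextPending()
    while (next.isDefined) {
      try next.get(v)
      catch { case NonFatal(e) => e.printStackTrace() } // keep draining in this sketch
      next = nextPending()
    }
  }

  // Takes the next callback, or clears callbacksPendingExecution once the batch is empty.
  private def nextPending(): Option[T => Unit] = synchronized {
    callbacksPendingExecution match {
      case Some(pending) if pending.nonEmpty => Some(pending.dequeue())
      case _ =>
        callbacksPendingExecution = None
        None
    }
  }
}
```

Clearing `callbacksPendingExecution` under the same lock that `onComplete` checks means a late callback either joins the running batch or starts a new one. It is never dropped.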
For 2.0 I have been optimizing futures by returning a KeptPromise for certain cases, like calling 'map' on a completed future. This also allows those methods to not touch the dispatcher when used with a KeptPromise.
Would it be acceptable if these changes only affected when onComplete is actually used, or should all uses of map/flatMap/etc always run async?
For example:
final def map[A](f: T ⇒ A)(implicit timeout: Timeout): Future[A] = value match {
  // future is complete with result, execute now and return a KeptPromise
  case Some(Right(r)) ⇒
    new KeptPromise[A](try {
      Right(f(r))
    } catch {
      case e: Exception ⇒
        EventHandler.error(e, this, e.getMessage)
        Left(e)
    })
  // future is complete with an Exception, return as is to avoid new allocations
  case Some(_) ⇒
    this.asInstanceOf[Future[A]]
  // future is not complete, register an onComplete callback (old behavior)
  case None ⇒
    val future = new DefaultPromise[A](timeout)
    onComplete { self ⇒
      future complete {
        self.value.get match {
          case Right(r) ⇒
            try {
              Right(f(r))
            } catch {
              case e: Exception ⇒
                EventHandler.error(e, this, e.getMessage)
                Left(e)
            }
          case v ⇒ v.asInstanceOf[Either[Throwable, A]]
        }
      }
    }
    future
}
If we want all callbacks to be run async then I can scrap these newer implementations.
As an additional note, my original implementation using Scalaz ran all callbacks async for consistency, but it was much slower. There may be a way of batching the execution of futures and callbacks within the dispatcher itself so that callback-intensive operations (for example, sequence and traverse) can still execute fast. If so, the new implementations I wrote may not be necessary.
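The batching idea could take roughly the following shape. This is a sketch under assumptions: `BatchingExecutorSketch` is a hypothetical wrapper around any `java.util.concurrent.Executor`, not Akka's dispatcher code. Suppose a task is submitted from a thread that is already draining a batch. The wrapper appends it to that thread's local queue rather than handing it to the underlying pool. Callback-heavy code then pays for one pool submission per batch, and callbacks still never run on the submitter's stack, only after the current task returns.

```scala
import java.util.concurrent.Executor
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical batching wrapper around an existing executor.
final class BatchingExecutorSketch(underlying: Executor) extends Executor {
  // Non-null only while the current thread is draining a batch.
  private val currentBatch = new ThreadLocal[mutable.Queue[Runnable]]

  def execute(task: Runnable): Unit = currentBatch.get match {
    case null  => underlying.execute(() => runBatch(task)) // start a new batch on the pool
    case batch => batch.enqueue(task)                      // runs after the current task returns
  }

  private def runBatch(first: Runnable): Unit = {
    val batch = mutable.Queue(first)
    currentBatch.set(batch)
    try {
      while (batch.nonEmpty) {
        val task = batch.dequeue()
        try task.run()
        catch { case NonFatal(e) => e.printStackTrace() } // keep draining in this sketch
      }
    } finally currentBatch.remove()
  }
}
```

For example, if one task submits 100 nested tasks through the wrapper, all 101 run but the underlying pool sees only one submission.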
on 2011-07-27 06:31
By Derek Williams
I have a partial fix in branch 'derekjw-1054', targeting version 2.0:
https://github.com/jboner/akka/compare/master...derekjw-1054
I haven't removed the optimizations I described in my previous comment, and I haven't fully implemented this change into dataflow as it causes a few tests to fail due to the added non-determinism. I want to get some feedback on my changes so far first to make sure this is the direction we should go.
on 2011-07-27 10:28
By Havoc Pennington
some .02
- Could it simply be documented that complete has to be called from somewhere that's already dispatched and doesn't hold locks, rather than adding a dispatch inside complete()? I think it would be reasonable for Channel's ! for example to add a dispatch before it completes a future. But, say, in the implementation of map(), inside onComplete on the "inner" future, there's no need to dispatch again, since you could assume onComplete was already called from a dispatch.
- Re: the optimization of already-completed futures to run sync, here is a thought: future.foreach({ v => }) almost certainly is used in the same way as onComplete, with a side-effecting callback... in fact it's more convenient than onComplete in most cases... so it almost has to work like onComplete. And then having foreach and map work differently seems a little wrong. So that would imply that onComplete, foreach, and map all have to be consistent (always defer, or only defer when not complete).
- While always deferring is more predictable for app developers, if it's too inefficient, it's too inefficient. Maybe Future is a special case where it represents a special not-sure-if-it-needs-to-be-async state and app developers can be trained to expect that. Not so bad.
on 2011-07-27 10:33
By Havoc Pennington
I guess the problem with making onComplete or foreach ambiguous on sync vs. async is contagion, i.e. other callback APIs are likely to be implemented in terms of them, and would then also be ambiguous.
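A deliberately tiny sketch illustrates the contagion point. `foreach` and `map` are written only in terms of `onComplete`, so they behave exactly as `onComplete` does, sync or async. All names here (`Callbacks`, `Kept`, `KeptAsync`, `ManualExecutor`) are hypothetical. This `map` just composes callbacks and re-applies `f` for every registration, which a real promise-backed `map` would not do.

```scala
import java.util.concurrent.Executor
import scala.collection.mutable

trait Callbacks[T] { self =>
  def onComplete(cb: T => Unit): Unit

  // Both are defined purely via onComplete, so they inherit its sync/async behavior.
  def foreach(f: T => Unit): Unit = onComplete(f)
  def map[U](f: T => U): Callbacks[U] = new Callbacks[U] {
    def onComplete(cb: U => Unit): Unit = self.onComplete(t => cb(f(t)))
  }
}

// Already-completed value whose onComplete runs the callback immediately (sync).
final class Kept[T](v: T) extends Callbacks[T] {
  def onComplete(cb: T => Unit): Unit = cb(v)
}

// Already-completed value whose onComplete always defers to an executor (async).
final class KeptAsync[T](v: T, ex: Executor) extends Callbacks[T] {
  def onComplete(cb: T => Unit): Unit = ex.execute(() => cb(v))
}

// Executor that queues tasks until runAll() is called, to make deferral visible.
final class ManualExecutor extends Executor {
  private val tasks = mutable.Queue.empty[Runnable]
  def execute(r: Runnable): Unit = tasks.enqueue(r)
  def runAll(): Unit = while (tasks.nonEmpty) tasks.dequeue().run()
}
```

With `Kept`, a `map(...).foreach(...)` chain has already run by the time the call returns. With `KeptAsync`, nothing in the chain runs until the executor drains its queue.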
on 2011-07-27 19:15
By Havoc Pennington
another thought, the solution to complete() in GSimpleAsyncResult (linked in my blog post) was to have two versions, one which dispatches and one which doesn't. The non-dispatching one is used when you already know you've been dispatched, for example inside another onComplete callback you'd already know that. So it could be complete() and dispatchComplete(), or complete() and synchronousComplete(), or something along those lines.
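The two-method idea might look roughly like this. It is a sketch only: `TwoWayPromise`, `complete` and `dispatchComplete` are hypothetical names, not Akka's API. `complete` runs listeners on the caller's stack and is meant for callers that already know they are dispatched and hold no locks. `dispatchComplete` hands them to the dispatcher, which is what something like Channel's `!` would call. Callbacks registered after completion are always dispatched here, a choice made for this sketch.

```scala
import java.util.concurrent.Executor

// Hypothetical promise with a synchronous and a dispatching completion method.
final class TwoWayPromise[T](dispatcher: Executor) {
  private var result: Option[T] = None
  private var listeners: List[T => Unit] = Nil

  def onComplete(cb: T => Unit): Unit = {
    val done = synchronized {
      result match {
        case None    => listeners ::= cb; None
        case already => already
      }
    }
    // Late registrations are always dispatched in this sketch.
    done.foreach(v => dispatcher.execute(() => cb(v)))
  }

  // For callers already running in a dispatched task and holding no locks:
  // listeners run right here, on the caller's stack.
  def complete(v: T): Unit = claim(v).foreach(cb => cb(v))

  // For callers such as an actor's receive(): listeners run later on the dispatcher.
  def dispatchComplete(v: T): Unit = {
    val cbs = claim(v)
    if (cbs.nonEmpty) dispatcher.execute(() => cbs.foreach(cb => cb(v)))
  }

  // Stores the result once and returns the listeners to notify (Nil if already completed).
  private def claim(v: T): List[T => Unit] = synchronized {
    if (result.isDefined) Nil
    else {
      result = Some(v)
      val cbs = listeners.reverse
      listeners = Nil
      cbs
    }
  }
}
```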
on 2011-07-28 06:40
By Derek Williams
If onComplete doesn't run async, there is still an inconsistency with something like this:
Future {
  aquireLock()
  val value = getSomeValue()
  otherFuture foreach { x =>
    aquireLock()
    x.doSomething(value)
    releaseLock()
  }
  releaseLock()
}
In this example the code runs within the dispatcher, but we can't be certain whether 'foreach' will run async or not. If it doesn't, we will end up deadlocking while trying to acquire the lock. If we then remove 'aquireLock()' and 'releaseLock()' from within the foreach to solve this deadlock, it may end up running async, and then we don't hold the needed lock. It is safer to just make it always run async.
Similar examples could probably also be found for 'complete'.
In any case, I compared the performance of current master, my fix for this ticket, and my fix with the optimizations removed, and saw very little difference among all three. Because of this, I think it would be much safer to remove the optimizations (which I have done in the 'derek-1054' branch).
So now callbacks run async in almost all cases, regardless of whether they are triggered by 'complete' or by 'onComplete'. I think the only exception at the moment is that KeptPromise (AlreadyCompletedFuture) runs callbacks synchronously during 'onComplete' (which I should probably change as well).
I should be able to backport these changes to 1.2 without problem.
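For comparison only, here is the lock scenario above run against the standard library's later `scala.concurrent.Future`. That library hands `foreach` and `onComplete` callbacks to the supplied `ExecutionContext` rather than calling them directly, so with `ExecutionContext.global` the callback does not run on the registering task's stack. The sketch makes several assumptions: a `java.util.concurrent.Semaphore(1)` serves as the non-reentrant lock, `getSomeValue()` and `doSomething` are replaced by plain values, and `AsyncForeachLockDemo` is a hypothetical name. Because the callback is scheduled, its `acquire()` waits for the outer `release()` instead of deadlocking.

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncForeachLockDemo {
  def run(): String = {
    val lock = new Semaphore(1)              // non-reentrant: a nested acquire() on the same stack would block forever
    val otherFuture = Future.successful("x") // already completed
    val done = Promise[String]()

    Future {
      lock.acquire()
      val value = "value"                    // stands in for getSomeValue()
      otherFuture.foreach { x =>
        lock.acquire()                       // scheduled on the pool, so this waits for the outer release()
        try done.success(s"$x-$value")
        finally lock.release()
      }
      lock.release()
    }

    Await.result(done.future, 5.seconds)
  }
}
```

If `foreach` instead ran the callback directly inside the outer task, the nested `lock.acquire()` would block forever. That is the deadlock described above.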
I'm happy with the changes. I just need to get a code review and I'll backport it to 1.2. I probably have some docs to update about the behavior as well...
Should now be fixed in master and 1.2