Lots of DeadTransactionException in akka-stm.
[ERROR] [07/11/2011 09:27:30.507] [akka:event-driven:dispatcher:global-11] [LocalActorRef] akka.transactor.Coordinated@3b1f1f5
org.multiverse.api.exceptions.DeadTransactionException: Cant commit already aborted transaction %s
at org.multiverse.stms.AbstractTransaction.commit(AbstractTransaction.java:374)
at org.multiverse.templates.TransactionBoilerplate.executeWithTransaction(TransactionBoilerplate.java:280)
at org.multiverse.templates.TransactionBoilerplate.executeChecked(TransactionBoilerplate.java:218)
at org.multiverse.templates.TransactionBoilerplate.execute(TransactionBoilerplate.java:169)
at akka.transactor.Coordinated.atomic(Coordinated.scala:130)
at akka.transactor.Coordinated.atomic(Coordinated.scala:152)
at akka.transactor.test.UntypedCoordinatedCounter.onReceive(UntypedCoordinatedCounter.java:45)
at akka.actor.UntypedActor$$anonfun$receive$1.apply(UntypedActor.scala:125)
at akka.actor.UntypedActor$$anonfun$receive$1.apply(UntypedActor.scala:124)
at akka.actor.Actor$class.apply(Actor.scala:712)
at akka.actor.UntypedActor.apply(UntypedActor.scala:57)
at akka.actor.LocalActorRef.invoke(ActorRef.scala:694)
at akka.dispatch.MessageInvocation.invoke(MessageHandling.scala:25)
at akka.dispatch.ExecutableMailbox$class.processMailbox(Dispatcher.scala:225)
at akka.dispatch.Dispatcher$$anon$4.processMailbox(Dispatcher.scala:119)
at akka.dispatch.ExecutableMailbox$class.run(Dispatcher.scala:197)
at akka.dispatch.Dispatcher$$anon$4.run(Dispatcher.scala:119)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
at akka.dispatch.MonitorableThread.run(ThreadPoolBuilder.scala:177)
These exceptions do not lead to a failure (which is also strange), but they indicate that something is wrong. The cause is that the transaction has already been aborted by the time the akka atomic block that handles the coordinated transaction tries to commit it.
If you look at the code:
def atomic[T](factory: TransactionFactory)(body: ⇒ T): T = {
  factory.boilerplate.execute(new TransactionalCallable[T]() {
    def call(mtx: MultiverseTransaction): T = {
      val result = body
      val timeout = factory.config.timeout
      barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)
      result
    }
  })
}
Then you see that the result of barrier.tryJoinCommit is ignored. It returns true if the join succeeded in time and false if there was a timeout.
If there was a timeout, the transaction will already have been aborted. When the tx-template then asks this transaction to commit, it complains about an already aborted transaction.
So the fix is:
def atomic[T](factory: TransactionFactory)(body: ⇒ T): T = {
  factory.boilerplate.execute(new TransactionalCallable[T]() {
    def call(mtx: MultiverseTransaction): T = {
      val result = body
      val timeout = factory.config.timeout
      if (!barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)) throw new SomeAkkaSpecificException
      result
    }
  })
}
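To make the failure mode concrete, here is a minimal sketch that uses only the JDK, not Multiverse or Akka. It assumes a java.util.concurrent.CountDownLatch as a rough stand-in for the commit barrier, because its timed await also reports a timeout through a Boolean return value. The object and method names (TimedJoinSketch, joinIgnoringResult, joinCheckingResult) are hypothetical and exist only for illustration.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}

object TimedJoinSketch {
  // Stand-in for a barrier that no other participant ever joins.
  def unreleasedBarrier(): CountDownLatch = new CountDownLatch(1)

  // Mirrors the original atomic: the Boolean from the timed wait is dropped,
  // so the caller cannot tell a successful join from a timeout.
  def joinIgnoringResult[T](barrier: CountDownLatch, timeoutMs: Long)(body: => T): T = {
    val result = body
    barrier.await(timeoutMs, TimeUnit.MILLISECONDS) // false on timeout, silently discarded
    result
  }

  // Mirrors the proposed fix: a false result is turned into an exception.
  def joinCheckingResult[T](barrier: CountDownLatch, timeoutMs: Long)(body: => T): T = {
    val result = body
    if (!barrier.await(timeoutMs, TimeUnit.MILLISECONDS))
      throw new TimeoutException("Failed to join within " + timeoutMs + " ms")
    result
  }
}
```

With an unreleased barrier, joinIgnoringResult returns the body's value as if nothing went wrong, while joinCheckingResult throws a TimeoutException. In the real code the ignored case is worse, since the surrounding template then tries to commit a transaction that the timeout already aborted.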
Will add the akka-specific exception when working on #909.
(In revision:518fffa9aebc267d0a7119f6f199b4c2225ff65c) Early abort coordinated transactions on exception (fixes #909). Rethrow akka-specific exceptions (fixes #1011).
Branch: release-1.2
(In revision:ba4a9b3890cdb5f194ee93da47c2bf3b79c190e5) Early abort coordinated transactions on exception (fixes #909). Rethrow akka-specific exceptions (fixes #1011).
Branch: release-1.1.4
(In revision:6fc34fe0d5e4ca69dcbe5b225c701168b21e7e0d) Early abort coordinated transactions on exception (fixes #909). Rethrow akka-specific exceptions (fixes #1011).
Branch: master
on 2011-07-14 15:44 *
By pveentjer
Assigned to changed from Peter Vlugter to pveentjer
Status changed from Fixed to Accepted
The code is almost correct, but not completely.
The updated version contains the following improvements:
- it deals with the Multiverse control flow, which should not abort the barrier.
- it throws a timeout exception with a usable message if there is a timeout.
- it includes the original exceptions as cause to make debugging easier.
This is the code:
def atomic[T](factory: TransactionFactory)(body: ⇒ T): T = {
  factory.boilerplate.execute(new TransactionalCallable[T]() {
    def call(mtx: MultiverseTransaction): T = {
      // Run the body; abort the barrier early on a real failure.
      val result = try {
        body
      } catch {
        // Multiverse control flow must not abort the barrier.
        case e: ControlFlowError ⇒ throw e
        case e: Exception ⇒ {
          barrier.abort()
          throw e
        }
      }
      val timeout = factory.config.timeout
      // tryJoinCommit returns false on timeout; surface that as an exception.
      if (!barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)) {
        val config: TransactionConfiguration = mtx.getConfiguration
        throw new ActorTimeoutException(
          "Failed to complete transaction [" + config.getFamilyName + "] " +
            "with a maximum timeout of [" + config.getTimeoutNs + "] ns")
      }
      result
    }
  })
}
I have currently checked it in on master:
https://github.com/jboner/akka/commit/518fffa9aebc267d0a7119f6f199b4c2225ff65c
It still needs to be merged back to 1.1.4 and 1.2.
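The distinction between control flow and real failures can be tried out in isolation. The following is only a sketch under stated assumptions: it uses scala.util.control.ControlThrowable from the Scala standard library as a stand-in for Multiverse's ControlFlowError, and RetrySignal, FakeBarrier and runBody are hypothetical names that are not part of Akka or Multiverse.

```scala
import scala.util.control.ControlThrowable

object AbortOnFailureSketch {
  // Hypothetical stand-in for a Multiverse control flow signal.
  final class RetrySignal extends ControlThrowable

  // Hypothetical stand-in for the commit barrier; it only records abort() calls.
  final class FakeBarrier {
    @volatile var aborted = false
    def abort(): Unit = { aborted = true }
  }

  def runBody[T](barrier: FakeBarrier)(body: => T): T =
    try body
    catch {
      // Control flow is passed through and leaves the barrier untouched.
      case signal: ControlThrowable => throw signal
      // A real failure aborts the barrier so other participants stop waiting.
      case e: Exception =>
        barrier.abort()
        throw e
    }
}
```

Running runBody with a body that throws RetrySignal leaves FakeBarrier.aborted at false, while a body that throws a RuntimeException sets it to true before the exception propagates.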
on 2011-07-15 12:54 *
By pveentjer
Assigned to changed from pveentjer to viktorklang
Status changed from Accepted to Test
on 2011-07-15 13:34 *
By viktorklang
Assigned to changed from viktorklang to pveentjer
Status changed from Test to Fixed
(In revision:6fc34fe0d5e4ca69dcbe5b225c701168b21e7e0d) Early abort coordinated transactions on exception (fixes #909). Rethrow akka-specific exceptions (fixes #1011).
Branch: wip-derekjw
Updating tickets (#967, #974, #975, #976, #980, #981, #989, #990, #992, #993, #994, #999, #1000, #1004, #1008, #1011, #1015, #1018, #1022, #1023, #1024, #1025, #1027, #1028, #1029, #1030, #1032, #1033, #1036, #1047, #1053, #1062, #1067, #1068, #1069, #1072, #1075, #1078, #1082, #1102, #1107, #1110, #1111, #1115, #1116, #1121, #1122, #1123, #1124)