LocalActorRef.invoke consumes interrupted status
The invoke method consumes the interrupt status:
    try {
      cancelReceiveTimeout() // FIXME: leave this here?
      actor(messageHandle.message)
      currentMessage = null // reset current message after successful invocation
    } catch {
      //todo: review pveentjer: the interrupted status is eaten here, it should be restored in the Thread.
      case e: InterruptedException ⇒
        currentMessage = null // received message while actor is shutting down, ignore
      case e ⇒
        handleExceptionInDispatch(e, messageHandle.message)
    }
The interrupt flag should be restored by calling Thread.currentThread().interrupt(). Normally this would be a no-brainer, but I don't know what the impact is on the Dispatchers, since they are the guardians of the threads used.
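A minimal sketch of the difference, not the actual Akka code: `invokeSwallowing` mirrors the shape of the catch block above, and `invokeRestoring` adds the proposed call. Both helper names and the `flagAfter` harness are made up for illustration; the harness runs everything on a throwaway thread so the caller's own interrupt flag is never touched.

```scala
object RestoreInterruptSketch {
  /** Hypothetical variant of invoke: swallows the exception but sets the flag again. */
  def invokeRestoring(body: () => Unit): Unit =
    try body()
    catch {
      case _: InterruptedException =>
        // The flag was cleared when the exception was thrown, so set it again.
        Thread.currentThread().interrupt()
    }

  /** Same shape as the catch block in the ticket: the flag stays cleared. */
  def invokeSwallowing(body: () => Unit): Unit =
    try body()
    catch { case _: InterruptedException => () }

  /** Runs `invoke` on a fresh thread whose body is interrupted while blocking,
    * then reports that thread's interrupt flag once invoke has returned. */
  def flagAfter(invoke: (() => Unit) => Unit): Boolean = {
    var flag = false
    val t = new Thread(new Runnable {
      def run(): Unit = {
        invoke { () =>
          Thread.currentThread().interrupt() // simulate an interrupt from the pool
          Thread.sleep(1000)                 // throws InterruptedException right away
        }
        flag = Thread.currentThread().isInterrupted()
      }
    })
    t.start(); t.join()
    flag
  }
}
```

With the restoring variant, the code that owns the thread can still see the interrupt after invoke returns; with the swallowing variant it cannot.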
on 2011-06-29 14:22 *
By viktorklang
Don't you mean: Thread.currentThread.interrupted ???
on 2011-06-29 18:24 *
By Derek Williams
This ticket prompted me to actually go and look into how to properly handle an InterruptedException. This should probably also be fixed for the exception handling within Future, and there might be exception handling code in the dispatcher that needs it too.
Having looked this up, I'm pretty sure it is the 'interrupt()' method that sets the interrupted status again. Calling 'interrupted()' returns the interrupted status and clears it.
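To make the distinction concrete, here is a small sketch (the object and method names are invented for this example) that records what each of the three `java.lang.Thread` calls reports when used in sequence on a throwaway thread.

```scala
object InterruptFlagSketch {
  /** Returns (isInterrupted(), first interrupted(), second interrupted())
    * as observed on a fresh thread that has interrupted itself. */
  def observe(): (Boolean, Boolean, Boolean) = {
    var seen = (false, false, false)
    val t = new Thread(new Runnable {
      def run(): Unit = {
        Thread.currentThread().interrupt()                  // sets the flag
        val peek   = Thread.currentThread().isInterrupted() // reads it, leaves it set
        val first  = Thread.interrupted()                   // reads it and clears it
        val second = Thread.interrupted()                   // flag is already cleared
        seen = (peek, first, second)
      }
    })
    t.start(); t.join()
    seen
  }
}
```

So `interrupt()` is the call that sets the status, while the static `interrupted()` is the one that clears it as a side effect of reading it.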
on 2011-06-29 18:40 *
By viktorklang
But the current Thread is already interrupted, since it gets an InterruptedException, so all we want to do here is clear it?
on 2011-06-29 19:32 *
By Derek Williams
The interrupted status would already have been cleared by whatever method originally detected it. We should set the interrupted status again so that it can be handled by whatever is interested, probably the thread pool. Also, it seems like we shouldn't capture the InterruptedException within a Future, as it may cause undesirable effects if it is rethrown in a different thread. Perhaps wrap it in a different exception?
This is just from some brief googling on my phone, so I could be a little off base. I will try to look into it more at home. I have been meaning to learn best practices for exception handling anyway; it's one of those things I would have learned if I programmed in Java, I'm sure.
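The point that the status is already cleared by the time the exception is caught can be checked with a short sketch. `Thread.sleep` stands in for any blocking call here, and the object and method names are hypothetical.

```scala
object ClearedOnThrowSketch {
  /** Interrupts a fresh thread, lets it block, and reports the interrupt flag
    * as seen inside the catch block (None if no exception was thrown). */
  def flagInsideCatch(): Option[Boolean] = {
    var seen: Option[Boolean] = None
    val t = new Thread(new Runnable {
      def run(): Unit = {
        Thread.currentThread().interrupt()
        try Thread.sleep(1000) // detects the pending interrupt and throws
        catch {
          case _: InterruptedException =>
            // sleep cleared the status before throwing
            seen = Some(Thread.currentThread().isInterrupted())
        }
      }
    })
    t.start(); t.join()
    seen
  }
}
```

This is why a handler that wants the interrupt to remain visible has to set it again itself.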
I still don't understand why we would want to propagate the interrupt, we're already out of the message-processing.
Derek is right. "Java Concurrency in Practice" explains how to deal with the interrupt: either handle it or restore it.
Thread pools often use interrupts to tell their threads that they should get the hell back home. Consuming the interrupt somewhere could keep a thread away from home longer than desired.
In this case, since the actor is already shutting down, it is very likely that the thread executing the actor at that moment is going to get home very soon. So it is difficult to say whether it really is a problem, but I would just restore it.
on 2011-07-14 09:22 *
By viktorklang
I think an even more interesting question is why we special-case InterruptedExceptions and don't treat them like any other actor failure (supervision etc).
on 2011-07-14 09:47 *
By viktorklang
I added some logic to treat InterruptedExceptions the same way as normal exceptions in terms of supervision etc., then set the interrupted status and rethrew the InterruptedException. But the thread pool doesn't seem to reset the interrupted flag, so I don't think restoring the interrupted status is something we should do at all. We should just make sure that the thread is freed from its duties. IMHO.
But I am open to being convinced otherwise, preferably with code demonstrating why it's "the right way".
on 2011-07-14 09:49 *
By viktorklang
My patch, for the brave. It fails the BalancingDispatcherSpec:
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala
index dd7c6a5..a04a75b 100644
--- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala
@@ -15,6 +15,8 @@ import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.{ Duration, Switch }
import org.multiverse.api.latches.StandardLatch
import akka.actor.{ ActorKilledException, PoisonPill, ActorRef, Actor }
+import org.omg.PortableInterceptor.Interceptor
+import java.lang.InterruptedException
object ActorModelSpec {
@@ -30,6 +32,7 @@ object ActorModelSpec {
case class Wait(time: Long) extends ActorModelMessage
case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage
case object Restart extends ActorModelMessage
+ case object Interrupt extends ActorModelMessage
val Ping = "Ping"
val Pong = "Pong"
@@ -63,6 +66,7 @@ object ActorModelSpec {
case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff()
case CountDownNStop(l) ⇒ ack; l.countDown(); self.stop(); busy.switchOff()
case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested")
+ case Interrupt => ack; busy.switchOff(); throw new InterruptedException("Ping!")
}
}
@@ -356,6 +360,25 @@ abstract class ActorModelSpec extends JUnitSuite {
assert(each.exception.get.isInstanceOf[ActorKilledException])
a.stop()
}
+
+ @Test
+ def dispatcherShouldContinueToProcessMessagesWhenAThreadGetsInterrupted {
+ implicit val dispatcher = newInterceptedDispatcher
+ val a = newTestActor.start()
+ val f1 = a ? Reply("foo")
+ val f2 = a ? Reply("bar")
+ val f3 = a ? Interrupt
+ val f4 = a ? Reply("foo2")
+ val f5 = a ? Interrupt
+ val f6 = a ? Reply("bar2")
+
+ assert(f1.get === "foo")
+ assert(f2.get === "bar")
+ assert((intercept[InterruptedException] { f3.get }).getMessage === "Ping!")
+ assert(f4.get === "foo2")
+ assert((intercept[InterruptedException] { f5.get }).getMessage === "Ping!")
+ assert(f6.get === "bar2")
+ }
}
class DispatcherModelTest extends ActorModelSpec {
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 0874f93..76dd967 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -677,7 +677,9 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
currentMessage = null // reset current message after successful invocation
} catch {
case e: InterruptedException ⇒
- currentMessage = null // received message while actor is shutting down, ignore
+ handleExceptionInDispatch(e, messageHandle.message)
+ Thread.currentThread().interrupt() //Restore interrupt
+ throw e //Re-throw interruptions
case e ⇒
handleExceptionInDispatch(e, messageHandle.message)
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
index 4516597..1ee760a 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
@@ -160,8 +160,6 @@ class Dispatcher(
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox)
- private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = ()
-
protected override def cleanUpMailboxFor(actorRef: ActorRef) {
val m = getMailbox(actorRef)
if (!m.isEmpty) {
@@ -194,20 +192,10 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
def dispatcher: Dispatcher
- final def run = {
- try {
- processMailbox()
- } catch {
- case ie: InterruptedException ⇒
- }
- finally {
- dispatcherLock.unlock()
- }
-
+ final def run = try { processMailbox() } finally {
+ dispatcherLock.unlock()
if (!self.isEmpty)
dispatcher.reRegisterForExecution(this)
-
- dispatcher.doneProcessingMailbox(this)
}
/**
on 2011-07-14 18:47 *
By Derek Williams
I'm going to try and sum up what I've learned about interrupts and how it applies to Akka. Let me know if I'm making incorrect assumptions about interrupts and Akka.
InterruptedExceptions are typically thrown when a blocking method detects that the current thread is interrupted, which means Thread.interrupt() has been called on it. The method detects interruption by periodically checking Thread.interrupted(). Because of its return type, the blocking method has no other way to tell the caller that the thread has been interrupted (which typically means something is requesting that this thread shut down), so it throws an InterruptedException.
The InterruptedException is just a means of breaking out of whatever method was blocking execution. There are typically two ways of dealing with an InterruptedException:
1) If you don't care that something is trying to tell you that this thread should shut down, you can ignore it, especially if you are already trying to shut down.
2) Call Thread.currentThread().interrupt() again, and do not rethrow the InterruptedException. This is usually the correct way. It makes any subsequent blocking call immediately throw an InterruptedException again, preventing blocking methods from executing. This is usually desirable, since the current thread should be trying to shut down, not blocking. Non-blocking operations should typically not be affected by this.
The reason for calling Thread.currentThread().interrupt() is so that any long-running processes can be notified that they should stop what they are doing.
The way I see this applying to Akka is that currently, if a Dispatcher is shut down, its threads will continue to process messages from a mailbox until the throughput limit is reached. This could cause a significant delay before the Dispatcher has actually shut down. Interruption could fix this if we added a Thread.interrupted() check within ExecutableMailbox.processMailbox(), alongside the throughput check and the deadline check. Of course, this adds another volatile variable access within a very critical piece of code. But it is completely optional: we don't need to respect thread interruption, it is only there to help us terminate early if we want to. If we want to handle shutting down with our own method, as we do now, then we can just ignore InterruptedExceptions like we (mostly) do now.
It's a trade-off between faster shutdowns and faster execution.
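A rough sketch of what such a check could look like. This is not the real ExecutableMailbox.processMailbox(): the queue type, the signature and the `demo` harness are assumptions made for illustration, the deadline check is left out, and the loop uses the non-clearing isInterrupted() rather than Thread.interrupted() so that the flag stays visible to the thread pool afterwards.

```scala
import scala.collection.mutable

object MailboxLoopSketch {
  /** Handles at most `throughput` messages, stopping early once the current
    * thread has been interrupted. Returns the number of messages handled. */
  def processMailbox[A](mailbox: mutable.Queue[A], throughput: Int)(handle: A => Unit): Int = {
    var processed = 0
    while (processed < throughput &&
           mailbox.nonEmpty &&
           !Thread.currentThread().isInterrupted()) { // extra check on every iteration
      handle(mailbox.dequeue())
      processed += 1
    }
    processed
  }

  /** Runs the loop on a fresh thread; the handler interrupts that thread while
    * handling message number `interruptAt`. Returns (handled, left in mailbox). */
  def demo(messages: Int, throughput: Int, interruptAt: Int): (Int, Int) = {
    val mailbox = mutable.Queue.empty[Int]
    mailbox ++= (1 to messages)
    var processed = 0
    val t = new Thread(new Runnable {
      def run(): Unit = {
        processed = processMailbox(mailbox, throughput) { msg =>
          if (msg == interruptAt) Thread.currentThread().interrupt()
        }
      }
    })
    t.start(); t.join()
    (processed, mailbox.size)
  }
}
```

The cost mentioned above is the extra flag read per message; the benefit is that a batch ends at the first message boundary after the interrupt instead of running to the throughput limit.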
The wrong way of handling InterruptedExceptions is to let them leak out of the current thread and throw them again elsewhere. This could cause some other well-behaved code to think its thread was interrupted and start shutting things down. This is something I think we have to fix in our current code (futures and actors in particular), even if it is just by wrapping the exception in a RuntimeException.
Additionally, within the Dispatcher.shutdown() method, we use shutdownAll to shut down the ExecutorService. This does cause all of its threads to be interrupted, but the processMailbox method will keep going through messages, since ActorRef.invoke catches the exception and then ignores it.
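The effect described here can be reproduced outside Akka with a plain `java.util.concurrent` pool. In this sketch (all names are hypothetical, and `shutdownNow()` is used as the standard ExecutorService call that interrupts running workers) a single task works through a fixed number of "messages", and only the first one blocks.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ShutdownNowSketch {
  /** Returns how many messages the task completed after the pool was shut down. */
  def completedAfterShutdown(messages: Int, stopOnInterrupt: Boolean): Int = {
    val pool      = Executors.newSingleThreadExecutor()
    val started   = new CountDownLatch(1)
    val completed = new AtomicInteger(0)

    pool.execute(new Runnable {
      def run(): Unit = {
        var i = 1
        var stop = false
        while (i <= messages && !stop) {
          try {
            // Only the first message blocks, so the interrupt always lands on it.
            if (i == 1) { started.countDown(); Thread.sleep(5000) }
            completed.incrementAndGet()
          } catch {
            case _: InterruptedException =>
              if (stopOnInterrupt) {
                Thread.currentThread().interrupt() // restore the flag and leave the loop
                stop = true
              } // otherwise the interrupt is swallowed and the loop carries on
          }
          i += 1
        }
      }
    })

    started.await()
    pool.shutdownNow() // interrupts the worker thread
    pool.awaitTermination(5, TimeUnit.SECONDS)
    completed.get()
  }
}
```

With the interrupt swallowed, the task still completes every message after the interrupted one; when it restores the flag and stops, nothing further is processed.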
@Viktor.
I think the right way in this case is to break the flow like any other exception. Since the invoke method is very close to the thread pool that executes the thread, the flow will return to the thread pool very shortly, and if it decides to shut down, then it can shut down.
I tried to apply your patch with git patch patch.txt, but I got a 'corrupt patch at line 6'. So I applied the patch manually and am currently looking into why the BalancingDispatcherSpec fails. I also raised the logging levels to WARNING to reduce sbt clutter.
The other tests fail because the InterruptedException is thrown again, which is not needed: it is already handled as an exception and the interrupt status is restored.
The problem now is that the Ticket669Spec fails when it is run in combination with other tests, but not when it is run alone.
[info] - should be able to reply on failure during preRestart *** FAILED ***
[info] java.lang.InterruptedException
[info] at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1302)
[info] at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:253)
[info] at akka.actor.supervisor.Ticket669Spec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Ticket669Spec.scala:33)
[info] at akka.actor.supervisor.Ticket669Spec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(Ticket669Spec.scala:20)
[info] at akka.actor.supervisor.Ticket669Spec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(Ticket669Spec.scala:20)
[info] at org.scalatest.WordSpec$$anon$3.apply(WordSpec.scala:2110)
[info] at org.scalatest.Suite$class.withFixture(Suite.scala:1477)
[info] at akka.actor.supervisor.Ticket669Spec.withFixture(Ticket669Spec.scala:14)
[info] at org.scalatest.WordSpec$class.runTest(WordSpec.scala:2107)
[info] at akka.actor.supervisor.Ticket669Spec.runTest(Ticket669Spec.scala:14)
[info] at org.scalatest.WordSpec$$anonfun$org$scalatest$WordSpec$$runTestsInBranch$1.apply(WordSpec.scala:2032)
[info] at org.scalatest.WordSpec$$anonfun$org$scalatest$WordSpec$$runTestsInBranch$1.apply(WordSpec.scala:2021)
[info] at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
[info] at scala.collection.immutable.List.foreach(List.scala:45)
[info] at org.scalatest.WordSpec$class.org$scalatest$WordSpec$$runTestsInBranch(WordSpec.scala:2020)
[info] at org.scalatest.WordSpec$$anonfun$org$scalatest$WordSpec$$runTestsInBranch$1.apply(WordSpec.scala:2040)
[info] at org.scalatest.WordSpec$$anonfun$org$scalatest$WordSpec$$runTestsInBranch$1.apply(WordSpec.scala:2021)
[info] at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
[info] at scala.collection.immutable.List.foreach(List.scala:45)
[info] at org.scalatest.WordSpec$class.org$scalatest$WordSpec$$runTestsInBranch(WordSpec.scala:2020)
[info] at org.scalatest.WordSpec$class.runTests(WordSpec.scala:2237)
[info] at akka.actor.supervisor.Ticket669Spec.runTests(Ticket669Spec.scala:14)
[info] at org.scalatest.Suite$class.run(Suite.scala:1772)
[info] at akka.actor.supervisor.Ticket669Spec.org$scalatest$WordSpec$$super$run(Ticket669Spec.scala:14)
[info] at org.scalatest.WordSpec$class.run(WordSpec.scala:2308)
[info] at akka.actor.supervisor.Ticket669Spec.org$scalatest$BeforeAndAfterAll$$super$run(Ticket669Spec.scala:14)
[info] at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:213)
[info] at akka.actor.supervisor.Ticket669Spec.run(Ticket669Spec.scala:14)
[info] at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:104)
[info] at sbt.TestRunner.delegateRun(TestFramework.scala:61)
[info] at sbt.TestRunner.run(TestFramework.scala:55)
[info] at sbt.TestRunner.runTest$1(TestFramework.scala:75)
[info] at sbt.TestRunner.run(TestFramework.scala:84)
[info] at sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7$$anonfun$apply$9.apply(TestFramework.scala:182)
[info] at sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7$$anonfun$apply$9.apply(TestFramework.scala:182)
[info] at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:194)
[info] at sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7.apply(TestFramework.scala:182)
[info] at sbt.TestFramework$$anonfun$6$$anonfun$apply$8$$anonfun$7.apply(TestFramework.scala:182)
[info] at sbt.Tests$$anonfun$makeSerial$1$$anonfun$apply$7.apply(Tests.scala:107)
[info] at sbt.Tests$$anonfun$makeSerial$1$$anonfun$apply$7.apply(Tests.scala:107)
[info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
[info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
[info] at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
[info] at scala.collection.immutable.List.foreach(List.scala:45)
[info] at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
[info] at scala.collection.immutable.List.map(List.scala:45)
[info] at sbt.Tests$$anonfun$makeSerial$1.apply(Tests.scala:107)
[info] at sbt.Tests$$anonfun$makeSerial$1.apply(Tests.scala:107)
[info] at sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:46)
[info] at sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:46)
[info] at sbt.std.Transform$$anon$5.work(System.scala:66)
[info] at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:220)
[info] at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:220)
[info] at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:13)
[info] at sbt.Execute.work(Execute.scala:226)
[info] at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:220)
[info] at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:220)
[info] at sbt.CompletionService$$anon$1$$anon$2.call(CompletionService.scala:26)
[info] at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:138)
[info] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
[info] at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:138)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
[info] at java.lang.Thread.run(Thread.java:680)
on 2011-07-15 01:24 *
By pveentjer
Assigned to changed from pveentjer to viktorklang
Status changed from Accepted to Test
https://github.com/jboner/akka/commit/f93624e7e0ebb38c8ed351fbbd045ef1c4fbd207
The .conf file change is DEBUG -> WARNING.
I've tweaked the fix:
invoke now catches InterruptedException, delegates it to handleExceptionInDispatch, and then rethrows it so that the ExecutableMailbox stops processing regardless of its throughput setting. ExecutableMailbox.run then catches the InterruptedException and restores the interrupted status before doing cleanup and resubmitting the mailbox to the pool if requested.
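The flow described above might look roughly like the following sketch. It is reconstructed from the description only, not from the commit: `SketchMailbox`, its fields and the `FinalFixSketch.demo` harness are invented stand-ins, the lock is taken inside run() purely to keep the example self-contained, and resubmission is reduced to setting a flag.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

/** Hypothetical stand-in for ExecutableMailbox. */
final class SketchMailbox(handle: Int => Unit) extends Runnable {
  val queue = mutable.Queue.empty[Int]
  private val lock = new ReentrantLock()
  @volatile var resubmitted = false
  @volatile var interruptedAfterRun = false

  // Stand-in for invoke: report the failure, then rethrow so the batch stops.
  private def invoke(msg: Int): Unit =
    try handle(msg)
    catch {
      case e: InterruptedException =>
        // handleExceptionInDispatch(e, msg) would be called here
        throw e
    }

  private def processMailbox(): Unit =
    while (queue.nonEmpty) invoke(queue.dequeue())

  def run(): Unit = {
    lock.lock()
    try processMailbox()
    catch {
      // Restore the status so the thread pool can still observe the interrupt.
      case _: InterruptedException => Thread.currentThread().interrupt()
    }
    finally lock.unlock()                  // cleanup happens either way
    if (queue.nonEmpty) resubmitted = true // real code would re-register with the dispatcher
    interruptedAfterRun = Thread.currentThread().isInterrupted()
  }
}

object FinalFixSketch {
  /** Returns (flag set after run, mailbox resubmitted, messages left). */
  def demo(): (Boolean, Boolean, Int) = {
    val mbox = new SketchMailbox(msg => if (msg == 2) throw new InterruptedException("Ping!"))
    mbox.queue ++= Seq(1, 2, 3, 4, 5)
    val t = new Thread(mbox)
    t.start(); t.join()
    (mbox.interruptedAfterRun, mbox.resubmitted, mbox.queue.size)
  }
}
```

The rethrow is what ends the batch early; the catch in run() is what keeps the exception from escaping into the pool while leaving the interrupt visible.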