Improvements to ExecutorBasedEventDrivenDispatcher
By Irmo Manie on May 28, 2010 @ 05:40PM *
Finally here's my thought then on the dispatcher.
Short explanation :
- dispatch is internally queued so no new runnable is created anymore for every dispatch call <- theoretically should be a bit faster too
- threadpool is fixed size (same as num workers) - a threadpool with an unbounded queue will not create new threads anyway
- workers empty the internal queue and try to acquire the lock, so an actor is still only processed by one thread at a time
Finally here's my thought then on the dispatcher.
Short explanation :
- dispatch is internally queued so no new runnable is created anymore for every dispatch call <- theoretically should be a bit faster too
- threadpool is fixed size (same as num workers) - a threadpool with an unbounded queue will not create new threads anyway
- workers empty the internal queue and try to acquire the lock, so an actor is still only processed by one thread at a time
package se.scalablesolutions.akka.dispatch

import se.scalablesolutions.akka.actor.ActorRef

import java.util.{ArrayList, List}
import jsr166x.{LinkedBlockingDeque, BlockingDeque}
import java.util.concurrent.TimeUnit

/**
 * Event-driven dispatcher that queues the receiving actor of every dispatch on an
 * internal deque instead of creating a new Runnable per dispatch call.
 *
 * On `start`, `poolSize` long-running [[MailboxWorker]] instances are submitted to the
 * executor. Each worker takes actor references off `mailQueue`, tries to acquire the
 * actor's `dispatcherLock`, and, while holding it, invokes messages from the actor's
 * mailbox. The lock ensures only one worker processes a given actor at a time.
 *
 * @param _name      suffix appended to "event-driven:executor:dispatcher:" to form `name`
 * @param throughput upper bound on the number of messages a worker invokes for one actor
 *                   before releasing its lock (at least one message is always attempted)
 */
class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispatchers.THROUGHPUT)
  extends MessageDispatcher with ThreadPoolBuilder {

  def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage

  // True between start and shutdown; dispatch is rejected while false.
  @volatile private var active: Boolean = false
  // Loop condition of every MailboxWorker; cleared by shutdown.
  @volatile private var runWorkers: Boolean = false

  // todo make default poolsize configurable somewhere
  val poolSize: Int = 10
  val name: String = "event-driven:executor:dispatcher:" + _name

  // Builds the thread pool; runs after poolSize is initialised because init reads it.
  init

  // Actors that (may) have pending mailbox messages, waiting for a worker.
  val mailQueue: BlockingDeque[ActorRef] = new LinkedBlockingDeque[ActorRef]

  // NOTE(review): nothing in this class ever adds to this list; kept for compatibility.
  val workerFutures: List[java.util.concurrent.Future[Int]] = new ArrayList[java.util.concurrent.Future[Int]]

  /** Schedules the receiver of the given invocation for processing. */
  def dispatch(invocation: MessageInvocation) = dispatch(invocation.receiver)

  /**
   * Schedules `receiver` for mailbox processing by one of the workers.
   *
   * @throws IllegalStateException if the dispatcher has not been started
   */
  def dispatch(receiver: ActorRef): Unit =
    if (active) {
      // just add the dispatching to an internal queue here
      mailQueue.add(receiver)
    } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")

  /**
   * Long-running task that drains `mailQueue` until `runWorkers` is cleared or the
   * executing thread is interrupted.
   *
   * @param workerNum index of this worker; currently unused
   */
  class MailboxWorker(workerNum: Int) extends Runnable {
    def run = {
      var interrupted = false
      while (runWorkers && !interrupted) {
        try {
          // Timed poll so that the runWorkers flag is re-checked periodically.
          // A timeout is presumably signalled by a null result (java.util.concurrent
          // BlockingQueue contract), not by an exception.
          val receiver: ActorRef = mailQueue.poll(1, TimeUnit.SECONDS)
          if (receiver != null) {
            val lock = receiver.dispatcherLock
            val mailbox = receiver.mailbox
            if (lock.tryLock) {
              try {
                var count: Int = 0
                var messageInvocation: MessageInvocation = mailbox.poll
                while (messageInvocation != null) {
                  messageInvocation.invoke
                  count += 1
                  // Stop after `throughput` messages so one actor cannot hog the worker.
                  if (count < throughput) {
                    messageInvocation = mailbox.poll
                  } else {
                    messageInvocation = null
                  }
                }
              } finally {
                lock.unlock
                if (!mailbox.isEmpty) {
                  // not done yet, so reschedule
                  mailQueue.add(receiver)
                }
              }
            } else {
              // no lock received, reschedule
              // NOTE(review): this can spin while another worker holds the lock — confirm intended.
              mailQueue.add(receiver)
            }
          }
        } catch {
          case ie: InterruptedException =>
            // An interrupt is a request to stop (e.g. from shutdownNow), not a poll timeout.
            // Restore the flag for the owning pool and leave the loop.
            log.ifTrace("worker interrupted, stopping")
            interrupted = true
            Thread.currentThread.interrupt()
          case e: Exception =>
            log.error("error processing", e)
        }
      }
    }
  }

  /** Marks the dispatcher active and submits `poolSize` workers; no-op if already active. */
  def start = if (!active) {
    log.debug("Starting ExecutorBasedEventDrivenDispatcher [%s]", name)
    log.debug("Throughput for %s = %d", name, throughput)
    active = true
    runWorkers = true
    for (i <- 0 until poolSize) {
      executor.submit(new MailboxWorker(i))
    }
  }

  /** Stops the workers, shuts the executor down and clears `references`; no-op if not active. */
  def shutdown = if (active) {
    log.debug("Shutting down ExecutorBasedEventDrivenDispatcher [%s]", name)
    // Clear the loop flag first so workers woken by shutdownNow exit.
    runWorkers = false
    executor.shutdownNow
    active = false
    references.clear
  }

  def usesActorMailbox = true

  /**
   * Guard used before (re)building the thread pool.
   *
   * @throws IllegalStateException if the dispatcher is currently active
   */
  def ensureNotActive: Unit = if (active) throw new IllegalStateException(
    "Can't build a new thread pool for a dispatcher that is already up and running")

  /** Builds a thread pool with core and max size both set to `poolSize`, backed by an unbounded queue. */
  private[akka] def init = {
    // make fixed size threadpool
    val queueWithUnboundedCapacity: ThreadPoolBuilder = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
    queueWithUnboundedCapacity.setCorePoolSize(poolSize)
    queueWithUnboundedCapacity.setMaxPoolSize(poolSize)
    queueWithUnboundedCapacity.buildThreadPool
  }
}
Leave a comment
on 2010-09-01 15:42 *
By viktorklang
Yo, what's the progress on this?
on 2010-09-02 18:04 *
By Jonas Bonér
Irmo,
Are you on this one?
Fix or close?
Are you on this one?
Fix or close?
on 2010-09-03 11:34 *
By Irmo Manie
A lot has already changed in the meantime, making this ticket no longer needed, IMO. Close or fix as you wish :)
in branch ticket_250