Problem with getting CommitBarrierOpenException when using Transaction.Global
org.multiverse.commitbarriers.CommitBarrierOpenException: Can't call countDown on already committed CountDownCommitBarrier at org.multiverse.commitbarriers.CountDownCommitBarrier.incParties(CountDownCommitBarrier.java:159) at org.multiverse.commitbarriers.CountDownCommitBarrier.incParties(CountDownCommitBarrier.java:123) at se.scalablesolutions.akka.actor.Actor$class.joinTransaction(Actor.scala:1124) at se.scalablesolutions.akka.actor.Actor$class.postMessageToMailboxAndCreateFutureResultWithTimeout(Actor.scala:1088) at sample.event.Runner$$anonfun$3$$anon$1.postMessageToMailboxAndCreateFutureResultWithTimeout(Event.scala:155) at se.scalablesolutions.akka.actor.ActorRef.$bang$bang$bang(Actor.scala:455) at se.scalablesolutions.akka.actor.ActorRef.$bang$bang(Actor.scala:427) at sample.event.Runner$.run(Event.scala:159) at .<init>(<console>:8) at .<clinit>(<console>) at RequestResult$.<init>(<console>:4) at RequestResult$.<clinit>(<console>) at RequestResult$scala_repl_result(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at scala.tools.nsc.Interpreter$Request$$anonfun$loadAndRun$1$$anonfun$apply$13.apply(Interpreter.scala:827) at scala.tools.nsc.Interpreter$Request$$anonfun$loadAndRun$1$$anonfun$apply$13.apply(Interpreter.scala:827) at scala.util.control.Exception$Catch.apply(Exception.scala:79) at scala.tools.nsc.Interpreter$Request$$anonfun$loadAndRun$1.apply(Interpreter.scala:826) at scala.tools.nsc.Interpreter$Request$$anonfun$loadAndRun$1.apply(Interpreter.scala:826) at scala.util.control.Exception$Catch.apply(Exception.scala:79) at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:825) at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:467) at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:457) at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:391) at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:367) at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:249) at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:267) at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala...
Code:
package sample package event import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRef, RemoteActor} import se.scalablesolutions.akka.persistence.common.PersistentVector import se.scalablesolutions.akka.persistence.redis.RedisStorage import se.scalablesolutions.akka.stm.Transaction.Global._ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.OneForOneStrategy import se.scalablesolutions.akka.util.Logging import Actor._ import java.util.Date /** * domain abstractions */ sealed trait InstrumentType case object EQ extends InstrumentType case object FI extends InstrumentType case class Instrument(isin: String, name: String, insType: InstrumentType) case class Trade(id: String, reference: String, ins: Instrument, quantity: Int, unitPrice: Int, taxFee: Int, netAmount: Int) /** * domain events */ sealed trait Event { val recorded = new Date } trait TradeProcessingEvent extends Event { val trade: Trade } case class EnrichTrade(val trade: Trade) extends TradeProcessingEvent case class ValueTrade(val trade: Trade) extends TradeProcessingEvent case class GetEventLog(val trade: Trade) case class EventLog(val events: List[String]) /** * a domain service that responds to events */ class TradingService extends Actor { def receive = { case EnrichTrade(trade) => println("Event EnrichTrade for trade = " + trade) reply(trade.copy(taxFee = 100)) case ValueTrade(trade) => println("Event ValueTrade for trade = " + trade) reply(trade.copy(netAmount = trade.quantity * trade.unitPrice + trade.taxFee)) case _ => throw new Exception("unknown event") } } /** * a persistent storage for events */ trait EventStorage extends Actor /** * redis based persistence */ class RedisEventStorage extends EventStorage { lifeCycle = Some(LifeCycle(Permanent)) val EVENT_LOG = "akka.event.log" private var eventLog = atomic { RedisStorage.getVector(EVENT_LOG) } log.info("Redis-based event storage is starting up...") def receive = { case msg @ EnrichTrade(trade) => atomic { eventLog + msg.toString.getBytes("UTF-8") } case msg @ ValueTrade(trade) => atomic { eventLog + msg.toString.getBytes("UTF-8") } case GetEventLog(trade) => val eventList = atomic { eventLog.map(bytes => new String(bytes, "UTF-8")).toList } reply(EventLog(eventList)) } override def postRestart(reason: Throwable) = eventLog = RedisStorage.getVector(EVENT_LOG) } trait TradingServiceFactory { this: Actor => val storage = spawnLink[RedisEventStorage] val tradingService = spawnLink[TradingService] } /** * the event processor */ trait EventProcessor extends Actor { faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Exception]) val tradingService: ActorRef val storage: ActorRef def receive = { case t: TradeProcessingEvent => val trd: Option[Trade] = tradingService !! t println("For event : " + t + " trade is " + trd) storage ! t reply(trd.get) case ev @ GetEventLog(trade) => storage forward ev } } object Runner { def run = { val proc = actorOf(new EventProcessor with TradingServiceFactory) proc.start val trade = Trade("1", "dg-1", Instrument("isin-1", "IBM Bond", FI), 100, 12, 0, 0) val richTrade: Option[Trade] = proc !! EnrichTrade(trade) val valuedTrade: Option[Trade] = proc !! ValueTrade(richTrade.get) val x: EventLog = (proc !! GetEventLog(trade)).getOrElse(throw new Exception("couldn't get event log")) println(x.events.mkString("\n")) } }
Leave a comment
file:aOmXauzCyr34UzeJe5cbCb: The domain specific parts
on 2010-05-23 17:51 *
By Jonas Bonér
Ok. So these are actually 2 different issues. Thanks.