Modeling a Skitter User

Stambecco has many of the attributes of an Actor-based system.  However, Stambecco differs from Erlang/Scala-style Actors in a number of ways, which we will see as we build up the example.

We are going to model a Worker that handles all the functionality of a Twitter-clone User.

First, we define the UserId, the identifier that can be used to obtain a proxy to the User Worker from any node in a Stambecco system:

/**
 * Define an Id for the User class of worker
 */
final case class UserId(id: Long) extends WorkerId {
  type MyType = UserId // what is my type
  type MsgType = UserMsg // what type of messages does the User Worker accept
  type IdType = Long // what is the type of the User id

  /**
   * Generate a manifest for the UserId type
   */
  def myType: Manifest[MyType] = manifest[UserId]

  /**
   * Based on the UserId, calculate a unique name for a file
   */
  def uniqueNameForFile: String = "user_info_"+id
}
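The type members matter because they let the compiler tie an id to the messages its Worker accepts. Here is a minimal, self-contained sketch of that idea using a Scala path-dependent type. The stripped-down WorkerId trait and the Outbox.send method are hypothetical stand-ins, not Stambecco's actual API.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, stripped-down stand-in for Stambecco's WorkerId: only the
// MsgType member is modeled here.
trait WorkerId {
  type MsgType
}

sealed trait UserMsg
final case class Follow(who: UserId) extends UserMsg

final case class UserId(id: Long) extends WorkerId {
  type MsgType = UserMsg // a UserId only accepts UserMsg messages
  def uniqueNameForFile: String = "user_info_" + id
}

// Hypothetical mailbox: `msg` must have the MsgType of the target id, so
// the compiler rejects messages meant for a different kind of worker.
object Outbox {
  val sent: ListBuffer[(WorkerId, Any)] = ListBuffer.empty
  def send(target: WorkerId)(msg: target.MsgType): Unit =
    sent += ((target, msg))
}

val alice = UserId(1L)
Outbox.send(alice)(Follow(UserId(2L)))
// Outbox.send(alice)("hello") would not compile: String is not a UserMsg.
```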

Next, we define the messages that our User responds to:

/**
 * All messages sent to a User worker must implement this trait
 */
sealed trait UserMsg extends QBase

/**
 * Sent to a User telling the user to follow another user
 */
final case class Follow(who: UserId) extends UserMsg

There are more messages, but you get the idea.  Next, we declare our UserWorker class:

/**
 * A UserWorker models a user in Skitter.  The UserWorker needs to
 * know its UserId and needs to be able to calculate a JDBC connection
 * vendor.
 */
class UserWorker(id: UserId,
                 calcFunc: UserId => ConnectionManager)
    extends WorkerImpl[UserId, UserMsg](id, calcFunc) {
  // cache the user's timeline
  private var timeline: Array[Long] = Array()

  // cache the user's most recent updates
  private var updates: Array[Long] = Array()

The private variables are where we cache the user's timeline and their most recent updates.

Each Worker in Stambecco has its own private relational data store.  Yep... every worker has its own personal database.  The database is accessed via JDBC.  By default, in test mode, the database is in-memory so there are no file turds created during tests.  In single-JVM mode (Stambecco can also run in distributed mode), each database is represented by a single file on the local filesystem.
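As a rough sketch of how a per-worker store might be located, here is a function that turns a worker's uniqueNameForFile into a JDBC URL for each mode. The source doesn't name a database engine; the H2 URL syntax, the RunMode types, and the /var/skitter directory are assumptions for illustration only.

```scala
// Hypothetical run modes: the text mentions test mode and single-JVM mode.
sealed trait RunMode
case object TestMode extends RunMode
final case class SingleJvm(dataDir: String) extends RunMode

object WorkerDb {
  // Assumes H2-style JDBC URLs; the real engine and layout are not given.
  def jdbcUrl(mode: RunMode, uniqueName: String): String = mode match {
    // Test mode: a separate in-memory database per worker, no files on disk.
    case TestMode       => s"jdbc:h2:mem:$uniqueName"
    // Single-JVM mode: one database file per worker under dataDir.
    case SingleJvm(dir) => s"jdbc:h2:file:$dir/$uniqueName"
  }
}

// "user_info_42" is what UserId(42L).uniqueNameForFile returns.
val testUrl = WorkerDb.jdbcUrl(TestMode, "user_info_42")
val fileUrl = WorkerDb.jdbcUrl(SingleJvm("/var/skitter"), "user_info_42")
```

If H2 were used this way, note that a named in-memory database disappears when its last connection closes unless the URL adds `;DB_CLOSE_DELAY=-1`.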

Each Worker has a lifecycle with four states: created, unfrozen, frozen, and destroyed.
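The text names the states but not the allowed transitions. A minimal sketch, assuming a worker is unfrozen after creation, can be frozen and unfrozen repeatedly, and can be destroyed from either of those states (all of which are assumptions, not documented Stambecco behavior):

```scala
// The four lifecycle states named in the text.
sealed trait Lifecycle
case object Created   extends Lifecycle
case object Unfrozen  extends Lifecycle
case object Frozen    extends Lifecycle
case object Destroyed extends Lifecycle

object LifecycleRules {
  // Assumed transition table: the source lists the states, not the edges.
  private val allowed: Map[Lifecycle, Set[Lifecycle]] = Map(
    Created   -> Set[Lifecycle](Unfrozen),
    Unfrozen  -> Set[Lifecycle](Frozen, Destroyed),
    Frozen    -> Set[Lifecycle](Unfrozen, Destroyed),
    Destroyed -> Set.empty[Lifecycle]
  )

  def canMove(from: Lifecycle, to: Lifecycle): Boolean =
    allowed(from).contains(to)

  // Returns the new state, or throws if the transition is not allowed.
  def move(from: Lifecycle, to: Lifecycle): Lifecycle = {
    require(canMove(from, to), s"illegal transition $from -> $to")
    to
  }
}
```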

For our UserWorker, we create a schema in our local RDBMS when we are created:

  /**
   * When the worker is created, populate the correct tables into
   * the UserWorker instance-private relational database
   */
  override protected[this] def workerCreated() {
    super.workerCreated()
    
    def nullLogFunc(a: => AnyRef) {}

    Schemifier.schemify(true, nullLogFunc _, Relationship, Message)
  }

And when we are unfrozen, we cache our timeline and most recent updates:

  /**
   * When the UserWorker is unfrozen (materialized into an address
   * space), read the data to be cached
   */
  override protected[this] def workerUnfrozen() {
    timeline = Message.findAll(OrderBy(Message.postTime, Descending),
                   MaxRows(20)).map(_.messageId.is).toArray
    updates = Message.findAll(By(Message.postedBy, id.id),
                  OrderBy(Message.postTime, Descending),
                  MaxRows(20)).map(_.messageId.is).toArray
  }
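Stripped of the Mapper query DSL, the two cached queries boil down to "the newest 20 message ids, optionally filtered by poster". This in-memory sketch uses a hypothetical MessageRow case class in place of the Message table:

```scala
// Hypothetical in-memory stand-in for a row of the worker's Message table.
final case class MessageRow(messageId: Long, postedBy: Long, postTime: Long)

object TimelineCache {
  // Newest first, at most `max` ids: the same shape as
  // OrderBy(Message.postTime, Descending) plus MaxRows(20).
  def newestIds(rows: Seq[MessageRow], max: Int = 20): Array[Long] =
    rows.sortBy(_.postTime)(Ordering[Long].reverse).take(max).map(_.messageId).toArray

  // timeline: every message in the worker's local store.
  def timeline(rows: Seq[MessageRow]): Array[Long] = newestIds(rows)

  // updates: only the messages posted by this user.
  def updates(rows: Seq[MessageRow], userId: Long): Array[Long] =
    newestIds(rows.filter(_.postedBy == userId))
}

val rows = Seq(
  MessageRow(1L, 7L, 100L),
  MessageRow(2L, 8L, 300L),
  MessageRow(3L, 7L, 200L)
)
val timeline = TimelineCache.timeline(rows)   // ids 2, 3, 1
val updates  = TimelineCache.updates(rows, 7L) // ids 3, 1
```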

Stambecco's Workers differ from the Actor paradigm in that you declare a separate method for each message rather than having a single pattern match for all the possible messages.  Here's how we define the method that handles the Follow message:

  /**
   * handle the Follow message
   */
  def doFollow(msg: Follow) {
    // if we're not already following, create a row in our local
    // store indicating that we're following
    if (Relationship.find(By(Relationship.who, msg.who.id),
                          By(Relationship.isFollowing, true)).isEmpty) {
      Relationship.create.who(msg.who.id).isFollowing(true).save
    }

    // tell the other guy that we're following
    for (other <- WorkerMgr.find(msg.who)) other ! AddMeAsAFollower(id)
  }
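Stambecco wires each message type to its doXxx method for you. To make the contrast with a single Actor-style pattern match concrete, here is a sketch where that wiring is written out by hand. The Unfollow message and the dispatch method are hypothetical, and ids are plain Longs to keep the example self-contained.

```scala
import scala.collection.mutable

// Hypothetical message set: Follow mirrors the source; Unfollow is invented
// here only to give the dispatcher a second case.
sealed trait UserMsg
final case class Follow(who: Long) extends UserMsg
final case class Unfollow(who: Long) extends UserMsg

class UserHandlers {
  val following: mutable.Set[Long] = mutable.Set.empty

  // One method per message, in the Stambecco style.
  def doFollow(msg: Follow): Unit = following += msg.who
  def doUnfollow(msg: Unfollow): Unit = following -= msg.who

  // Hypothetical router: Stambecco does this wiring for you. Because
  // UserMsg is sealed, the compiler warns if a message type is missing here.
  def dispatch(msg: UserMsg): Unit = msg match {
    case m: Follow   => doFollow(m)
    case m: Unfollow => doUnfollow(m)
  }
}

val handlers = new UserHandlers
handlers.dispatch(Follow(2L))
handlers.dispatch(Follow(3L))
handlers.dispatch(Unfollow(2L))
```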

Stambecco supports the Ask/Answer paradigm: Workers, as well as programs outside the Worker system, can send an asynchronous message to a Worker and have a function executed when that Worker answers the question.  All messages that ask a question must extend the MsgWithResponse[T] trait, which explicitly defines the type of the answer.  To get the timeline from a UserWorker, we send:

final case class GetTimeline(first: Int, count: Int) extends UserMsg with MsgWithResponse[ListOfMessages]

And the implemented method looks like:

  def doGetTimeline(msg: GetTimeline): ListOfMessages = getMessages(false, msg.first, msg.count)
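To illustrate the flow, here is a sketch that uses scala.concurrent Futures in place of Stambecco's messaging. MsgWithResponse is a stand-in trait, List[Long] stands in for ListOfMessages, and askTimeline is a hypothetical helper. The point is that the asker supplies a callback typed by the message's answer type and does not block while waiting.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Stand-in for Stambecco's trait: T is the type of the answer.
trait MsgWithResponse[T]

// List[Long] (message ids) stands in for ListOfMessages.
final case class GetTimeline(first: Int, count: Int)
    extends MsgWithResponse[List[Long]]

object TimelineWorker {
  // Cached timeline, newest first (hypothetical data).
  private val timeline = Vector(50L, 40L, 30L, 20L, 10L)

  def doGetTimeline(msg: GetTimeline): List[Long] =
    timeline.slice(msg.first, msg.first + msg.count).toList
}

object Asker {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical helper: the question is answered on another thread, and
  // `onAnswer` runs once the answer exists. The caller does not block.
  def askTimeline(msg: GetTimeline)(onAnswer: List[Long] => Unit): Future[Unit] =
    Future(TimelineWorker.doGetTimeline(msg)).map(onAnswer)
}

// Demo only: wait for the callback so the result can be inspected.
var received: List[Long] = Nil
Await.result(Asker.askTimeline(GetTimeline(1, 2))(ids => received = ids), 5.seconds)
```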

Alternatively, a handler can take the answer function explicitly as a second parameter.  Given:

final case class GetUpdates(first: Int, count: Int) extends UserMsg with MsgWithResponse[ListOfMessages]

Our method looks like:

  def doGetUpdates(msg: GetUpdates, answer: Box[ListOfMessages] => Unit) =
    answer(Full(getMessages(true, msg.first, msg.count)))
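The callback form lets the handler decide when to answer. Here is a sketch of that style, using Option as a stand-in for Lift's Box (Some in place of Full, None in place of Empty). GetUpdates is a simplified copy, and toCallback is a hypothetical adapter showing that any value-returning handler can be lifted into callback style.

```scala
import scala.util.Try

// Simplified copy of the message; List[Long] stands in for ListOfMessages.
final case class GetUpdates(first: Int, count: Int)

object UpdatesWorker {
  // Cached updates, newest first (hypothetical data).
  private val updates = Vector(9L, 7L, 5L, 3L)

  private def getMessages(first: Int, count: Int): List[Long] =
    updates.slice(first, first + count).toList

  // Callback style, mirroring doGetUpdates: Option replaces Lift's Box,
  // so Some plays the role of Full.
  def doGetUpdates(msg: GetUpdates, answer: Option[List[Long]] => Unit): Unit =
    answer(Some(getMessages(msg.first, msg.count)))

  // Hypothetical adapter: lifts a value-returning handler into callback
  // style; an exception becomes None, roughly like an Empty/Failure Box.
  def toCallback[M, T](handler: M => T): (M, Option[T] => Unit) => Unit =
    (msg, answer) => answer(Try(handler(msg)).toOption)
}

var direct: Option[List[Long]] = None
UpdatesWorker.doGetUpdates(GetUpdates(1, 2), a => direct = a)

val failing = UpdatesWorker.toCallback[GetUpdates, List[Long]](_ => throw new RuntimeException("store unavailable"))
var failed: Option[List[Long]] = Some(Nil)
failing(GetUpdates(0, 1), a => failed = a)
```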

Finally, we have to define the WorkerMaster, the object that vends instances of a given kind of Worker:

object UserMaster extends LocalWorkMaster[UserId] {

  def createInstance(id: UserId) = new UserWorker(id, connectionManagerForId)

  def canCreate(id: UserId) = UserTable.find(By(UserTable.id, id.id)).isDefined

  def numberInMemory = 500

  def idType = manifest[UserId]
}

createInstance defines how to build a UserWorker for a given id.  canCreate decides whether an instance may be created at all (here, only when a matching row exists in UserTable).  numberInMemory sets the maximum number of UserWorker instances that can be in memory at once; if that number is exceeded, the least recently used UserWorker is frozen and removed from memory.  idType supplies the Manifest for UserId.
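The "at most numberInMemory workers, freeze the least recently used" policy maps naturally onto java.util.LinkedHashMap in access order. Here is a sketch, with a hypothetical Worker class whose freeze() stands in for the real freezing logic and a capacity of 2 instead of 500 so the eviction is easy to see:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

// Hypothetical worker stand-in with a freeze hook.
final class Worker(val id: Long) {
  var frozen = false
  def freeze(): Unit = frozen = true
}

// Sketch of "at most maxInMemory workers, freeze the least recently used",
// built on java.util.LinkedHashMap with accessOrder = true.
final class WorkerCache(maxInMemory: Int, create: Long => Worker) {
  private val live = new JLinkedHashMap[Long, Worker](16, 0.75f, true) {
    // Called after each put; returning true evicts the eldest entry.
    override def removeEldestEntry(eldest: Entry[Long, Worker]): Boolean = {
      val evict = size() > maxInMemory
      if (evict) eldest.getValue.freeze()
      evict
    }
  }

  // Returns the in-memory worker, creating it on a miss.
  def get(id: Long): Worker = {
    val w = live.get(id) // a hit also marks the entry most recently used
    if (w != null) w
    else {
      val fresh = create(id)
      live.put(id, fresh)
      fresh
    }
  }

  def inMemory: Int = live.size()
}

val cache = new WorkerCache(2, id => new Worker(id))
val w1 = cache.get(1L)
val w2 = cache.get(2L)
cache.get(1L)          // touch 1, so 2 is now least recently used
val w3 = cache.get(3L) // over capacity: worker 2 is frozen and evicted
```

Access order is what makes this LRU rather than FIFO: each get moves the entry to the end, so the eldest entry is always the one touched least recently.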