Version 5, last updated by dpp at April 13, 2010 UTC
Modeling a Skitter User
Stambecco has many of the attributes of an Actor-based system. However, Stambecco differs from Erlang/Scala-style Actors in a number of ways:
- Workers are never referenced directly, but always through a handle.
- A Worker has a unique identifier and a Worker proxy can be obtained via the unique identifier (this is similar to Erlang's PIDs).
- Workers exist in finite, well-typed classes (e.g., User, Venue, etc.)
- Workers of a given class are vended by a WorkMaster that is associated with that class of Worker.
- Workers of a given class only accept messages associated with their class (messages are strongly typed).
- Ask/Answer messages include the type of the response, so response types are compiler enforced.
- All messages and WorkerIds must implement the QBase trait. This flags compiler checks to ensure JSON serializability and immutability of all parameters (all the way down).
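To make the "handle plus strongly typed messages" idea concrete, here is a minimal sketch. It is not Stambecco's API: `Handle`, `VenueId`, `VenueMsg`, `CheckIn` and the `received` method are hypothetical names invented for illustration, and the handle just buffers messages instead of delivering them to a Worker. The only thing it borrows from the list above is that an id type fixes the message type its Worker accepts.

```scala
// Hypothetical model for illustration only, not Stambecco's real classes.

// Marker for anything that may be sent between nodes.
trait QBase

// An id type fixes the type of message its class of Worker accepts.
trait WorkerId extends QBase {
  type MsgType <: QBase
}

sealed trait UserMsg extends QBase
final case class UserId(id: Long) extends WorkerId { type MsgType = UserMsg }
final case class Follow(who: UserId) extends UserMsg

// A second, made-up class of Worker, to show that message types do not mix.
sealed trait VenueMsg extends QBase
final case class VenueId(id: Long) extends WorkerId { type MsgType = VenueMsg }
final case class CheckIn(user: UserId) extends VenueMsg

// Callers hold a handle, never the Worker itself.
final class Handle[I <: WorkerId](val id: I) {
  private val inbox = scala.collection.mutable.ListBuffer.empty[QBase]

  // Only messages of the id's own MsgType are accepted by the compiler.
  def !(msg: id.MsgType): Unit = { inbox += msg; () }

  // Expose what was sent so the sketch can be inspected.
  def received: List[QBase] = inbox.toList
}
```

With `val user = new Handle(UserId(1L))`, the call `user ! Follow(UserId(2L))` compiles, while `user ! CheckIn(UserId(2L))` is rejected at compile time because `CheckIn` is not a `UserMsg`.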
We are going to model a Worker that handles all the functionality of a Twitter-clone User.
First, we define the UserId, the identifier that can be used to obtain a proxy to the User Worker from any node in a Stambecco system:
/**
 * Define an Id for the User class of worker
 */
final case class UserId(id: Long) extends WorkerId {
  type MyType = UserId // what is my type
  type MsgType = UserMsg // what type of messages does the User Worker accept
  type IdType = Long // what is the type of the User id

  /**
   * Generate a manifest for the UserId type
   */
  def myType: Manifest[MyType] = manifest[UserId]

  /**
   * Based on the UserId, calculate a unique name for a file
   */
  def uniqueNameForFile: String = "user_info_"+id
}
Next, we define the messages that our User responds to:
/**
 * All messages sent to a User worker must implement this trait
 */
sealed trait UserMsg extends QBase

/**
 * Sent to a User telling the user to follow another user
 */
final case class Follow(who: UserId) extends UserMsg
There are more messages, but you get the idea. Next, we declare our UserWorker class:
/**
 * A UserWorker models a user in Skitter. The UserWorker needs to
 * know its UserId and needs to be able to calculate a JDBC connection
 * vendor.
 */
class UserWorker(id: UserId,
                 calcFunc: UserId => ConnectionManager) extends
  WorkerImpl[UserId, UserMsg](id, calcFunc) {
  // cache the user's timeline
  private var timeline: Array[Long] = Array()
  // cache the user's most recent updates
  private var updates: Array[Long] = Array()
The private variables are where we cache the user's timeline and their most recent updates.
Each Worker in Stambecco has its own private relational data store. Yep... every worker has its own personal database. The database is accessed via JDBC. By default, in test mode, the database is in-memory so there are no file turds created during tests. In single-JVM mode (Stambecco can run in distributed mode), each database is represented by a single file on the local filesystem.
Each Worker has a lifecycle of created, unfrozen, frozen, and destroyed:
- Created: this is the first time in a system when a Worker could be available (e.g., someone has signed up with your service and you create a record that they exist). A Worker will receive a call to its workerCreated() method when the Worker is first created. A Worker will only receive a call to workerCreated() once in its life.
- Unfrozen: when a Worker is materialized into a running program, it will receive a call to its workerUnfrozen() method. This is a chance for the Worker to compute its instance variables.
- Frozen: before a Worker is removed from the address space of a running program, it will receive a workerWillFreeze() method invocation. If the Worker needs to persist state, release external resources, or notify them that it is being frozen, this is the time to do it. Note that this call is not guaranteed: if a process with active Workers is abruptly terminated, no attempt is made to explicitly freeze its Workers.
- Destroyed: the workerWillDestroy() method will be invoked on a Worker once in its lifecycle: just before all traces of the Worker are removed from the entire system.
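The order of these callbacks is easier to see in a toy model. The sketch below is not Stambecco code: only the four callback names come from the list above, while `Lifecycle`, `RecordingWorker` and `ToyHost` are hypothetical. It also assumes that a freshly created Worker is unfrozen immediately after workerCreated(), which the text does not state explicitly.

```scala
import scala.collection.mutable.ListBuffer

// The four callback names come from the text; everything else is made up.
trait Lifecycle {
  def workerCreated(): Unit = ()
  def workerUnfrozen(): Unit = ()
  def workerWillFreeze(): Unit = ()
  def workerWillDestroy(): Unit = ()
}

// A worker that records every callback so the order can be inspected.
final class RecordingWorker extends Lifecycle {
  val events: ListBuffer[String] = ListBuffer.empty[String]
  override def workerCreated(): Unit = { events += "created"; () }
  override def workerUnfrozen(): Unit = { events += "unfrozen"; () }
  override def workerWillFreeze(): Unit = { events += "frozen"; () }
  override def workerWillDestroy(): Unit = { events += "destroyed"; () }
}

// A toy host that drives one worker through its life.
final class ToyHost(w: Lifecycle) {
  private var created = false
  private var inMemory = false

  // Bring the worker into memory; workerCreated() fires only the first time.
  // Assumption: creation is followed directly by an unfreeze.
  def materialize(): Unit = if (!inMemory) {
    if (!created) { w.workerCreated(); created = true }
    w.workerUnfrozen()
    inMemory = true
  }

  // Orderly removal from memory: the worker gets a chance to persist state.
  def freeze(): Unit = if (inMemory) { w.workerWillFreeze(); inMemory = false }

  // Abrupt termination: the worker vanishes without a freeze callback.
  def crash(): Unit = { inMemory = false }

  // Final removal from the system.
  def destroy(): Unit = { w.workerWillDestroy(); inMemory = false; created = false }
}
```

Note how `crash()` skips workerWillFreeze(): anything a Worker must not lose should not wait for the freeze callback.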
For our UserWorker, we create a schema in our local RDBMS when we are created:
  /**
   * When the worker is created, populate the correct tables into
   * the UserWorker instance-private relational database
   */
  override protected[this] def workerCreated() {
    super.workerCreated()
    def nullLogFunc(a: => AnyRef) {}
    Schemifier.schemify(true, nullLogFunc _, Relationship, Message)
  }
And when we are unfrozen, we cache our timeline and most recent updates:
  /**
   * When the UserWorker is unfrozen (materialized into an address
   * space), read the data to be cached
   */
  override protected[this] def workerUnfrozen() {
    timeline = Message.findAll(OrderBy(Message.postTime, Descending),
                               MaxRows(20)).map(_.messageId.is).toArray
    updates = Message.findAll(By(Message.postedBy, id.id),
                              OrderBy(Message.postTime, Descending),
                              MaxRows(20)).map(_.messageId.is).toArray
  }
Stambecco's Workers differ from the Actor paradigm in that you declare a separate method for each message rather than having a single pattern match for all the possible messages. Here's how we define the method that handles the Follow message:
  /**
   * handle the Follow message
   */
  def doFollow(msg: Follow) {
    // if we're not already following, create a row in our local
    // store indicating that we're following
    if (Relationship.find(By(Relationship.who, msg.who.id),
                          By(Relationship.isFollowing, true)).isEmpty) {
      Relationship.create.who(msg.who.id).isFollowing(true).save
    }
    // tell the other guy that we're following
    for (other <- WorkerMgr.find(msg.who)) other ! AddMeAsAFollower(id)
  }
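How a message finds its doXxx method is not described here. One plausible way to route a message to a method named "do" plus the message's class name is plain Java reflection, sketched below. This is an assumption made for illustration, not a description of Stambecco's dispatcher; `ReflectiveWorker`, `ToyUser`, `Unfollow` and `dispatch` are hypothetical, and the ids are bare Longs to keep the example short.

```scala
// Hypothetical sketch: method-per-message dispatch via Java reflection.
abstract class ReflectiveWorker {
  // Look for a public one-argument method called "do" + the message's class
  // name, invoke it, and report whether a handler was found.
  final def dispatch(msg: AnyRef): Boolean = {
    // Derive the simple class name by hand so nested classes also work.
    val simpleName = msg.getClass.getName.split("[.$]").last
    val wanted = "do" + simpleName
    val handler = getClass.getMethods.find { m =>
      val params = m.getParameterTypes
      m.getName == wanted && params.length == 1 && params(0).isInstance(msg)
    }
    handler match {
      case Some(m) =>
        m.invoke(this, msg)
        true
      case None =>
        false
    }
  }
}

final case class Follow(who: Long)
final case class Unfollow(who: Long)

// Declares a handler for Follow only; Unfollow has no matching method.
final class ToyUser extends ReflectiveWorker {
  var following: Set[Long] = Set.empty
  def doFollow(msg: Follow): Unit = { following += msg.who }
}
```

Compared with a single pattern match, each handler is an ordinary method that can be overridden, documented and tested on its own. The price in this sketch is that a missing handler is only discovered at run time.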
Stambecco supports the Ask/Answer paradigm: Workers, and programs outside the Worker system, can send an asynchronous message to a Worker and have a function executed when that Worker answers. All messages that ask a question must extend the MsgWithResponse[T] trait and explicitly define the type of the answer. To get the timeline from a UserWorker, we send:
final case class GetTimeline(first: Int, count: Int) extends UserMsg with MsgWithResponse[ListOfMessages]
And the implemented method looks like:
def doGetTimeline(msg: GetTimeline): ListOfMessages = getMessages(false, msg.first, msg.count)
We can also explicitly take the function as a second parameter, so with:
final case class GetUpdates(first: Int, count: Int) extends UserMsg with MsgWithResponse[ListOfMessages]
Our method looks like:
  def doGetUpdates(msg: GetUpdates, answer: Box[ListOfMessages] => Unit) =
    answer(Full(getMessages(true, msg.first, msg.count)))
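From the asking side, the point of MsgWithResponse[T] is that the callback's argument type is fixed by the message. The sketch below models that with a hypothetical `ask` method. It is synchronous, uses Option where the real code uses Lift's Box, and takes the cached ids as constructor arguments; `ToyUser`, `ask`, `page` and the shape of `ListOfMessages` are all assumptions made for illustration.

```scala
// Hypothetical sketch of Ask/Answer; only MsgWithResponse[T] and the two
// message classes are named in the text.
trait MsgWithResponse[T]

final case class ListOfMessages(ids: List[Long])
final case class GetTimeline(first: Int, count: Int) extends MsgWithResponse[ListOfMessages]
final case class GetUpdates(first: Int, count: Int) extends MsgWithResponse[ListOfMessages]

final class ToyUser(timeline: Vector[Long], updates: Vector[Long]) {
  private def page(src: Vector[Long], first: Int, count: Int): ListOfMessages =
    ListOfMessages(src.slice(first, first + count).toList)

  // Style 1: the handler returns the answer.
  def doGetTimeline(msg: GetTimeline): ListOfMessages =
    page(timeline, msg.first, msg.count)

  // Style 2: the handler is handed the answer function.
  def doGetUpdates(msg: GetUpdates, answer: Option[ListOfMessages] => Unit): Unit =
    answer(Some(page(updates, msg.first, msg.count)))

  // Callers are checked at compile time: T comes from the message. Inside,
  // the type is erased, so the cast is only safe because each handler
  // returns the type its message declares.
  def ask[T](msg: MsgWithResponse[T])(answer: Option[T] => Unit): Unit = {
    val untyped = answer.asInstanceOf[Option[Any] => Unit]
    msg match {
      case m: GetTimeline => untyped(Some(doGetTimeline(m)))
      case m: GetUpdates  => doGetUpdates(m, untyped)
      case _              => untyped(None)
    }
  }
}
```

A caller writes `user.ask(GetTimeline(0, 20)) { answer => ... }`, and `answer` is known to be an `Option[ListOfMessages]` without any cast at the call site.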
Finally, we have to define the WorkMaster that is capable of vending instances of a given Worker:
object UserMaster extends LocalWorkMaster[UserId] {
  def createInstance(id: UserId) = new UserWorker(id, connectionManagerForId)
  def canCreate(id: UserId) = UserTable.find(By(UserTable.id, id.id)).isDefined
  def numberInMemory = 500
  def idType = manifest[UserId]
}
We define how to create an instance of a UserWorker. We define rules for whether an instance can be created. We define the maximum number of UserWorker instances that can be in memory at once. If that number is exceeded, the least recently used UserWorker is frozen and removed from memory.
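The "least recently used Worker is frozen" rule can be sketched with java.util.LinkedHashMap in access order. This is a guess at the general technique, not LocalWorkMaster's actual implementation; `LruRegistry` and its `create` and `freeze` parameters are hypothetical stand-ins for createInstance and the freeze callback.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical LRU registry: keeps at most maxInMemory values alive and
// hands the least recently used one to `freeze` when the limit is exceeded.
final class LruRegistry[K, V](maxInMemory: Int, create: K => V, freeze: (K, V) => Unit) {

  // accessOrder = true orders entries from least to most recently used.
  private val live: JLinkedHashMap[K, V] =
    new JLinkedHashMap[K, V](16, 0.75f, true) {
      // Called by put() after an insert; returning true drops the eldest entry.
      override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = {
        val evict = size() > maxInMemory
        if (evict) freeze(eldest.getKey, eldest.getValue)
        evict
      }
    }

  // Return the live value for id, creating it if it is not in memory.
  def find(id: K): V =
    if (live.containsKey(id)) live.get(id) // get() marks the entry as recently used
    else {
      val v = create(id)
      live.put(id, v)
      v
    }

  def inMemory: Int = live.size()
}
```

With a limit of 2, looking up ids 1, 2, 1, 3 in that order freezes id 2: it is the entry that has gone longest without being touched when id 3 arrives.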