Hot questions for Using EventBus in scala

Question:

Are there any good tutorials/explanations on how to use the the event bus in akka? I've read through the Akka doc but I find it difficult to understand how to use the event bus


Answer:

Not sure if there are or aren't any good tutorials out there, but I can give you a quick example of a possible user case where using the event stream might be helpful. At a high level though, the event stream is a good mechanism for meeting pub/sub type requirements that your app might have. Let's say that you have a use case where you update a user's balance in your system. The balance is accessed often, so you have decided to cache it for better performance. When a balance is updated, you also want to check and see if the user crosses a threshold with their balance and if so, email them. You don't want either the cache drop or the balance threshold check to be tied directly into the main balance update call as they might be heavy weight and slow down the user's response. You could model that particular set of requirements like so:

//Message and event classes
case class UpdateAccountBalance(userId:Long, amount:Long)
case class BalanceUpdated(userId:Long)

//Actor that performs account updates
class AccountManager extends Actor{
  val dao = new AccountManagerDao

  def receive = {
    case UpdateAccountBalance(userId, amount) =>
      val res = for(result <- dao.updateBalance(userId, amount)) yield{
        context.system.eventStream.publish(BalanceUpdated(userId))
        result                
      }

      sender ! res
  }
}

//Actor that manages a cache of account balance data
class AccountCacher extends Actor{
  val cache = new AccountCache

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      cache.remove(userId)
  }
}

//Actor that checks balance after an update to warn of low balance
class LowBalanceChecker extends Actor{
  val dao = new LowBalanceDao

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      for{
        balance <- dao.getBalance(userId)
        theshold <- dao.getBalanceThreshold(userId)
        if (balance < threshold)
      }{
        sendBalanceEmail(userId, balance)
      }
  }
}

In this example, the AccountCacher and LowBalanceChecker actors both subscribe into the eventStream by class type for the BalanceUpdated event. If this event is event published to the stream, it will be received by both of these actor instances. Then, in the AccountManager, when the balance update succeeds, it raises a BalanceUpdated event for the user. When this happens, in parallel, that message is delivered to the mailboxes for both the AccountCacher and the LowBalanceChecker resulting in the balance into being dropped from the cache and the account threshold checked and possibly an email being sent.

Now, you could have just put direct tell (!) calls into the AccountManager to communicate directly with these other two actors, but one could argue that might be too closely coupling these two "side effects" of a balance update, and that those types of details don't necessarily belong in the AccountManager. If you have a condition that might result in some additional things (checks, updates, etc...) that need to happen purely as side effects (not part of the core business flow itself), then the event stream might be a good way to decouple the event being raised and who might need to react to that event.

Question:

I have my own Actor that subscribe to Akka logs using configuration

loggers = ["path.MyActor"]

MyActor Receiving the log events, but I would like to stop receiving them in run time. Is there way to do that? (etc by getting the event bus of Akka logs)


Answer:

You could call system.eventStream to unsubscribe MyActor. For example:

import akka.event.Logging._

case object UnsubscribeFromLogging

class MyActor extends Actor {
  def receive = {
    case InitializeLogger(_)                        => sender() ! LoggerInitialized
    case Error(cause, logSource, logClass, message) => // ...
    case Warning(logSource, logClass, message)      => // ...
    case Info(logSource, logClass, message)         => // ...
    case Debug(logSource, logClass, message)        => // ...

    case UnsubscribeFromLogging => context.system.eventStream.unsubscribe(self)
  }
}

The above example uses a custom UnsubscribeFromLogging message:

val myActor: ActorRef = ??? // reference to MyActor
myActor ! UnsubscribeFromLogging

You could pass the reference to MyActor to the unsubscribe method from another actor:

val myActor: ActorRef = ??? // reference to MyActor
context.system.eventStream.unsubscribe(myActor)

Either way, you need a reference to MyActor. You can obtain this by determining MyActor's path (for example, have MyActor print self.path) and passing this path to context.actorSelection.