26 Apr 2015

A Case Study in Akka I/O

This case study walks through a real-world example of how you can boost the performance of legacy components by leveraging the powerful Akka I/O package. Akka is a core part of Lightbend’s “Reactive Platform” for building highly concurrent, distributed, and resilient systems.

In the dark ages of the Internet, web servers would typically consume a thread for each incoming request. Although simple to work with, software components that used this mechanism suffered from high resource consumption, choked throughput and very limited scale.

What can you do if you’re faced with a legacy system component that utilises this old thread-per-request model, yet your client/boss wants the service that depends on it to be “web scale”? Enter Akka I/O! With Akka I/O you have access to a fully event-driven, non-blocking and asynchronous API for TCP protocols. You can wrap your old “thread-per-request” components inside elegant Actors and leverage an extremely lightweight event pipeline straight from the socket. Instead of 100s of requests/second with unpredictable and hungry resource usage, we can attain 1000s with consistent and efficient resource usage.

The Challenge

Recently we developed a protocol bridge for a client that needed to consume data from several building and data centre automation products. The library that supports one of these binary network protocols is quite old and pre-dates the introduction of modern concurrency features in Java. It follows the thread-per-request pattern and uses an in-memory map of device state to service requests. This pattern is resource intensive under high load, and it requires locking the in-memory map so that updates to devices (and device data) are safely visible to the threads servicing requests.

The Upgrade

What we’d like to do is replace the library with something that allows us to handle a larger number of requests and to manage the device state more elegantly. To do this, we’ll use Akka to provide the actor system and Akka I/O to provide the interface to the TCP connections.

The Protocol

The protocol that we’ll support is quite simple. It will have a single request (ReadRequest) and a single reply (ReadResponse). Each TCP interface can have multiple devices associated with it. Each device on the interface will be identified by a unique device id.
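The post doesn’t specify the wire format, so the following is only a sketch under stated assumptions. It takes a ReadRequest to be two big-endian 32-bit ints (device id and register) and a ReadResponse to echo both, followed by a presence byte and a value. ReadRequest, ReadResponse, deviceId and ReadResponse.fromRequest appear in the post; the register field, the byte layout and these particular encode/decode helpers are hypothetical.

```scala
import java.nio.ByteBuffer

// Hypothetical wire format (not specified in the post):
//   ReadRequest  = deviceId:int32 | register:int32
//   ReadResponse = deviceId:int32 | register:int32 | present:int8 | value:int32
final case class ReadRequest(deviceId: Int, register: Int) {
  def encoded: Array[Byte] =
    ByteBuffer.allocate(8).putInt(deviceId).putInt(register).array()
}

object ReadRequest {
  def decode(bytes: Array[Byte]): ReadRequest = {
    val buf = ByteBuffer.wrap(bytes) // ByteBuffer is big-endian by default
    ReadRequest(buf.getInt(), buf.getInt())
  }
}

final case class ReadResponse(deviceId: Int, register: Int, value: Option[Int]) {
  def encoded: Array[Byte] = {
    val buf = ByteBuffer.allocate(13).putInt(deviceId).putInt(register)
    value match {
      case Some(v) => buf.put(1.toByte).putInt(v)
      case None    => buf.put(0.toByte).putInt(0) // padding keeps a fixed size
    }
    buf.array()
  }
}

object ReadResponse {
  // A reply carrying no value, e.g. when the registry cannot find the device.
  def fromRequest(request: ReadRequest): ReadResponse =
    ReadResponse(request.deviceId, request.register, None)

  def decode(bytes: Array[Byte]): ReadResponse = {
    val buf = ByteBuffer.wrap(bytes)
    val deviceId = buf.getInt()
    val register = buf.getInt()
    val present = buf.get() == 1
    val v = buf.getInt()
    ReadResponse(deviceId, register, if (present) Some(v) else None)
  }
}
```

In an Akka I/O handler the bytes arrive and leave as akka.util.ByteString rather than Array[Byte]; data.toArray and ByteString(bytes) convert between the two.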

Devices as Actors

In this system, devices are stateful entities with well-defined boundaries, and the state of a device can be fully encapsulated within it. Actors are a pattern that (among other things) lets us manage state in multi-threaded environments, so having one actor per device is a great fit. The overhead of making a device an actor is small (roughly 300 bytes per actor), so we can have lots of devices without pushing the limits of the actor system.
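To put the ~300-byte figure in perspective, here is a back-of-the-envelope estimate for a hypothetical interface with 100,000 devices (the device count is illustrative, not from the project, and the estimate excludes each device’s own register data):

```latex
100\,000\ \text{actors} \times 300\ \tfrac{\text{B}}{\text{actor}} = 3 \times 10^{7}\ \text{B} \approx 30\ \text{MB}
```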

What’s an Actor anyway?

If you are coming from an OO background, actors will not seem too dissimilar to classical objects. An actor models behaviour and may contain mutable data. The main difference between an actor and an object is how communication happens: rather than calling methods on it, you send an actor messages. Each actor has an associated queue of messages (its mailbox).

This queue decouples the caller from the actor and has a couple of important implications:

  1. Communication between a caller and the actor is asynchronous – the actor may not process and respond to a message for some time.
  2. Since communication is mediated by the message queue (and the actor system at large), communicating with remote actors is much simpler.

The actor processes messages one at a time. From a developer’s point of view, this means that the state of an actor can be mutated in response to a message without regard to concurrency.
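The one-message-at-a-time guarantee is what makes lock-free state safe. The toy model below (plain Scala and java.util.concurrent, not Akka) imitates it with a mailbox drained by a single-threaded executor; ToyCounterActor, tellIncrement and askCount are illustrative names, not Akka API.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}

// Toy model (not Akka): a mailbox drained by exactly one thread.
final class ToyCounterActor {
  // A single worker thread handles queued messages one at a time, in FIFO order.
  private val mailbox: ExecutorService = Executors.newSingleThreadExecutor()

  // Mutable state that is only ever read or written on the mailbox thread,
  // so it needs no lock of its own.
  private var counter = 0

  // Fire-and-forget, analogous to `actor ! Increment`.
  def tellIncrement(): Unit =
    mailbox.execute(() => counter += 1)

  // Request/response, loosely analogous to the ask pattern: the query is queued
  // behind earlier messages and the caller blocks on the returned Future.
  def askCount(): Int =
    mailbox.submit(new Callable[Int] { def call(): Int = counter }).get()

  def stop(): Unit = {
    mailbox.shutdown()
    mailbox.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

If four threads each call tellIncrement 250 times, askCount afterwards returns exactly 1000, because the increments never run concurrently.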

A Simple Actor

Before we start building our custom actors, let’s check out what a simple actor looks like, and how we’d talk to it.

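This is the core of the PingActor from the Akka Scala seed, shown with the imports and companion object it relies on (PongActor, also from the seed, is not shown):

import akka.actor.{Actor, ActorLogging, Props}

class PingActor extends Actor with ActorLogging {
  import PingActor._

  // Child actor, created (and supervised) by this actor
  val pongActor = context.actorOf(PongActor.props, "pongActor")

  // Mutable state, only touched while a message is being processed
  var counter = 0

  def receive: Receive = {
    case Initialize =>
      log.info("In PingActor - starting ping-pong")
      pongActor ! PingMessage("ping")
    case PongActor.PongMessage(text) =>
      log.info("In PingActor - received message: {}", text)
      counter += 1
      // terminate() supersedes the older system.shutdown()
      if (counter == 3) context.system.terminate()
      else sender() ! PingMessage("ping")
  }
}

object PingActor {
  val props = Props(new PingActor)
  case object Initialize
  case class PingMessage(text: String)
}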

And here is how we’d talk to it outside an actor system:

import akka.actor.ActorSystem

val system = ActorSystem("MyActorSystem")
val pingActor = system.actorOf(PingActor.props, "pingActor")
pingActor ! PingActor.Initialize

The important points to note are:

  1. PingActor extends Actor;
  2. The actor defines the message loop by overriding receive and defining what will happen when messages are received;
  3. The actor manages some mutable state (the counter);
  4. The actor creates other actors;
  5. The actor messages other actors (including the sender of a received message); and,
  6. Communication with the actor instance is via messages (pingActor ! ...).

One core concept not demonstrated by the PingActor is explicitly changing the message loop. Akka supports switching out the message loop using become / unbecome. This allows developers to model behavioural changes where the handling of a message depends on the state of the actor.
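Here is a toy model of that mechanism in plain Scala, not Akka’s implementation: the actor keeps a stack of message loops and dispatches to the one on top. become(b, discardOld) and unbecome() mirror the semantics of context.become and context.unbecome; ToyActor and Behaviour are hypothetical names, and for brevity become is called from outside rather than from inside a message handler as it would be in Akka.

```scala
// Toy model (not Akka) of become/unbecome as a stack of message loops.
object ToyBecome {
  // A behaviour maps a message to a reply.
  type Behaviour = PartialFunction[Any, String]

  final class ToyActor(initial: Behaviour) {
    // The head of the list is the current message loop.
    private var stack: List[Behaviour] = List(initial)

    // Like context.become(b, discardOld): replace the current loop, or push on top of it.
    def become(b: Behaviour, discardOld: Boolean = true): Unit =
      stack = b :: (if (discardOld) stack.tail else stack)

    // Like context.unbecome(): pop to the previous loop, or fall back to the initial one.
    def unbecome(): Unit =
      stack = if (stack.tail.isEmpty) List(initial) else stack.tail

    // Messages the current loop does not match are reported as unhandled.
    def tell(msg: Any): String =
      stack.head.applyOrElse(msg, (_: Any) => "unhandled")
  }
}
```

For example, an actor started with a "hidden" loop that only answers "status" leaves "read" unhandled; after become(visible, discardOld = false) it answers "read", and unbecome() restores the hidden loop.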

Also, note that the PingActor example uses the fire-and-forget pattern (!). If you are operating outside of an actor and need a response, you may need to use the ask pattern (?) instead.

The Device Registry

To begin with, we’ll need something that will create and manage devices. Since we have one set of devices per interface, it seems natural for there to be one device registry per interface as well, and that registry will be solely responsible for managing those devices.

Here is the core of the device registry:

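import akka.actor.{Actor, ActorLogging, ActorRef, Props}

class DeviceRegistry extends Actor with ActorLogging {

  import DeviceRegistryProtocol._

  // Device id -> device actor
  var devices = Map[Int, ActorRef]()

  override def receive: Receive = {
    case RegisterDevice =>
      ...
      // context.actorOf (not context.system.actorOf) makes the device a child
      // of this registry, so the registry supervises it
      val device = context.actorOf(Props[Device])
      ...
    case UnregisterDevice(id) =>
      ...
      context.stop(device)
      ...
    case FindDevice(id) =>
      ...
  }
}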
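The registry above elides its bookkeeping. As a hedged illustration only, here is one way the map updates behind RegisterDevice, UnregisterDevice and FindDevice could look, written as a pure, Akka-free model. It assumes the registry allocates sequential ids (the post does not say how ids are assigned); the type parameter D stands in for ActorRef, and RegistryState and its methods are hypothetical names.

```scala
// Hypothetical, Akka-free model of the registry's bookkeeping.
// D stands in for ActorRef; sequential id allocation is an assumption.
final case class RegistryState[D](nextId: Int, devices: Map[Int, D]) {
  // RegisterDevice: store the new device under the next free id.
  def register(device: D): (Int, RegistryState[D]) =
    (nextId, copy(nextId = nextId + 1, devices = devices + (nextId -> device)))

  // UnregisterDevice(id): remove the entry and hand back the device so the
  // caller can stop it (context.stop in the actor).
  def unregister(id: Int): (Option[D], RegistryState[D]) =
    (devices.get(id), copy(devices = devices - id))

  // FindDevice(id): the Option that would be sent back in FoundDevice(...).
  def find(id: Int): Option[D] = devices.get(id)
}

object RegistryState {
  def empty[D]: RegistryState[D] = RegistryState(0, Map.empty)
}
```

Inside the actor, each case would apply the matching method to the registry’s state, creating the child with context.actorOf before calling register.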

Like the PingActor, we have mutable data – the map of ids to devices. Also like the PingActor, the registry creates actors – in response to the RegisterDevice message.

Since the registry will be responsible for creating and destroying devices, it will become the supervisor for those device actors. Supervision is an important concept in Akka, and determines how failures are handled. For ordinary exceptions, the default supervision strategy restarts the failed actor, and the message that caused the failure is discarded.

We’ll use this default strategy for the device actors created by the registry. For us, this means that a restarted device actor loses its state: any values written to its registers are forgotten. The final implementation will need a way to persist and recover this state across restarts of the application anyway, so for now we’ll assume that the same mechanism would also be used to recover an actor’s state when it restarts.
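To illustrate what a restart means for a device’s in-memory registers, here is a toy model in plain Scala, not Akka’s supervision API: the supervisor catches a failure, drops the offending message and replaces the child with a fresh instance, so previously written values disappear. ToyDevice, ToySupervisor and the negative-value failure condition are all hypothetical. (In Akka, the ActorRef stays valid across a restart; only the actor instance behind it is replaced.)

```scala
// Toy model (not Akka's supervision API) of "restart on failure".
final class ToyDevice {
  var registers: Map[Int, Int] = Map.empty

  // Hypothetical failure condition: negative values are rejected.
  def handle(register: Int, value: Int): Unit = {
    require(value >= 0, s"negative value $value")
    registers = registers + (register -> value)
  }
}

final class ToySupervisor {
  private var child = new ToyDevice
  def current: ToyDevice = child

  // On failure the message is dropped and the child is replaced by a fresh
  // instance, so everything it had stored is gone.
  def deliver(register: Int, value: Int): Unit =
    try child.handle(register, value)
    catch { case _: IllegalArgumentException => child = new ToyDevice }
}
```

After two successful writes and one failing write, the device’s register map is empty again; this is the state loss the persistence mechanism would need to repair.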

Devices

Now that we have a registry, we need to define the actor class for the devices that will be created, found and destroyed by the registry.

Devices in our system may be hidden if they have not been updated with sensor data recently, so the actor is built as a finite state machine with two states: Visible and Hidden.

This could have been written using become / unbecome, but where we have a combination of state driven behaviour and mutable data we tend to build upon the FSM trait instead. The FSM trait is particularly powerful when actions need to be performed on state transitions (as we’ll see when building the TCP request handling actor), or there is some default behaviour common to multiple message loops.

When the device is hidden, it has the following behaviour:

  1. It will not respond to requests for values.
  2. It will return a device status with the visible flag set to false.
  3. It will make the device visible on request.

when(Hidden) {
  case Event(ReadValue(_), _) =>
    // Device is hidden, do not respond.
    stay()
  case Event(ReadDeviceStatus, _) =>
    sender() ! DeviceStatus(false)
    stay()
  case Event(ShowDevice, _) =>
    // Report the new visibility, mirroring HideDevice in the Visible state
    sender() ! DeviceStatus(true)
    goto(Visible)
}

As you can see, making the device visible is modeled as a state transition in response to the ShowDevice message (goto(Visible)).

Sending response messages back to the caller is done using another fire-and-forget message to the sender().

When the device is visible, it has the following behaviour:

  1. It will respond to requests for values.
  2. It will return a device status with the visible flag set to true.
  3. It will hide the device on request.

when(Visible) {
  case Event(ReadValue(register), RegisterData(registers)) =>
    sender() ! Value(register, registers.get(register))
    stay()
  case Event(ReadDeviceStatus, RegisterData(registers)) =>
    sender() ! DeviceStatus(true)
    stay()
  case Event(HideDevice, _) =>
    sender() ! DeviceStatus(false)
    goto(Hidden)
}

You may have been wondering what the second value of the Event is; in the Hidden state it was ignored. It holds the data currently associated with the state machine. In our case, that is the register information, a Map[Int, Int] wrapped in RegisterData. Finally, we need to define the event loop that handles events not handled by a particular state.

By default, the actor:

  1. Updates values on request.
  2. Unsets values on request.

whenUnhandled {
  case Event(SetValue(register, value), RegisterData(registers)) =>
    val updatedRegisters = registers + (register -> value)
    // Reply with the value now stored for this register
    sender() ! Value(register, updatedRegisters.get(register))
    goto(Visible) using RegisterData(updatedRegisters)
  case Event(UnsetValue(register), RegisterData(registers)) =>
    val updatedRegisters = registers - register
    sender() ! Value(register, updatedRegisters.get(register))
    goto(Visible) using RegisterData(updatedRegisters)
}

Note that, unlike the state transitions in Visible and Hidden, when we transition in response to these events we change the data associated with the state machine by updating (or removing) the value of a register (e.g. goto(Visible) using RegisterData(updatedRegisters)). Writing or unsetting a value also makes a hidden device visible again.
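Since each handler is a function of the current state, the current data and the incoming message, the device’s behaviour can be exercised without an actor system. The following sketch (plain Scala, not Akka’s FSM API) folds the Hidden, Visible and whenUnhandled blocks into one transition function that returns the next state, the next data and an optional reply instead of sending to sender(). The message names follow the post, but their definitions are assumptions, and this sketch answers ShowDevice with DeviceStatus(true) to report the new visibility.

```scala
// Pure-Scala model of the device FSM (no Akka). Message names follow the
// post; the definitions themselves are assumptions.
sealed trait DeviceState
case object Hidden extends DeviceState
case object Visible extends DeviceState

final case class RegisterData(registers: Map[Int, Int])

sealed trait DeviceMsg
final case class ReadValue(register: Int) extends DeviceMsg
case object ReadDeviceStatus extends DeviceMsg
case object ShowDevice extends DeviceMsg
case object HideDevice extends DeviceMsg
final case class SetValue(register: Int, value: Int) extends DeviceMsg
final case class UnsetValue(register: Int) extends DeviceMsg

sealed trait DeviceReply
final case class Value(register: Int, value: Option[Int]) extends DeviceReply
final case class DeviceStatus(visible: Boolean) extends DeviceReply

object DeviceFsm {
  // Returns the next state, the next data and the reply (None = stay silent).
  def step(state: DeviceState, data: RegisterData, msg: DeviceMsg)
      : (DeviceState, RegisterData, Option[DeviceReply]) =
    (state, msg) match {
      // Hidden: ignore reads, report invisible, become visible on request.
      case (Hidden, ReadValue(_))      => (Hidden, data, None)
      case (Hidden, ReadDeviceStatus)  => (Hidden, data, Some(DeviceStatus(false)))
      case (Hidden, ShowDevice)        => (Visible, data, Some(DeviceStatus(true)))
      // Visible: answer reads, report visible, hide on request.
      case (Visible, ReadValue(r))     => (Visible, data, Some(Value(r, data.registers.get(r))))
      case (Visible, ReadDeviceStatus) => (Visible, data, Some(DeviceStatus(true)))
      case (Visible, HideDevice)       => (Hidden, data, Some(DeviceStatus(false)))
      // whenUnhandled: writes update the data and make the device visible.
      case (_, SetValue(r, v)) =>
        val updated = data.registers + (r -> v)
        (Visible, RegisterData(updated), Some(Value(r, updated.get(r))))
      case (_, UnsetValue(r)) =>
        val updated = data.registers - r
        (Visible, RegisterData(updated), Some(Value(r, updated.get(r))))
      // Anything else (e.g. ShowDevice while already visible): no change, no reply.
      case _ => (state, data, None)
    }
}
```

For instance, SetValue(1, 42) sent to a hidden device makes it visible and replies Value(1, Some(42)); a later ReadValue(1) returns the same value.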

Handling Requests

Now that we have the rudiments of a registry and devices, we need to wire them up to the TCP interface.

akka-io provides a way of interacting with the network that is asynchronous and uses actors ‘all the way down’. Using the minimal example as a starting point, we can create our per-interface server:

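import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.io.{IO, Tcp}

class Server(registry: ActorRef) extends Actor with ActorLogging {
  import Tcp._
  import context.system // implicit ActorSystem required by IO(Tcp)

  IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 12000))

  override def receive: Receive = {
    case Connected(remote, local) =>
      val connection = sender()
      // One handler actor per connection
      val handler = context.actorOf(RequestHandler.props(registry, connection))
      connection ! Register(handler)
    ...
  }

  ...
}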

From a developer’s point of view, this is pretty simple. We grab the akka-io extension for TCP and use it to listen on a hardcoded address. The message loop simply spins up a new actor for each connection and then informs the connection that the new actor will be its handler. At this point, the per-connection actor takes over. This actor has to interact with the registry to find the device actor, and then with the device to query the value, accumulating details along the way. For this reason, we implement it as a state machine with states for:

  1. Reading data from the connection.
  2. Finding the device using the registry.
  3. Reading the value from the device.

The receiving event loop looks like:

when(Receiving) {
  case Event(Received(data), _) =>
    // decode turns the raw ByteString into a ReadRequest (not shown)
    val request = decode(data)
    goto(WaitingForDevice) using Working(request)
}

And leverages an onTransition block to message the registry:

onTransition {
  case Receiving -> WaitingForDevice =>
    nextStateData match {
      case Working(request) => registry ! DeviceRegistryProtocol.FindDevice(request.deviceId)
      case _ => log.info(s"Didn't have working data on transition to waiting")
    }
    ...
}

The WaitingForDevice state then looks like:

// `5 seconds` requires import scala.concurrent.duration._
when(WaitingForDevice, stateTimeout = 5 seconds) {
  // Unknown device: finish with a response built from the request alone
  case Event(DeviceRegistryProtocol.FoundDevice(None), Working(request)) =>
    goto(Receiving) using Done(request, ReadResponse.fromRequest(request))

  case Event(DeviceRegistryProtocol.FoundDevice(Some(device)), Working(request)) =>
    goto(WaitingForValue) using WithDevice(request, device)
}

Note that we use a state timeout to handle the case where the device registry does not respond within a generous time limit (five seconds here). The timeout is handled in the default event loop:

whenUnhandled {
  ...
  case Event(StateTimeout, _) =>
    goto(Receiving) using Empty
}

Once all the data has been accumulated from the registry and device, the state machine transitions back to the Receiving state to wait for more requests, and the response is written back to the connection on that transition.

onTransition {
  ...
  case _ -> Receiving =>
    nextStateData match {
      // Write the encoded response back to the client connection
      case Done(request, response) => connection ! Write(response.encoded)
      case _ => log.info("Didn't have done data on transition to receiving")
    }
}
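To see the whole request flow in one place, here is an Akka-free sketch that models the handler as a pure transition function plus a separate function for the onTransition side effects. State and data names follow the post; the value-reading step (GotValue), the Effect types, the Request/Response stand-ins and the use of a String in place of the device ActorRef are hypothetical, because the post elides those parts.

```scala
// Akka-free model of the per-connection request handler (a sketch).
sealed trait HState
case object Receiving extends HState
case object WaitingForDevice extends HState
case object WaitingForValue extends HState

final case class Request(deviceId: Int, register: Int)
final case class Response(deviceId: Int, register: Int, value: Option[Int])

sealed trait HData
case object Empty extends HData
final case class Working(request: Request) extends HData
final case class WithDevice(request: Request, device: String) extends HData
final case class Done(request: Request, response: Response) extends HData

sealed trait HEvent
final case class Received(request: Request) extends HEvent   // already decoded
final case class FoundDevice(device: Option[String]) extends HEvent
final case class GotValue(value: Option[Int]) extends HEvent  // hypothetical device reply
case object StateTimeout extends HEvent

// What the real actor does in its onTransition blocks.
sealed trait Effect
final case class AskRegistry(deviceId: Int) extends Effect
final case class AskDevice(device: String, register: Int) extends Effect
final case class WriteToConnection(response: Response) extends Effect

object HandlerFsm {
  // The when(...) and whenUnhandled blocks as one pure transition function.
  def step(state: HState, data: HData, event: HEvent): (HState, HData) =
    (state, data, event) match {
      case (Receiving, _, Received(req)) => (WaitingForDevice, Working(req))
      case (WaitingForDevice, Working(req), FoundDevice(None)) =>
        (Receiving, Done(req, Response(req.deviceId, req.register, None)))
      case (WaitingForDevice, Working(req), FoundDevice(Some(dev))) =>
        (WaitingForValue, WithDevice(req, dev))
      case (WaitingForValue, WithDevice(req, _), GotValue(v)) =>
        (Receiving, Done(req, Response(req.deviceId, req.register, v)))
      case (_, _, StateTimeout) => (Receiving, Empty) // give up, write nothing
      case _ => (state, data)                          // anything else: stay
    }

  // The onTransition blocks: side effects keyed on (from, to, next data).
  def effects(from: HState, to: HState, next: HData): List[Effect] =
    (from, to, next) match {
      case (Receiving, WaitingForDevice, Working(req)) => List(AskRegistry(req.deviceId))
      case (_, WaitingForValue, WithDevice(req, dev))  => List(AskDevice(dev, req.register))
      case (_, Receiving, Done(_, resp))               => List(WriteToConnection(resp))
      case _                                           => Nil
    }

  // Feeds events through the machine, collecting effects only when the state
  // actually changes (a simplification; in Akka, stay() never fires onTransition).
  def run(events: List[HEvent]): (HState, HData, List[Effect]) =
    events.foldLeft((Receiving: HState, Empty: HData, List.empty[Effect])) {
      case ((s, d, out), ev) =>
        val (s2, d2) = step(s, d, ev)
        val fx = if (s2 != s) effects(s, s2, d2) else Nil
        (s2, d2, out ++ fx)
    }
}
```

Keeping the transitions and the effects pure like this makes the flow easy to unit-test; in the real FSM the effects correspond to the messages sent to the registry, the device and the connection inside onTransition.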

Wrapping Up

We’ve now built a compact system that leverages the asynchronous features of akka-io (and Akka in general). Additionally, the mutable data of each device is encapsulated within an individual actor and can be updated from one part of the system and read from another without worrying about concurrency issues.

Source code for this post is here.
