Actor Model and Akka: Building Concurrent Systems
Introduction
The Actor model is a mathematical model of concurrent computation that treats actors as the fundamental units of computation. In response to a message, an actor can make local decisions, create more actors, send messages, and designate how to respond to the next message received.
Akka is Scala's premier implementation of the Actor model, providing a powerful toolkit for building concurrent, distributed, and fault-tolerant applications. This lesson will teach you to design and implement actor-based systems that can handle thousands of concurrent operations efficiently.
Understanding the Actor Model
Basic Actor Concepts
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
// Basic actor definition
/**
 * Minimal demonstration actor.
 *
 * Behaviour per incoming message:
 *  - the String "hello": prints a greeting and replies with "Hello back!"
 *  - any other String: prints it, no reply
 *  - an Int: prints it and replies with twice its value
 *  - anything else: prints a notice, no reply
 */
class SimpleActor extends Actor {
  def receive: Receive = {
    case msg: String =>
      // Both String branches live in one case; "hello" is singled out first.
      if (msg == "hello") {
        println(s"Hello from ${self.path.name}")
        sender() ! "Hello back!"
      } else {
        println(s"SimpleActor received: $msg")
      }

    case number: Int =>
      println(s"SimpleActor received number: $number")
      val doubled = number * 2
      sender() ! doubled

    case _ =>
      println("SimpleActor received unknown message")
  }
}
// Creating an actor system; it hosts every actor created with `system.actorOf` below
implicit val system: ActorSystem = ActorSystem("ActorExample")
// The system's dispatcher is reused as the ExecutionContext for Future callbacks
implicit val ec: ExecutionContext = system.dispatcher
// Implicit timeout consumed by the `?` (ask) calls below
implicit val timeout: Timeout = Timeout(5.seconds)
// Creating actors; the second argument is the actor's name (what SimpleActor prints via self.path.name)
val simpleActor = system.actorOf(Props[SimpleActor](), "simple-actor")
// Sending messages (fire-and-forget): `!` returns immediately without waiting for a reply
simpleActor ! "hello"
simpleActor ! 42
simpleActor ! "test message"
// Request-response pattern with ask: `?` yields a Future completed by the actor's reply.
// SimpleActor answers an Int with twice its value, so the expected reply here is 20.
val futureResponse: Future[Any] = simpleActor ? 10
futureResponse.foreach { response =>
println(s"Response received: $response")
}
Thread.sleep(1000) // Give actors time to process
// Stateful actor example
/**
 * Actor holding a single mutable Int counter.
 *
 * Understood messages: "increment", "decrement", "reset", "get" (replies with
 * the current value) and any Int (overwrites the counter with that value).
 * Every state change is echoed with println; only "get" produces a reply.
 */
class CounterActor extends Actor {
  // Actor-private state: only touched from within receive.
  private var count = 0

  def receive: Receive = {
    case "get" =>
      sender() ! count

    case value: Int =>
      count = value
      println(s"Counter set to $count")

    case "increment" =>
      count = count + 1
      println(s"Counter incremented to $count")

    case "decrement" =>
      count = count - 1
      println(s"Counter decremented to $count")

    case "reset" =>
      count = 0
      println("Counter reset")
  }
}
// Create the counter actor and bump it three times (fire-and-forget)
val counterActor = system.actorOf(Props[CounterActor](), "counter")
counterActor ! "increment"
counterActor ! "increment"
counterActor ! "increment"
// Ask for the current value; the reply arrives asynchronously through the Future
val countFuture = counterActor ? "get"
countFuture.foreach(count => println(s"Current count: $count"))
// Actor with constructor parameters
/**
 * Actor configured through constructor arguments.
 *
 * @param name         label used in the "status" reply
 * @param initialValue starting value of the internal counter
 *
 * "add" / "subtract" move the value by one, an Int replaces it, and "status"
 * replies with a human-readable summary string.
 */
class NamedActor(name: String, initialValue: Int) extends Actor {
  private var value = initialValue

  def receive: Receive = {
    case newValue: Int => value = newValue
    case "add"         => value = value + 1
    case "subtract"    => value = value - 1
    case "status"      => sender() ! s"$name has value $value"
  }
}
// Creating actors with constructor parameters: Props takes a by-name constructor call
val namedActor = system.actorOf(
Props(new NamedActor("Alice", 100)),
"named-actor"
)
// Two increments on top of the initial value of 100
namedActor ! "add"
namedActor ! "add"
// The status reply is a String of the form "<name> has value <value>"
val statusFuture = namedActor ? "status"
statusFuture.foreach(println)
Thread.sleep(1000)
Message Types and Protocols
// Define message types using case classes and objects
/** Message protocol of [[BankAccountActor]]: commands it accepts and the replies it sends. */
object BankAccount {
  // Commands
  case class Deposit(amount: Double)
  case class Withdraw(amount: Double)
  case object GetBalance
  case class Transfer(amount: Double, target: ActorRef)

  // Events/Responses
  case class BalanceResponse(balance: Double)
  case class DepositSuccess(newBalance: Double)
  /** Reply to a rejected [[Deposit]] (non-positive or NaN amount). */
  case class DepositFailure(reason: String)
  case class WithdrawSuccess(newBalance: Double)
  case class WithdrawFailure(reason: String)
  case class TransferSuccess(amount: Double, newBalance: Double)
  case class TransferFailure(reason: String)
}

/**
 * Actor modelling one bank account with a mutable balance.
 *
 * @param accountId      identifier used only in log output
 * @param initialBalance balance the account starts with
 *
 * Every command is answered to `sender()` with the matching success or failure
 * message from [[BankAccount]].
 */
class BankAccountActor(accountId: String, initialBalance: Double) extends Actor {
  import BankAccount._

  private var balance = initialBalance

  def receive: Receive = {
    case Deposit(amount) if amount > 0 =>
      balance += amount
      println(s"Account $accountId: Deposited $$${amount}. New balance: $$${balance}")
      sender() ! DepositSuccess(balance)

    case Deposit(amount) =>
      println(s"Account $accountId: Invalid deposit amount: $$${amount}")
      // A rejected deposit is reported with a deposit-specific failure,
      // not with WithdrawFailure.
      sender() ! DepositFailure("Invalid deposit amount")

    case Withdraw(amount) if amount > 0 && amount <= balance =>
      balance -= amount
      println(s"Account $accountId: Withdrew $$${amount}. New balance: $$${balance}")
      sender() ! WithdrawSuccess(balance)

    case Withdraw(amount) if amount > balance =>
      println(s"Account $accountId: Insufficient funds for withdrawal of $$${amount}")
      sender() ! WithdrawFailure("Insufficient funds")

    case Withdraw(amount) =>
      // Remaining cases: zero, negative or NaN amounts.
      println(s"Account $accountId: Invalid withdrawal amount: $$${amount}")
      sender() ! WithdrawFailure("Invalid withdrawal amount")

    case GetBalance =>
      sender() ! BalanceResponse(balance)

    case Transfer(amount, target) if amount > 0 && amount <= balance =>
      balance -= amount
      // Sent with this actor as sender, so the target's DepositSuccess comes
      // back here (handled at the bottom of this receive).
      target ! Deposit(amount)
      println(s"Account $accountId: Transferred $$${amount}. New balance: $$${balance}")
      sender() ! TransferSuccess(amount, balance)

    case Transfer(amount, _) =>
      println(s"Account $accountId: Invalid transfer amount or insufficient funds: $$${amount}")
      sender() ! TransferFailure("Invalid transfer amount or insufficient funds")

    case _: DepositSuccess =>
      // Acknowledgement from the target account of an outgoing transfer.
      // The transfer amount was validated as positive, so the target's
      // deposit cannot be rejected; nothing further to do.
      ()
  }
}
// Creating bank account actors with opening balances of 1000 and 500
val account1 = system.actorOf(
Props(new BankAccountActor("ACC001", 1000.0)),
"account1"
)
val account2 = system.actorOf(
Props(new BankAccountActor("ACC002", 500.0)),
"account2"
)
// Testing bank account operations
import BankAccount._
account1 ! Deposit(200.0)
account1 ! Withdraw(150.0)
// account1 forwards the transferred amount to account2 as a Deposit
account1 ! Transfer(100.0, account2)
// NOTE(review): account2's GetBalance is sent from here while the transfer's Deposit is sent
// by account1, so the reported account2 balance may or may not include the transfer — confirm
val balance1Future = account1 ? GetBalance
val balance2Future = account2 ? GetBalance
// Both asks were started above, so the two replies are awaited concurrently;
// the resulting Future is discarded and only the printing side effect is used
for {
balance1 <- balance1Future.mapTo[BalanceResponse]
balance2 <- balance2Future.mapTo[BalanceResponse]
} yield {
println(s"Account 1 balance: $$${balance1.balance}")
println(s"Account 2 balance: $$${balance2.balance}")
}
// Complex message protocol example
/** Message protocol and data model for the task-manager example. */
object TaskManager {

  /**
   * A tracked unit of work.
   *
   * A freshly created task has no assignee and is not completed.
   */
  case class Task(
    id: String,
    description: String,
    priority: Int,
    assignee: Option[String] = None,
    completed: Boolean = false
  )

  // ---- Commands -----------------------------------------------------------
  case class CreateTask(taskId: String, description: String, priority: Int)
  case class AssignTask(taskId: String, assignee: String)
  case class CompleteTask(taskId: String)
  case class GetTaskStatus(taskId: String)
  case object GetAllTasks

  // ---- Replies ------------------------------------------------------------
  /** Common supertype of every reply defined in this protocol. */
  sealed trait TaskResponse
  case class TaskCreated(task: Task) extends TaskResponse
  case class TaskAssigned(task: Task) extends TaskResponse
  case class TaskCompleted(task: Task) extends TaskResponse
  case class TaskStatus(task: Task) extends TaskResponse
  case class AllTasks(tasks: List[Task]) extends TaskResponse
  case class TaskNotFound(taskId: String) extends TaskResponse
}
/**
 * Actor that keeps an in-memory map of tasks keyed by task id.
 *
 * Each command from [[TaskManager]] is answered to `sender()` with a
 * [[TaskManager.TaskResponse]]; commands naming an unknown id yield
 * `TaskNotFound`. Creating a task under an id that already exists replaces
 * the stored task.
 */
class TaskManagerActor extends Actor {
  import TaskManager._

  private var tasks = Map.empty[String, Task]

  def receive: Receive = {
    case CreateTask(taskId, description, priority) =>
      val created = Task(taskId, description, priority)
      tasks = tasks.updated(taskId, created)
      println(s"Created task: $taskId")
      sender() ! TaskCreated(created)

    case AssignTask(taskId, assignee) =>
      val reply = tasks.get(taskId).fold[TaskResponse](TaskNotFound(taskId)) { existing =>
        val reassigned = existing.copy(assignee = Some(assignee))
        tasks = tasks.updated(taskId, reassigned)
        println(s"Assigned task $taskId to $assignee")
        TaskAssigned(reassigned)
      }
      sender() ! reply

    case CompleteTask(taskId) =>
      val reply = tasks.get(taskId).fold[TaskResponse](TaskNotFound(taskId)) { existing =>
        val finished = existing.copy(completed = true)
        tasks = tasks.updated(taskId, finished)
        println(s"Completed task: $taskId")
        TaskCompleted(finished)
      }
      sender() ! reply

    case GetTaskStatus(taskId) =>
      val reply: TaskResponse = tasks.get(taskId).map(TaskStatus(_)).getOrElse(TaskNotFound(taskId))
      sender() ! reply

    case GetAllTasks =>
      // Ascending by the numeric priority field.
      val ordered = tasks.values.toList.sortBy(_.priority)
      sender() ! AllTasks(ordered)
  }
}
// Single task-manager instance used by the demo below
val taskManager = system.actorOf(Props[TaskManagerActor](), "task-manager")
// Test task manager: create three tasks, assign two of them, complete one
import TaskManager._
taskManager ! CreateTask("TASK001", "Implement user authentication", 1)
taskManager ! CreateTask("TASK002", "Write unit tests", 2)
taskManager ! CreateTask("TASK003", "Deploy to production", 3)
taskManager ! AssignTask("TASK001", "Alice")
taskManager ! AssignTask("TASK002", "Bob")
taskManager ! CompleteTask("TASK001")
// Ask for the full list; TaskManagerActor returns it sorted by priority
val allTasksFuture = taskManager ? GetAllTasks
// Only an AllTasks reply is matched here; any other reply value is not handled by this callback
allTasksFuture.foreach {
case AllTasks(tasks) =>
println("All tasks:")
tasks.foreach { task =>
val status = if (task.completed) "COMPLETED" else "PENDING"
val assignee = task.assignee.getOrElse("UNASSIGNED")
println(s" ${task.id}: ${task.description} [Priority: ${task.priority}, Assignee: $assignee, Status: $status]")
}
}
Thread.sleep(2000)
Actor Hierarchies and Supervision
Creating Actor Hierarchies
// Parent actor that manages child actors
/** Message protocol spoken by the worker-manager (parent) actor. */
object WorkerManager {

  // ---- Commands -----------------------------------------------------------
  case class CreateWorker(workerId: String)
  case class AssignWork(workerId: String, work: String)
  case class GetWorkerStatus(workerId: String)
  case object GetAllWorkers

  // ---- Replies ------------------------------------------------------------
  case class WorkerCreated(workerId: String)
  case class WorkAssigned(workerId: String, work: String)
  case class WorkerNotFound(workerId: String)
  case class AllWorkers(workers: List[String])

  /** Snapshot of one worker: what it is doing now and how many jobs it has finished. */
  case class WorkerStatus(
    workerId: String,
    currentWork: Option[String],
    completedWork: Int
  )
}
/** Message protocol spoken by an individual worker actor. */
object Worker {

  /** Command: start working on `work`. */
  case class DoWork(work: String)

  /** Query: ask the worker for its current [[Status]]. */
  case object GetStatus

  /** Notice that `work` has finished. */
  case class WorkCompleted(work: String)

  /** Reply to [[GetStatus]]. */
  case class Status(currentWork: Option[String], completedWork: Int)
}
/**
 * Child actor that simulates doing one piece of work at a time.
 *
 * On `DoWork` it records the job and schedules a `WorkCompleted` message to
 * itself one second later. When that message arrives and still names the
 * current job, the job is counted as done and the parent is notified with
 * `WorkerManager.WorkAssigned`.
 *
 * @param workerId identifier used in log output and in the parent notification
 */
class WorkerActor(workerId: String) extends Actor {
  import Worker._

  private var currentWork: Option[String] = None
  private var completedWorkCount = 0

  def receive: Receive = {
    case GetStatus =>
      sender() ! Status(currentWork, completedWorkCount)

    case DoWork(work) =>
      currentWork = Some(work)
      println(s"Worker $workerId starting work: $work")
      // Simulate work completion after some time
      import scala.concurrent.duration._
      import context.dispatcher
      context.system.scheduler.scheduleOnce(1.second) {
        self ! WorkCompleted(work)
      }

    case WorkCompleted(work) if currentWork.contains(work) =>
      currentWork = None
      completedWorkCount += 1
      println(s"Worker $workerId completed work: $work (total completed: $completedWorkCount)")
      context.parent ! WorkerManager.WorkAssigned(workerId, work)

    case WorkCompleted(_) =>
      // Completion notice for a job that is no longer the current one: ignored.
      ()
  }
}
/**
 * Parent actor that creates [[WorkerActor]] children and routes work to them.
 *
 * Commands and replies are defined in [[WorkerManager]]. Commands naming an
 * unknown worker id are answered with `WorkerNotFound`.
 */
class WorkerManagerActor extends Actor {
  import WorkerManager._

  // Worker id -> child actor created for it.
  private var workers = Map.empty[String, ActorRef]

  def receive: Receive = {
    case CreateWorker(workerId) =>
      // A request for an id that already exists is ignored and gets no reply.
      if (!workers.contains(workerId)) {
        val worker = context.actorOf(Props(new WorkerActor(workerId)), s"worker-$workerId")
        workers += workerId -> worker
        println(s"Created worker: $workerId")
        sender() ! WorkerCreated(workerId)
      }

    case AssignWork(workerId, work) =>
      workers.get(workerId) match {
        case Some(worker) =>
          worker ! Worker.DoWork(work)
          sender() ! WorkAssigned(workerId, work)
        case None =>
          sender() ! WorkerNotFound(workerId)
      }

    case GetWorkerStatus(workerId) =>
      workers.get(workerId) match {
        case Some(worker) =>
          // Capture the requester now: the callback below runs later on another
          // thread, where sender() may already refer to a different message.
          val replyTo = sender()
          implicit val timeout: Timeout = Timeout(5.seconds)
          val statusFuture = worker ? Worker.GetStatus
          statusFuture.foreach {
            case Worker.Status(currentWork, completedWork) =>
              replyTo ! WorkerStatus(workerId, currentWork, completedWork)
          }
        case None =>
          sender() ! WorkerNotFound(workerId)
      }

    case GetAllWorkers =>
      sender() ! AllWorkers(workers.keys.toList)

    case WorkAssigned(workerId, _) if workers.get(workerId).contains(sender()) =>
      // WorkerActor notifies its parent with WorkAssigned once a job finishes.
      // Accept that notice from our own children so it is not left unhandled.
      ()
  }
}
// Top-level manager; the workers it creates become its children
val workerManager = system.actorOf(Props[WorkerManagerActor](), "worker-manager")
// Test worker manager: create three workers, then hand each one a job
import WorkerManager._
workerManager ! CreateWorker("W001")
workerManager ! CreateWorker("W002")
workerManager ! CreateWorker("W003")
workerManager ! AssignWork("W001", "Process data batch 1")
workerManager ! AssignWork("W002", "Generate report A")
workerManager ! AssignWork("W003", "Backup database")
// Each worker schedules its own completion one second after it starts
Thread.sleep(3000) // Wait for work to complete
// Supervision strategies
import akka.actor.SupervisorStrategy._
import akka.actor.{OneForOneStrategy, AllForOneStrategy}
import scala.concurrent.duration._
/**
 * Worker used to demonstrate supervision.
 *
 * "work" increments and prints a counter, "fail" throws a RuntimeException
 * (handing the decision to the parent's supervisor strategy), "stop" stops
 * the actor, and any other message is printed. Every lifecycle hook logs a
 * line so restarts and stops are visible in the output.
 */
class SupervisedWorker extends Actor {
  var workCount = 0

  // ---- lifecycle hooks ----------------------------------------------------

  override def preStart(): Unit =
    println(s"Worker ${self.path.name} starting")

  override def postStop(): Unit =
    println(s"Worker ${self.path.name} stopped")

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Worker ${self.path.name} restarting due to: ${reason.getMessage}")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    println(s"Worker ${self.path.name} restarted")
    super.postRestart(reason)
  }

  // ---- message handling ---------------------------------------------------

  def receive: Receive = {
    case "stop" =>
      println(s"Worker ${self.path.name} stopping")
      context.stop(self)

    case "fail" =>
      println(s"Worker ${self.path.name} is failing!")
      throw new RuntimeException("Worker failed!")

    case "work" =>
      workCount = workCount + 1
      println(s"Worker ${self.path.name} doing work (count: $workCount)")

    case msg =>
      println(s"Worker ${self.path.name} received: $msg")
  }
}
/**
 * Parent actor that creates [[SupervisedWorker]] children and decides what
 * happens when one of them throws.
 *
 * Messages:
 *  - "create-worker": creates one child and replies with its ActorRef
 *  - "create-workers": creates three children and replies with the sequence of refs
 *  - anything else is printed
 */
class SupervisorActor extends Actor {

  // Sequence number for child names. Child names must be unique per parent;
  // a running counter keeps repeated requests from colliding.
  private var nextWorkerId = 1

  // Supervision strategy: at most 3 restarts of a child within one minute.
  override val supervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = 3,
    withinTimeRange = 1.minute
  ) {
    // IllegalArgumentException is a subclass of RuntimeException, so it has to
    // be matched first; behind the RuntimeException case it would never fire.
    case _: IllegalArgumentException => Stop
    case _: RuntimeException         => Restart
    case _                           => Escalate
  }

  def receive: Receive = {
    case "create-worker" =>
      sender() ! spawnWorker()

    case "create-workers" =>
      val workers = (1 to 3).map(_ => spawnWorker())
      sender() ! workers

    case msg =>
      println(s"Supervisor received: $msg")
  }

  /** Creates one supervised child under the next free "worker-N" name. */
  private def spawnWorker(): ActorRef = {
    val worker = context.actorOf(Props[SupervisedWorker](), s"worker-$nextWorkerId")
    nextWorkerId += 1
    worker
  }
}
val supervisor = system.actorOf(Props[SupervisorActor](), "supervisor")

// Test supervision: the supervisor replies to "create-workers" with the refs it created
val workersFuture = supervisor ? "create-workers"
workersFuture.foreach {
  // The element type of a Seq is erased at runtime, so match the collection
  // first and then keep the elements that really are ActorRefs.
  case created: Seq[_] =>
    val workers = created.collect { case ref: ActorRef => ref }

    workers.foreach { worker =>
      worker ! "work"
      worker ! "work"
    }

    // Cause one worker to fail (a no-op if no worker came back)
    workers.headOption.foreach(_ ! "fail")

    // Try to send more work after the failure. The delay is scheduled rather
    // than slept so this callback does not block a dispatcher thread.
    system.scheduler.scheduleOnce(1.second) {
      workers.foreach(_ ! "work")
    }

  case other =>
    println(s"Unexpected reply from supervisor: $other")
}
Thread.sleep(3000)
Advanced Actor Patterns
Router Patterns and Actor Pools
import akka.routing.{RoundRobinPool, BalancingPool, SmallestMailboxPool}
import akka.actor.{Actor, Props}
// Work processing actor
/**
 * Routee used by the router examples.
 *
 *  - Int: replies with its square
 *  - String: sleeps for a random 500-1499 ms to simulate work, then replies
 *    with a completion message naming this actor
 */
class ProcessingActor extends Actor {
  def receive: Receive = {
    case number: Int =>
      val result = number * number
      println(s"${self.path.name} computed: $number^2 = $result")
      sender() ! result

    case work: String =>
      // nextInt(1000) is in [0, 999]; adding 500 gives the simulated duration.
      val processingTime = 500 + scala.util.Random.nextInt(1000)
      println(s"${self.path.name} processing: $work (${processingTime}ms)")
      Thread.sleep(processingTime)
      sender() ! s"Completed: $work by ${self.path.name}"
  }
}
// Round-robin router: a pool of 5 ProcessingActor routees behind one ActorRef
val roundRobinRouter = system.actorOf(
RoundRobinPool(5).props(Props[ProcessingActor]()),
"round-robin-router"
)
// Send work to round-robin router
(1 to 10).foreach { i =>
roundRobinRouter ! s"Task-$i"
}
// Each String task sleeps 500-1499 ms inside ProcessingActor, hence the pause
Thread.sleep(3000)
// Balancing pool router (work-stealing): a pool of 3 routees
val balancingRouter = system.actorOf(
BalancingPool(3).props(Props[ProcessingActor]()),
"balancing-router"
)
(1 to 8).foreach { i =>
balancingRouter ! s"BalancedTask-$i"
}
Thread.sleep(3000)
// Smallest mailbox router: a pool of 4 routees
val smallestMailboxRouter = system.actorOf(
SmallestMailboxPool(4).props(Props[ProcessingActor]()),
"smallest-mailbox-router"
)
// No pause follows this batch; later statements run while these tasks are still in flight
(1 to 12).foreach { i =>
smallestMailboxRouter ! s"MailboxTask-$i"
}
// Event Sourcing pattern
/** Events, state model and message protocol for the event-sourcing example. */
object EventStore {

  // ---- State --------------------------------------------------------------
  /** Current view of a user; `deleted` marks a soft-deleted record. */
  case class User(id: String, name: String, email: String, deleted: Boolean = false)

  // ---- Events (facts that are recorded) -----------------------------------
  sealed trait Event
  case class UserCreated(userId: String, name: String, email: String) extends Event
  /** `None` in `name` or `email` means "leave that field unchanged". */
  case class UserUpdated(userId: String, name: Option[String], email: Option[String]) extends Event
  case class UserDeleted(userId: String) extends Event

  // ---- Commands and queries -----------------------------------------------
  case class CreateUser(userId: String, name: String, email: String)
  case class UpdateUser(userId: String, name: Option[String], email: Option[String])
  case class DeleteUser(userId: String)
  case class GetUser(userId: String)

  // ---- Replies ------------------------------------------------------------
  case class UserResponse(user: Option[User])
  case class UserCreatedResponse(user: User)
  case class UserUpdatedResponse(user: User)
  case class UserDeletedResponse(userId: String)
}
/**
 * Actor that records every change as an event and derives the current user
 * map by applying those events.
 *
 * Soft-deleted users are treated as absent by every command and query:
 * `GetUser`, `UpdateUser` and `DeleteUser` all answer `UserResponse(None)`
 * for an id that is unknown or deleted, so a requester using ask always
 * receives a reply.
 */
class EventSourcingActor extends Actor {
  import EventStore._

  // Event log, newest event first.
  private var events = List.empty[Event]
  // Current state derived from the log.
  private var users = Map.empty[String, User]

  def receive: Receive = {
    case CreateUser(userId, name, email) =>
      // Creating under an existing id replaces that user's state.
      applyEvent(UserCreated(userId, name, email))
      sender() ! UserCreatedResponse(users(userId))

    case UpdateUser(userId, name, email) =>
      if (activeUser(userId).isDefined) {
        applyEvent(UserUpdated(userId, name, email))
        sender() ! UserUpdatedResponse(users(userId))
      } else {
        sender() ! UserResponse(None)
      }

    case DeleteUser(userId) =>
      if (activeUser(userId).isDefined) {
        applyEvent(UserDeleted(userId))
        sender() ! UserDeletedResponse(userId)
      } else {
        sender() ! UserResponse(None)
      }

    case GetUser(userId) =>
      sender() ! UserResponse(activeUser(userId))
  }

  /** The user stored under `userId`, unless it is missing or soft-deleted. */
  private def activeUser(userId: String): Option[User] =
    users.get(userId).filterNot(_.deleted)

  /** Appends `event` to the log and folds it into the current state. */
  private def applyEvent(event: Event): Unit = {
    events = event :: events
    event match {
      case UserCreated(userId, name, email) =>
        users += userId -> User(userId, name, email)

      case UserUpdated(userId, name, email) =>
        users.get(userId).foreach { user =>
          // A None field keeps the value the user already has.
          users += userId -> user.copy(
            name = name.getOrElse(user.name),
            email = email.getOrElse(user.email)
          )
        }

      case UserDeleted(userId) =>
        users.get(userId).foreach { user =>
          users += userId -> user.copy(deleted = true)
        }
    }
    println(s"Applied event: $event (total events: ${events.length})")
  }
}
// Single event-sourcing actor used by the demo below
val eventStore = system.actorOf(Props[EventSourcingActor](), "event-store")
// Test event sourcing: two creations, one partial update (name only), one deletion
import EventStore._
eventStore ! CreateUser("user1", "Alice", "alice@example.com")
eventStore ! CreateUser("user2", "Bob", "bob@example.com")
eventStore ! UpdateUser("user1", Some("Alice Smith"), None)
eventStore ! DeleteUser("user2")
// user2 was deleted above and GetUser filters deleted users, so its reply carries None
val user1Future = eventStore ? GetUser("user1")
val user2Future = eventStore ? GetUser("user2")
user1Future.foreach(println)
user2Future.foreach(println)
// State machine actor using become/unbecome
/** Message protocol for the light-switch state machine. */
object LightSwitch {

  /** Command: switch the light on. */
  case object TurnOn

  /** Command: switch the light off. */
  case object TurnOff

  /** Query: ask for the current [[State]]. */
  case object GetState

  /** Reply to [[GetState]]. */
  case class State(isOn: Boolean)
}
/**
 * Two-state machine built with `context.become`.
 *
 * The actor starts in the `off` behaviour. `TurnOn` / `TurnOff` switch
 * between the two behaviours, a redundant command only prints a notice, and
 * `GetState` replies with the state of the behaviour currently installed.
 */
class LightSwitchActor extends Actor {
  import LightSwitch._

  // Initial behaviour.
  def receive: Receive = off

  /** Behaviour while the light is off. */
  def off: Receive = {
    case GetState =>
      sender() ! State(isOn = false)
    case TurnOff =>
      println("Light is already OFF")
    case TurnOn =>
      println("Light turned ON")
      context.become(on)
  }

  /** Behaviour while the light is on. */
  def on: Receive = {
    case GetState =>
      sender() ! State(isOn = true)
    case TurnOn =>
      println("Light is already ON")
    case TurnOff =>
      println("Light turned OFF")
      context.become(off)
  }
}
// The switch starts in the "off" behaviour
val lightSwitch = system.actorOf(Props[LightSwitchActor](), "light-switch")
// The GetState messages are sent with `!` from outside an actor, so their State
// replies are not received by this script
lightSwitch ! LightSwitch.GetState
lightSwitch ! LightSwitch.TurnOn
lightSwitch ! LightSwitch.GetState
lightSwitch ! LightSwitch.TurnOn // redundant: only prints "Light is already ON"
lightSwitch ! LightSwitch.TurnOff
lightSwitch ! LightSwitch.GetState
Thread.sleep(1000)
// Producer-Consumer pattern
/** Message protocol shared by the producer and consumer actors. */
object ProducerConsumer {

  /** Command for the producer: emit `item`. */
  case class Produce(item: String)

  /** Command for the consumer: hand out the oldest queued item. */
  case class Consume()

  /** A queued value; also the reply to a successful [[Consume]]. */
  case class Item(value: String)

  /** Reply to [[Consume]] when nothing is queued. */
  case object NoItems
}
/**
 * Forwards every produced value to a fixed consumer.
 *
 * @param consumer actor that receives an `Item` for each `Produce` command
 */
class ProducerActor(consumer: ActorRef) extends Actor {
  import ProducerConsumer._

  def receive: Receive = {
    case Produce(item) =>
      println(s"Producing: $item")
      val wrapped = Item(item)
      consumer ! wrapped
  }
}
/**
 * Buffers incoming items in arrival order and hands them out one at a time.
 *
 * `Item` appends to the queue; `Consume()` removes the oldest entry and
 * replies with it, or replies `NoItems` when the queue is empty.
 */
class ConsumerActor extends Actor {
  import ProducerConsumer._

  // Oldest element first.
  private var queue = List.empty[String]

  def receive: Receive = {
    case Consume() =>
      if (queue.isEmpty) {
        println("No items to consume")
        sender() ! NoItems
      } else {
        val head = queue.head
        queue = queue.tail
        println(s"Consumed: $head (queue size: ${queue.length})")
        sender() ! Item(head)
      }

    case Item(value) =>
      queue = queue :+ value
      println(s"Received item: $value (queue size: ${queue.length})")
  }
}
// The consumer is created first because the producer needs its ActorRef
val consumer = system.actorOf(Props[ConsumerActor](), "consumer")
val producer = system.actorOf(Props(new ProducerActor(consumer)), "producer")
// Test producer-consumer
import ProducerConsumer._
producer ! Produce("Item1")
producer ! Produce("Item2")
producer ! Produce("Item3")
// Pause so the produced items can reach the consumer before consuming starts
Thread.sleep(500)
consumer ! Consume()
consumer ! Consume()
consumer ! Consume()
consumer ! Consume() // Should return NoItems
Thread.sleep(2000)
Building Distributed Systems
Actor Selection and Remote Actors
// Configuration for distributed actors (would normally be in application.conf)
/*
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
}
*/
// For this example, we'll simulate distributed behavior with actor selection
import akka.actor.{ActorSelection, Identify, ActorIdentity}
// Distributed service actor
/**
 * Stand-in for a remotely deployed service.
 *
 *  - "ping": prints the full actor path and replies "pong"
 *  - any other String: prints it and replies with "Processed: <msg>"
 *  - Identify(id): replies with an ActorIdentity carrying this actor's ref
 */
class DistributedServiceActor extends Actor {
  def receive: Receive = {
    case Identify(messageId) =>
      sender() ! ActorIdentity(messageId, Some(self))

    case msg: String =>
      if (msg == "ping") {
        println(s"${self.path} received ping")
        sender() ! "pong"
      } else {
        println(s"Service ${self.path.name} processing: $msg")
        sender() ! s"Processed: $msg"
      }
  }
}
// Create multiple service instances; their names share the "service" prefix
// that ClientActor's "/user/service*" selection pattern looks for
val service1 = system.actorOf(Props[DistributedServiceActor](), "service1")
val service2 = system.actorOf(Props[DistributedServiceActor](), "service2")
val service3 = system.actorOf(Props[DistributedServiceActor](), "service3")
// Client actor that uses actor selection
/**
 * Client that discovers services through actor selection.
 *
 * "discover-services" sends an `Identify` probe to every actor matching
 * "/user/service*". Each `ActorIdentity` answer carrying a ref is greeted
 * with a message, and any other String received is printed as a response.
 */
class ClientActor extends Actor {
  import context.dispatcher

  def receive: Receive = {
    case ActorIdentity("discovery", found) =>
      found match {
        case Some(ref) =>
          println(s"Discovered service: ${ref.path}")
          ref ! "Hello from client"
        case None =>
          println("Service not found")
      }

    case response: String =>
      if (response == "discover-services") {
        val selection: ActorSelection = context.actorSelection("/user/service*")
        selection ! Identify("discovery")
      } else {
        println(s"Client received: $response")
      }
  }
}
// Start discovery; the pause leaves time for the Identify round trips and the greetings
val client = system.actorOf(Props[ClientActor](), "client")
client ! "discover-services"
Thread.sleep(2000)
// Cluster-aware actor (conceptual)
/** Message protocol for the conceptual cluster manager. */
object ClusterMessages {

  // ---- Commands and queries -----------------------------------------------
  case class RegisterNode(nodeId: String, address: String)
  case class UnregisterNode(nodeId: String)
  case class GetClusterStatus()

  /** Route `message` to `nodeId`, or to any registered node when `nodeId` is `None`. */
  case class RouteMessage(message: Any, nodeId: Option[String] = None)

  // ---- Replies ------------------------------------------------------------
  /** Registered nodes as a map from node id to address. */
  case class ClusterStatus(nodes: Map[String, String])

  /** The `response` a node produced for a routed message. */
  case class MessageRouted(nodeId: String, response: Any)
}
/**
 * Conceptual cluster manager: tracks registered nodes and routes messages to
 * the local actor standing in for each node.
 *
 * Replies to `GetClusterStatus()` with `ClusterStatus`, and to a routed
 * message with `MessageRouted` once the node has answered. Routing to an
 * unknown node only prints a notice; routing with no registered nodes is
 * silently dropped.
 */
class ClusterManagerActor extends Actor {
  import ClusterMessages._

  // Node id -> advertised address.
  private var nodes = Map.empty[String, String]
  // Node id -> local stand-in actor for that node.
  private var nodeActors = Map.empty[String, ActorRef]

  def receive: Receive = {
    case RegisterNode(nodeId, address) =>
      nodes += nodeId -> address
      // Re-registering a known node only updates its address. Creating a second
      // child under the same name would make context.actorOf throw.
      if (!nodeActors.contains(nodeId)) {
        // In real implementation, would create remote actor ref
        val nodeActor = context.actorOf(Props[DistributedServiceActor](), s"node-$nodeId")
        nodeActors += nodeId -> nodeActor
      }
      println(s"Registered node: $nodeId at $address")

    case UnregisterNode(nodeId) =>
      nodes -= nodeId
      nodeActors.get(nodeId).foreach(context.stop)
      nodeActors -= nodeId
      println(s"Unregistered node: $nodeId")

    case GetClusterStatus() =>
      sender() ! ClusterStatus(nodes)

    case RouteMessage(message, Some(nodeId)) =>
      routeTo(nodeId, message, sender())

    case RouteMessage(message, None) =>
      // Pick any registered node at random.
      if (nodeActors.nonEmpty) {
        val candidates = nodeActors.keys.toSeq
        val nodeId = candidates(scala.util.Random.nextInt(candidates.size))
        // Routed directly rather than re-sent through `self`, which would
        // replace the original requester with this actor as the sender.
        routeTo(nodeId, message, sender())
      }
  }

  /**
   * Asks the node registered under `nodeId` to process `message` and relays
   * its answer to `replyTo`.
   *
   * `replyTo` is passed in explicitly because the answer arrives in a Future
   * callback, where calling sender() is no longer reliable.
   */
  private def routeTo(nodeId: String, message: Any, replyTo: ActorRef): Unit =
    nodeActors.get(nodeId) match {
      case Some(actor) =>
        implicit val timeout: Timeout = Timeout(5.seconds)
        val responseFuture = actor ? message
        responseFuture.foreach { response =>
          replyTo ! MessageRouted(nodeId, response)
        }
      case None =>
        println(s"Node not found: $nodeId")
    }
}
// Single cluster manager used by the demo below
val clusterManager = system.actorOf(Props[ClusterManagerActor](), "cluster-manager")
// Test cluster management: register three nodes, then route two messages
import ClusterMessages._
clusterManager ! RegisterNode("node1", "127.0.0.1:2551")
clusterManager ! RegisterNode("node2", "127.0.0.1:2552")
clusterManager ! RegisterNode("node3", "127.0.0.1:2553")
clusterManager ! RouteMessage("task1", Some("node1"))
clusterManager ! RouteMessage("task2", None) // No node given: the manager picks one at random
// Ask for the node table and print it once the reply arrives
val clusterStatusFuture = clusterManager ? GetClusterStatus()
clusterStatusFuture.foreach {
case ClusterStatus(nodes) =>
println("Cluster nodes:")
nodes.foreach { case (nodeId, address) =>
println(s" $nodeId: $address")
}
}
Thread.sleep(3000)
// Graceful shutdown of the "ActorExample" system
println("Shutting down actor system...")
system.terminate()
Performance and Best Practices
Actor Performance Optimization
// Create a new actor system for performance examples (the previous system was terminated above)
implicit val perfSystem: ActorSystem = ActorSystem("PerformanceExample")
// NOTE(review): if the earlier implicit `ec` is still in scope here, this is a second implicit
// ExecutionContext and implicit resolution may become ambiguous — confirm how the snippets are compiled
implicit val perfEc: ExecutionContext = perfSystem.dispatcher
// High-throughput actor design
/**
 * Counts "benchmark" messages and prints a throughput line after every
 * 100,000 of them; "count" replies with the number processed so far.
 *
 * Throughput is measured from the moment this actor instance was constructed.
 */
class HighThroughputActor extends Actor {
  private var messageCount = 0
  private val startTime = System.currentTimeMillis()

  def receive: Receive = {
    case "count" =>
      sender() ! messageCount

    case "benchmark" =>
      messageCount += 1
      // Report throughput every 100,000 messages
      if (messageCount % 100000 == 0) reportThroughput()
  }

  /** Prints the running total, elapsed time and messages per second. */
  private def reportThroughput(): Unit = {
    val elapsed = System.currentTimeMillis() - startTime
    val throughput = messageCount.toDouble / elapsed * 1000
    println(f"Processed $messageCount messages in ${elapsed}ms (${throughput}%.0f msg/sec)")
  }
}
// Benchmark actor throughput
/**
 * Sends one million "benchmark" messages to a fresh [[HighThroughputActor]],
 * waits five seconds, then asks the actor how many it processed and prints a
 * summary line.
 *
 * The printed elapsed time and throughput include the five-second wait, so
 * they are an end-to-end figure, not the actor's raw processing rate (the
 * actor prints that itself every 100,000 messages).
 *
 * The actor is created under the fixed name "benchmark-actor".
 */
def benchmarkActorThroughput(): Unit = {
  val actor = perfSystem.actorOf(Props[HighThroughputActor](), "benchmark-actor")
  val numMessages = 1000000
  val startTime = System.currentTimeMillis()

  (1 to numMessages).foreach { _ =>
    actor ! "benchmark"
  }

  // Wait a bit and check final count
  Thread.sleep(5000)
  implicit val timeout: Timeout = Timeout(5.seconds)
  val countFuture = actor ? "count"

  // The execution context is passed explicitly: with both `ec` and `perfEc`
  // in implicit scope the lookup would be ambiguous, and the callback belongs
  // on the performance system's dispatcher anyway.
  countFuture.foreach { count =>
    val elapsed = System.currentTimeMillis() - startTime
    val throughput = numMessages.toDouble / elapsed * 1000
    println(f"Final: $count messages in ${elapsed}ms (${throughput}%.0f msg/sec)")
  }(perfEc)
}

benchmarkActorThroughput()
// Memory-efficient actor with object pooling
/**
 * Small pool of [[ObjectPool.PooledMessage]] instances.
 *
 * `PooledMessage` is an immutable case class, so `reset` (and therefore
 * `acquire`) returns a fresh copy rather than mutating the pooled instance.
 * The pool holds at most `MaxPooled` released messages; releases beyond that
 * are dropped so the pool cannot grow without bound when `release` is called
 * more often than `acquire`.
 */
object ObjectPool {

  /** Message carrying an id and a byte payload. */
  case class PooledMessage(id: Int, data: Array[Byte]) {
    /** Returns a message with the given id and payload (a copy of this one). */
    def reset(newId: Int, newData: Array[Byte]): PooledMessage =
      copy(id = newId, data = newData)
  }

  // Upper bound on retained messages (and on the payload arrays they reference).
  private val MaxPooled = 1024

  // Bounded queue: offer() returns false instead of growing once the pool is full.
  private val pool = new java.util.concurrent.ArrayBlockingQueue[PooledMessage](MaxPooled)

  /**
   * Obtains a message with the given id and payload, reusing a pooled
   * instance as the template when one is available.
   */
  def acquire(id: Int, data: Array[Byte]): PooledMessage =
    // poll() returns null on an empty pool; Option(...) turns that into None.
    Option(pool.poll()) match {
      case Some(pooled) => pooled.reset(id, data)
      case None         => PooledMessage(id, data)
    }

  /** Hands `message` back to the pool; it is discarded when the pool is full. */
  def release(message: PooledMessage): Unit = {
    pool.offer(message)
    ()
  }
}
/**
 * Consumes pooled messages: sums the payload bytes, returns the message to
 * [[ObjectPool]] and replies with the sum.
 *
 * The sum is computed with Byte arithmetic, so the reply is a Byte and large
 * totals wrap around.
 */
class MemoryEfficientActor extends Actor {
  import ObjectPool._

  def receive: Receive = {
    case msg: PooledMessage =>
      // The payload is read before the message is handed back to the pool.
      val processed = msg.data.sum
      release(msg)
      sender() ! processed
  }
}
// Actor best practices
/** Collection of small examples, one per recommended practice. */
object ActorBestPractices {

  // 1. Use immutable messages
  case class ImmutableCommand(id: String, data: List[String])

  // 2. Avoid blocking operations in receive
  /**
   * "blocking-work" runs a slow task in a Future and replies "Work completed"
   * when it finishes; "cpu-work" computes the sum 1 + 2 + ... + 1,000,000
   * inline and replies with it as a Long.
   */
  class NonBlockingActor extends Actor {
    import context.dispatcher

    def receive: Receive = {
      case "blocking-work" =>
        // Wrong: blocking in receive
        // Thread.sleep(1000)

        // Right: delegate to Future. The sender is captured first because the
        // Future body runs after receive has returned.
        val originalSender = sender()
        Future {
          // Marks the sleep as blocking so a pool that supports it can compensate.
          scala.concurrent.blocking {
            Thread.sleep(1000)
          }
          "Work completed"
        }.foreach(originalSender ! _)

      case "cpu-work" =>
        // CPU work is okay in receive. The total (500,000,500,000) does not
        // fit in an Int, so the sum is computed over Longs.
        val result = (1L to 1000000L).sum
        sender() ! result
    }
  }

  // 3. Use appropriate supervision strategies
  /** Supervisor that stops children on bad arguments and restarts them on other runtime errors. */
  class RobustSupervisor extends Actor {
    override val supervisorStrategy = OneForOneStrategy(
      maxNrOfRetries = 3,
      withinTimeRange = 1.minute
    ) {
      // The more specific exception type comes first.
      case _: IllegalArgumentException => Stop
      case _: RuntimeException         => Restart
      case _                           => Escalate
    }

    def receive: Receive = {
      case "create-child" =>
        val child = context.actorOf(Props[NonBlockingActor]())
        sender() ! child
    }
  }

  // 4. Batch processing for high throughput
  /**
   * Collects String items and processes them in batches of `batchSize`.
   * The message "flush" processes whatever is pending immediately; it is a
   * control message and is never stored as an item.
   */
  class BatchProcessingActor extends Actor {
    // Pending items, newest first.
    private var batch = List.empty[String]
    // Number of pending items, tracked so the list is not re-counted per message.
    private var pending = 0
    private val batchSize = 100

    def receive: Receive = {
      // Matched before the general String case, which would otherwise swallow it.
      case "flush" =>
        flush()

      case item: String =>
        batch = item :: batch
        pending += 1
        if (pending >= batchSize) flush()
    }

    /** Processes the pending items in arrival order and clears the buffer. */
    private def flush(): Unit =
      if (batch.nonEmpty) {
        processBatch(batch.reverse)
        batch = List.empty
        pending = 0
      }

    private def processBatch(items: List[String]): Unit = {
      println(s"Processing batch of ${items.length} items")
      // Process all items together
    }
  }

  // 5. Use router for parallelism
  /**
   * Creates a round-robin pool with one routee per available processor.
   *
   * The router is created under the fixed name "optimal-router".
   *
   * @param actorProps Props used for each routee
   * @return the router's ActorRef
   */
  def createOptimalRouter(actorProps: Props): ActorRef = {
    val nrOfInstances = Runtime.getRuntime.availableProcessors()
    perfSystem.actorOf(
      RoundRobinPool(nrOfInstances).props(actorProps),
      "optimal-router"
    )
  }
}
// Leave time for the benchmark's asynchronous output before shutting down
Thread.sleep(3000)
// Graceful shutdown of the "PerformanceExample" system
perfSystem.terminate()
Summary
In this lesson, you've mastered the Actor model and Akka:
✅ Actor Basics: Creating actors, sending messages, and handling responses
✅ Message Protocols: Designing type-safe communication patterns
✅ Actor Hierarchies: Building supervised actor systems
✅ Advanced Patterns: Routers, event sourcing, and state machines
✅ Distributed Systems: Actor selection and cluster concepts
✅ Performance: Optimization techniques and best practices
✅ Fault Tolerance: Supervision strategies and error handling
The Actor model provides a powerful foundation for building concurrent, distributed, and fault-tolerant systems that can scale from single machines to large clusters.
What's Next
In the next lesson, we'll explore advanced Scala features including implicits, type classes, and metaprogramming techniques that enable powerful library design and domain-specific languages.
Comments
Be the first to comment on this lesson!