Actor Model and Akka: Building Concurrent Systems

Introduction

The Actor model is a mathematical model of concurrent computation that treats actors as the fundamental units of computation. In response to a message, an actor can make local decisions, create more actors, send messages, and designate how to respond to the next message received.

Akka is the most widely used implementation of the Actor model on the JVM, with first-class Scala and Java APIs, providing a powerful toolkit for building concurrent, distributed, and fault-tolerant applications. This lesson uses Akka's classic actor API (`akka.actor.Actor`) and will teach you to design and implement actor-based systems that can handle thousands of concurrent operations efficiently.

Understanding the Actor Model

Basic Actor Concepts

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

// Basic actor definition
/**
 * Minimal classic actor that reacts to a handful of message shapes.
 *
 *  - `"hello"` prints a greeting that includes this actor's name and replies `"Hello back!"`.
 *  - Any other `String` is printed; no reply is sent.
 *  - An `Int` is printed and twice its value is sent back to the sender.
 *  - Anything else is reported as unknown.
 */
class SimpleActor extends Actor {
  // Order matters: `greeting` has to be tried before `text`, because "hello" is a String too.
  def receive: Receive = greeting orElse text orElse doubling orElse unknown

  private def greeting: Receive = {
    case "hello" =>
      println(s"Hello from ${self.path.name}")
      sender() ! "Hello back!"
  }

  private def text: Receive = {
    case msg: String =>
      println(s"SimpleActor received: $msg")
  }

  private def doubling: Receive = {
    case number: Int =>
      println(s"SimpleActor received number: $number")
      sender() ! number * 2
  }

  private def unknown: Receive = {
    case _ =>
      println("SimpleActor received unknown message")
  }
}

// Creating an actor system
// The system, its dispatcher (used as ExecutionContext) and the ask timeout are declared
// implicit so that `?` and the Future callbacks further down can pick them up.
implicit val system: ActorSystem = ActorSystem("ActorExample")
implicit val ec: ExecutionContext = system.dispatcher
implicit val timeout: Timeout = Timeout(5.seconds)

// Creating actors
val simpleActor = system.actorOf(Props[SimpleActor](), "simple-actor")

// Sending messages (fire-and-forget)
// Nothing in this script consumes the replies SimpleActor sends for "hello" and 42.
simpleActor ! "hello"
simpleActor ! 42
simpleActor ! "test message"

// Request-response pattern with ask
// SimpleActor answers an Int with twice its value, so the Future completes with 20.
val futureResponse: Future[Any] = simpleActor ? 10
futureResponse.foreach { response =>
  println(s"Response received: $response")
}

Thread.sleep(1000)  // Give actors time to process

// Stateful actor example
/**
 * Counter whose current value lives in the active behavior instead of a mutable field.
 *
 * Messages: `"increment"`, `"decrement"`, `"reset"` and a bare `Int` (sets the value)
 * change the count and print the new state; `"get"` replies with the current count.
 */
class CounterActor extends Actor {
  def receive: Receive = counting(0)

  /** Behavior while the counter holds `count`; every change swaps in a new behavior. */
  private def counting(count: Int): Receive = {
    case "increment" =>
      val next = count + 1
      println(s"Counter incremented to $next")
      context.become(counting(next))

    case "decrement" =>
      val next = count - 1
      println(s"Counter decremented to $next")
      context.become(counting(next))

    case "get" =>
      sender() ! count

    case "reset" =>
      println("Counter reset")
      context.become(counting(0))

    case value: Int =>
      println(s"Counter set to $value")
      context.become(counting(value))
  }
}

// Create the counter and increment it three times (fire-and-forget).
val counterActor = system.actorOf(Props[CounterActor](), "counter")

counterActor ! "increment"
counterActor ! "increment"
counterActor ! "increment"

// Ask for the current value; the reply is printed once the Future completes.
val countFuture = counterActor ? "get"
countFuture.foreach(count => println(s"Current count: $count"))

// Actor with constructor parameters
/**
 * Actor configured through constructor parameters.
 *
 * @param name         label used in the `"status"` reply
 * @param initialValue value the actor starts with
 *
 * Messages: `"status"` replies with `"<name> has value <value>"`; `"add"` / `"subtract"`
 * change the value by one; a bare `Int` replaces the value.
 */
class NamedActor(name: String, initialValue: Int) extends Actor {
  def receive: Receive = holding(initialValue)

  /** Behavior while the actor holds `value`. */
  private def holding(value: Int): Receive = {
    case "status" =>
      sender() ! s"$name has value $value"

    case "add" =>
      context.become(holding(value + 1))

    case "subtract" =>
      context.become(holding(value - 1))

    case newValue: Int =>
      context.become(holding(newValue))
  }
}

// Creating actors with constructor parameters
// Props(new ...) takes the constructor call by name, so the instance is built by Akka.
val namedActor = system.actorOf(
  Props(new NamedActor("Alice", 100)), 
  "named-actor"
)

// Two increments on top of the initial 100, then ask for the status string.
namedActor ! "add"
namedActor ! "add"
val statusFuture = namedActor ? "status"
statusFuture.foreach(println)

// Crude wait so the asynchronous output appears before the next section runs.
Thread.sleep(1000)

Message Types and Protocols

// Define message types using case classes and objects
/** Message protocol understood by [[BankAccountActor]]. */
object BankAccount {
  // Commands
  final case class Deposit(amount: Double)
  final case class Withdraw(amount: Double)
  case object GetBalance
  final case class Transfer(amount: Double, target: ActorRef)

  // Events/Responses
  final case class BalanceResponse(balance: Double)
  final case class DepositSuccess(newBalance: Double)
  /** Reply to a [[Deposit]] whose amount is not strictly positive. */
  final case class DepositFailure(reason: String)
  final case class WithdrawSuccess(newBalance: Double)
  final case class WithdrawFailure(reason: String)
  final case class TransferSuccess(amount: Double, newBalance: Double)
  final case class TransferFailure(reason: String)
}

/**
 * Actor holding the balance of a single account.
 *
 * Every command is answered with the matching success or failure message from
 * [[BankAccount]]. Amounts must be strictly positive; withdrawals and transfers must not
 * exceed the current balance.
 *
 * @param accountId      identifier used in log output only
 * @param initialBalance balance the account starts with
 */
class BankAccountActor(accountId: String, initialBalance: Double) extends Actor {
  import BankAccount._

  private var balance = initialBalance

  def receive: Receive = {
    case Deposit(amount) if amount > 0 =>
      balance += amount
      println(s"Account $accountId: Deposited $$${amount}. New balance: $$${balance}")
      sender() ! DepositSuccess(balance)

    case Deposit(amount) =>
      println(s"Account $accountId: Invalid deposit amount: $$${amount}")
      // A rejected deposit is reported as a deposit failure, not as a withdrawal failure.
      sender() ! DepositFailure("Invalid deposit amount")

    case Withdraw(amount) if amount > 0 && amount <= balance =>
      balance -= amount
      println(s"Account $accountId: Withdrew $$${amount}. New balance: $$${balance}")
      sender() ! WithdrawSuccess(balance)

    case Withdraw(amount) if amount > balance =>
      println(s"Account $accountId: Insufficient funds for withdrawal of $$${amount}")
      sender() ! WithdrawFailure("Insufficient funds")

    case Withdraw(amount) =>
      println(s"Account $accountId: Invalid withdrawal amount: $$${amount}")
      sender() ! WithdrawFailure("Invalid withdrawal amount")

    case GetBalance =>
      sender() ! BalanceResponse(balance)

    case Transfer(amount, target) if amount > 0 && amount <= balance =>
      balance -= amount
      // Sent with this actor as sender, so the target's DepositSuccess comes back here.
      target ! Deposit(amount)
      println(s"Account $accountId: Transferred $$${amount}. New balance: $$${balance}")
      sender() ! TransferSuccess(amount, balance)

    case Transfer(amount, _) =>
      println(s"Account $accountId: Invalid transfer amount or insufficient funds: $$${amount}")
      sender() ! TransferFailure("Invalid transfer amount or insufficient funds")

    case DepositSuccess(_) =>
      // Acknowledgement for the Deposit issued by a Transfer above; nothing left to do.
      ()
  }
}

// Creating bank account actors
val account1 = system.actorOf(
  Props(new BankAccountActor("ACC001", 1000.0)),
  "account1"
)

val account2 = system.actorOf(
  Props(new BankAccountActor("ACC002", 500.0)),
  "account2"
)

// Testing bank account operations
import BankAccount._

account1 ! Deposit(200.0)
account1 ! Withdraw(150.0)
account1 ! Transfer(100.0, account2)

// account1 answers GetBalance only after it has handled the Transfer above, and the
// Transfer hands the Deposit to account2 before that answer is sent. Querying account2
// after account1 has replied therefore lets the printed balance include the transfer
// (for local actors); asking both accounts at the same time could read account2 too early.
val balance1Future = (account1 ? GetBalance).mapTo[BalanceResponse]
val balance2Future = balance1Future.flatMap { _ =>
  (account2 ? GetBalance).mapTo[BalanceResponse]
}

for {
  balance1 <- balance1Future
  balance2 <- balance2Future
} {
  println(s"Account 1 balance: $$${balance1.balance}")
  println(s"Account 2 balance: $$${balance2.balance}")
}

// Surface timeouts or unexpected reply types instead of dropping them silently.
balance2Future.failed.foreach { error =>
  println(s"Balance query failed: ${error.getMessage}")
}

// Complex message protocol example
/** Message protocol and data model for the task manager actor. */
object TaskManager {

  /**
   * A unit of work tracked by the manager.
   *
   * @param assignee  person the task is assigned to, if any
   * @param completed whether the task has been marked as done
   */
  final case class Task(
    id: String,
    description: String,
    priority: Int,
    assignee: Option[String] = None,
    completed: Boolean = false
  )

  // Commands accepted by the manager
  final case class CreateTask(taskId: String, description: String, priority: Int)
  final case class AssignTask(taskId: String, assignee: String)
  final case class CompleteTask(taskId: String)
  final case class GetTaskStatus(taskId: String)
  case object GetAllTasks

  // Replies sent back to the requester
  sealed trait TaskResponse
  final case class TaskCreated(task: Task) extends TaskResponse
  final case class TaskAssigned(task: Task) extends TaskResponse
  final case class TaskCompleted(task: Task) extends TaskResponse
  final case class TaskNotFound(taskId: String) extends TaskResponse
  final case class TaskStatus(task: Task) extends TaskResponse
  final case class AllTasks(tasks: List[Task]) extends TaskResponse
}

/**
 * Keeps an in-memory map of tasks keyed by id and answers the commands defined in
 * [[TaskManager]]. Commands that refer to an unknown id are answered with `TaskNotFound`.
 */
class TaskManagerActor extends Actor {
  import TaskManager._

  private var tasks = Map.empty[String, Task]

  def receive: Receive = {
    case CreateTask(taskId, description, priority) =>
      val created = Task(taskId, description, priority)
      tasks = tasks.updated(taskId, created)
      println(s"Created task: $taskId")
      sender() ! TaskCreated(created)

    case AssignTask(taskId, assignee) =>
      val reply = amend(taskId)(_.copy(assignee = Some(assignee))) { assigned =>
        println(s"Assigned task $taskId to $assignee")
        TaskAssigned(assigned)
      }
      sender() ! reply

    case CompleteTask(taskId) =>
      val reply = amend(taskId)(_.copy(completed = true)) { done =>
        println(s"Completed task: $taskId")
        TaskCompleted(done)
      }
      sender() ! reply

    case GetTaskStatus(taskId) =>
      val reply = tasks.get(taskId).fold[TaskResponse](TaskNotFound(taskId))(TaskStatus(_))
      sender() ! reply

    case GetAllTasks =>
      val byPriority = tasks.values.toList.sortBy(_.priority)
      sender() ! AllTasks(byPriority)
  }

  /**
   * Applies `change` to the stored task with the given id and stores the result.
   *
   * @param confirm builds the reply from the changed task; it runs after the map is updated
   * @return the confirmation, or `TaskNotFound` when no task has that id
   */
  private def amend(taskId: String)(change: Task => Task)(confirm: Task => TaskResponse): TaskResponse =
    tasks.get(taskId) match {
      case None =>
        TaskNotFound(taskId)
      case Some(current) =>
        val changed = change(current)
        tasks = tasks.updated(taskId, changed)
        confirm(changed)
    }
}

// One manager actor owns all tasks for this demo.
val taskManager = system.actorOf(Props[TaskManagerActor](), "task-manager")

// Test task manager
import TaskManager._

// Three tasks with ascending priority numbers.
taskManager ! CreateTask("TASK001", "Implement user authentication", 1)
taskManager ! CreateTask("TASK002", "Write unit tests", 2)
taskManager ! CreateTask("TASK003", "Deploy to production", 3)

// TASK003 is deliberately left unassigned.
taskManager ! AssignTask("TASK001", "Alice")
taskManager ! AssignTask("TASK002", "Bob")

taskManager ! CompleteTask("TASK001")

// The manager replies with AllTasks sorted by priority; only that reply shape is handled here.
val allTasksFuture = taskManager ? GetAllTasks
allTasksFuture.foreach {
  case AllTasks(tasks) =>
    println("All tasks:")
    tasks.foreach { task =>
      val status = if (task.completed) "COMPLETED" else "PENDING"
      val assignee = task.assignee.getOrElse("UNASSIGNED")
      println(s"  ${task.id}: ${task.description} [Priority: ${task.priority}, Assignee: $assignee, Status: $status]")
    }
}

// Crude wait so the asynchronous output appears before the next section runs.
Thread.sleep(2000)

Actor Hierarchies and Supervision

Creating Actor Hierarchies

// Parent actor that manages child actors
/** Messages exchanged between clients and the worker manager actor. */
object WorkerManager {
  // Requests
  final case class CreateWorker(workerId: String)
  final case class AssignWork(workerId: String, work: String)
  final case class GetWorkerStatus(workerId: String)
  case object GetAllWorkers

  // Replies
  final case class WorkerCreated(workerId: String)
  final case class WorkAssigned(workerId: String, work: String)
  final case class WorkerNotFound(workerId: String)
  final case class WorkerStatus(
    workerId: String,
    currentWork: Option[String],
    completedWork: Int
  )
  final case class AllWorkers(workers: List[String])
}

/** Messages understood or emitted by a single worker actor. */
object Worker {
  /** Ask the worker to start the given piece of work. */
  final case class DoWork(work: String)

  /** Signals that the given piece of work has finished. */
  final case class WorkCompleted(work: String)

  /** Request for the worker's current [[Status]]. */
  case object GetStatus

  /** Snapshot of what the worker is doing and how much it has finished. */
  final case class Status(currentWork: Option[String], completedWork: Int)
}

/**
 * Worker that "performs" a piece of work by scheduling a completion message to itself
 * one second after the work was accepted.
 *
 * @param workerId identifier used in log output and in the notification sent to the parent
 */
class WorkerActor(workerId: String) extends Actor {
  import Worker._

  private var currentWork: Option[String] = None
  private var completedWorkCount = 0

  def receive: Receive = {
    case DoWork(work) =>
      begin(work)

    case WorkCompleted(work) if currentWork.contains(work) =>
      finish(work)

    case WorkCompleted(_) =>
      // Completion for work that is no longer the current one: ignored.
      ()

    case GetStatus =>
      sender() ! Status(currentWork, completedWorkCount)
  }

  /** Records `work` as current and schedules its simulated completion. */
  private def begin(work: String): Unit = {
    import scala.concurrent.duration._
    import context.dispatcher

    currentWork = Some(work)
    println(s"Worker $workerId starting work: $work")

    // Simulate work completion after some time
    context.system.scheduler.scheduleOnce(1.second) {
      self ! WorkCompleted(work)
    }
  }

  /** Clears the current work, bumps the counter and notifies the parent. */
  private def finish(work: String): Unit = {
    currentWork = None
    completedWorkCount += 1
    println(s"Worker $workerId completed work: $work (total completed: $completedWorkCount)")
    // NOTE(review): the parent is told `WorkAssigned` on completion; WorkerManagerActor in
    // this file has no case for that message — confirm this is the intended notification.
    context.parent ! WorkerManager.WorkAssigned(workerId, work)
  }
}

/**
 * Parent actor that creates [[WorkerActor]] children on demand and relays work and status
 * requests to them, using the protocol in [[WorkerManager]].
 */
class WorkerManagerActor extends Actor {
  import WorkerManager._
  import akka.pattern.pipe
  // Future callbacks run on this actor's own dispatcher rather than on whatever
  // ExecutionContext happens to be implicit in the enclosing scope.
  import context.dispatcher

  private var workers = Map.empty[String, ActorRef]

  def receive: Receive = {
    case CreateWorker(workerId) =>
      // A request for an id that already exists is ignored and gets no reply.
      if (!workers.contains(workerId)) {
        val worker = context.actorOf(Props(new WorkerActor(workerId)), s"worker-$workerId")
        workers += workerId -> worker
        println(s"Created worker: $workerId")
        sender() ! WorkerCreated(workerId)
      }

    case AssignWork(workerId, work) =>
      workers.get(workerId) match {
        case Some(worker) =>
          worker ! Worker.DoWork(work)
          sender() ! WorkAssigned(workerId, work)
        case None =>
          sender() ! WorkerNotFound(workerId)
      }

    case GetWorkerStatus(workerId) =>
      workers.get(workerId) match {
        case Some(worker) =>
          implicit val timeout: Timeout = Timeout(5.seconds)
          // sender() is only valid while this message is being processed, so it is read
          // here and handed to pipeTo instead of being called from the Future callback.
          // pipeTo also forwards a failure (e.g. an ask timeout) as Status.Failure, so the
          // requester is not left waiting.
          (worker ? Worker.GetStatus)
            .mapTo[Worker.Status]
            .map(status => WorkerStatus(workerId, status.currentWork, status.completedWork))
            .pipeTo(sender())
        case None =>
          sender() ! WorkerNotFound(workerId)
      }

    case GetAllWorkers =>
      sender() ! AllWorkers(workers.keys.toList)
  }
}

// Top-level manager; the workers it creates become its children.
val workerManager = system.actorOf(Props[WorkerManagerActor](), "worker-manager")

// Test worker manager
import WorkerManager._

workerManager ! CreateWorker("W001")
workerManager ! CreateWorker("W002")
workerManager ! CreateWorker("W003")

// Each worker schedules its own completion one second after receiving the work.
workerManager ! AssignWork("W001", "Process data batch 1")
workerManager ! AssignWork("W002", "Generate report A")
workerManager ! AssignWork("W003", "Backup database")

Thread.sleep(3000)  // Wait for work to complete

// Supervision strategies
import akka.actor.SupervisorStrategy._
import akka.actor.{OneForOneStrategy, AllForOneStrategy}
import scala.concurrent.duration._

/**
 * Worker used to demonstrate supervision: it counts `"work"` messages, throws on `"fail"`,
 * stops itself on `"stop"`, and logs every lifecycle hook.
 */
class SupervisedWorker extends Actor {
  var workCount = 0

  /** Short name of this actor, as used in every log line. */
  private def name: String = self.path.name

  def receive: Receive = {
    case "work" =>
      workCount += 1
      println(s"Worker $name doing work (count: $workCount)")

    case "fail" =>
      println(s"Worker $name is failing!")
      throw new RuntimeException("Worker failed!")

    case "stop" =>
      println(s"Worker $name stopping")
      context.stop(self)

    case msg =>
      println(s"Worker $name received: $msg")
  }

  // Lifecycle hooks: each one logs; the restart hooks then delegate to the default behavior.

  override def preStart(): Unit =
    println(s"Worker $name starting")

  override def postStop(): Unit =
    println(s"Worker $name stopped")

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Worker $name restarting due to: ${reason.getMessage}")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    println(s"Worker $name restarted")
    super.postRestart(reason)
  }
}

/**
 * Parent of [[SupervisedWorker]] children. A failing child is handled on its own
 * (one-for-one), with at most 3 restarts per minute.
 *
 * Messages: `"create-worker"` replies with one new worker ref; `"create-workers"` replies
 * with a sequence of three refs named `worker-1` .. `worker-3`.
 */
class SupervisorActor extends Actor {

  // Supervision strategy
  // The decider is tried top to bottom, so the more specific exception has to come first:
  // IllegalArgumentException is a RuntimeException and would otherwise always hit Restart.
  override val supervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = 3,
    withinTimeRange = 1.minute
  ) {
    case _: IllegalArgumentException => Stop
    case _: RuntimeException => Restart
    case _ => Escalate
  }

  def receive: Receive = {
    case "create-worker" =>
      val worker = context.actorOf(Props[SupervisedWorker](), s"worker-${System.currentTimeMillis()}")
      sender() ! worker

    case "create-workers" =>
      val workers = (1 to 3).map { i =>
        context.actorOf(Props[SupervisedWorker](), s"worker-$i")
      }
      sender() ! workers

    case msg =>
      println(s"Supervisor received: $msg")
  }
}

val supervisor = system.actorOf(Props[SupervisorActor](), "supervisor")

// Test supervision
val workersFuture = supervisor ? "create-workers"
workersFuture.foreach {
  // The element type of a Seq is erased at runtime, so match on Seq[_] and keep the
  // elements that really are actor references.
  case replies: Seq[_] =>
    val workers = replies.collect { case ref: ActorRef => ref }

    workers.foreach { worker =>
      worker ! "work"
      worker ! "work"
    }

    // Cause one worker to fail
    workers.headOption.foreach(_ ! "fail")

    // Try to send more work after failure. The delay is scheduled rather than slept,
    // so no dispatcher thread is blocked inside this callback.
    system.scheduler.scheduleOnce(1.second) {
      workers.foreach(_ ! "work")
    }

  case other =>
    println(s"Unexpected reply from supervisor: $other")
}

Thread.sleep(3000)

Advanced Actor Patterns

Router Patterns and Actor Pools

import akka.routing.{RoundRobinPool, BalancingPool, SmallestMailboxPool}
import akka.actor.{Actor, Props}

// Work processing actor
/**
 * Routee used by the router examples.
 *
 *  - A `String` is treated as a job: the actor sleeps for a random 500–1499 ms to simulate
 *    processing and then replies with a completion text.
 *  - An `Int` is squared and the result is sent back.
 */
class ProcessingActor extends Actor {
  def receive: Receive = simulateJob orElse square

  private def simulateJob: Receive = {
    case work: String =>
      val processingTime = scala.util.Random.nextInt(1000) + 500
      println(s"${self.path.name} processing: $work (${processingTime}ms)")
      // Deliberately blocks this actor to make the routing strategies observable.
      Thread.sleep(processingTime)
      sender() ! s"Completed: $work by ${self.path.name}"
  }

  private def square: Receive = {
    case number: Int =>
      val result = number * number
      println(s"${self.path.name} computed: $number^2 = $result")
      sender() ! result
  }
}

// Round-robin router
// A pool of 5 ProcessingActor routees behind a single router ref.
val roundRobinRouter = system.actorOf(
  RoundRobinPool(5).props(Props[ProcessingActor]()), 
  "round-robin-router"
)

// Send work to round-robin router
(1 to 10).foreach { i =>
  roundRobinRouter ! s"Task-$i"
}

// Crude wait for the simulated jobs (each sleeps 500-1499 ms inside ProcessingActor).
Thread.sleep(3000)

// Balancing pool router (work-stealing)
// 3 routees, 8 jobs.
val balancingRouter = system.actorOf(
  BalancingPool(3).props(Props[ProcessingActor]()), 
  "balancing-router"
)

(1 to 8).foreach { i =>
  balancingRouter ! s"BalancedTask-$i"
}

Thread.sleep(3000)

// Smallest mailbox router
// 4 routees, 12 jobs; no wait follows this batch before the next section starts.
val smallestMailboxRouter = system.actorOf(
  SmallestMailboxPool(4).props(Props[ProcessingActor]()), 
  "smallest-mailbox-router"
)

(1 to 12).foreach { i =>
  smallestMailboxRouter ! s"MailboxTask-$i"
}

// Event Sourcing pattern
/** Events, commands, replies and the read model for the event-sourcing example. */
object EventStore {

  /** Read-model entry; `deleted` marks a soft-deleted user. */
  final case class User(id: String, name: String, email: String, deleted: Boolean = false)

  // Events: facts that have happened and are appended to the log
  sealed trait Event
  final case class UserCreated(userId: String, name: String, email: String) extends Event
  final case class UserUpdated(userId: String, name: Option[String], email: Option[String]) extends Event
  final case class UserDeleted(userId: String) extends Event

  // Commands and queries sent to the actor
  final case class CreateUser(userId: String, name: String, email: String)
  final case class UpdateUser(userId: String, name: Option[String], email: Option[String])
  final case class DeleteUser(userId: String)
  final case class GetUser(userId: String)

  // Replies
  final case class UserResponse(user: Option[User])
  final case class UserCreatedResponse(user: User)
  final case class UserUpdatedResponse(user: User)
  final case class UserDeletedResponse(userId: String)
}

/**
 * In-memory event-sourced user store.
 *
 * Every accepted command is turned into an [[EventStore.Event]], appended to the event log
 * and applied to the `users` read model. Deletion is soft: the user stays in the map with
 * `deleted = true` and is treated as absent by queries, updates and further deletes.
 *
 * Replies: the matching `*Response` on success; `UserResponse(None)` when an update or
 * delete targets a user that does not exist (or is already deleted), so that askers get an
 * answer instead of running into a timeout.
 */
class EventSourcingActor extends Actor {
  import EventStore._

  // Vector keeps the log in arrival order and makes the size lookup in applyEvent cheap.
  private var events = Vector.empty[Event]
  private var users = Map.empty[String, User]

  def receive: Receive = {
    case CreateUser(userId, name, email) =>
      // Creating an id that already exists replaces the stored user.
      applyEvent(UserCreated(userId, name, email))
      sender() ! UserCreatedResponse(users(userId))

    case UpdateUser(userId, name, email) =>
      if (isActive(userId)) {
        applyEvent(UserUpdated(userId, name, email))
        sender() ! UserUpdatedResponse(users(userId))
      } else {
        sender() ! UserResponse(None)
      }

    case DeleteUser(userId) =>
      if (isActive(userId)) {
        applyEvent(UserDeleted(userId))
        sender() ! UserDeletedResponse(userId)
      } else {
        sender() ! UserResponse(None)
      }

    case GetUser(userId) =>
      sender() ! UserResponse(users.get(userId).filter(!_.deleted))
  }

  /** True when the user exists and has not been soft-deleted. */
  private def isActive(userId: String): Boolean =
    users.get(userId).exists(!_.deleted)

  /** Appends `event` to the log and folds it into the read model. */
  private def applyEvent(event: Event): Unit = {
    events = events :+ event

    event match {
      case UserCreated(userId, name, email) =>
        users += userId -> User(userId, name, email)

      case UserUpdated(userId, name, email) =>
        users.get(userId).foreach { user =>
          users += userId -> user.copy(
            name = name.getOrElse(user.name),
            email = email.getOrElse(user.email)
          )
        }

      case UserDeleted(userId) =>
        users.get(userId).foreach { user =>
          users += userId -> user.copy(deleted = true)
        }
    }

    println(s"Applied event: $event (total events: ${events.size})")
  }
}

// Single actor that owns both the event log and the user read model.
val eventStore = system.actorOf(Props[EventSourcingActor](), "event-store")

// Test event sourcing
import EventStore._

eventStore ! CreateUser("user1", "Alice", "alice@example.com")
eventStore ! CreateUser("user2", "Bob", "bob@example.com")
// Only the name changes; passing None for the email keeps the stored one.
eventStore ! UpdateUser("user1", Some("Alice Smith"), None)
eventStore ! DeleteUser("user2")

val user1Future = eventStore ? GetUser("user1")
val user2Future = eventStore ? GetUser("user2")

// user2 was deleted above, and GetUser filters out deleted users.
user1Future.foreach(println)
user2Future.foreach(println)

// State machine actor using context.become to switch between behaviors
/** Messages for the light switch state machine. */
object LightSwitch {
  // Commands
  case object TurnOn
  case object TurnOff

  // Query and its reply
  case object GetState

  /** Reply to [[GetState]]; `isOn` is true while the light is on. */
  final case class State(isOn: Boolean)
}

/**
 * Two-state machine (off / on) implemented by swapping the actor's behavior with
 * `context.become`. The actor starts in the `off` state.
 */
class LightSwitchActor extends Actor {
  import LightSwitch._

  def receive: Receive = off

  /** Behavior while the light is off. */
  def off: Receive = behaviorWhen(lit = false)

  /** Behavior while the light is on. */
  def on: Receive = behaviorWhen(lit = true)

  /** Shared handler; `lit` tells which of the two states is currently active. */
  private def behaviorWhen(lit: Boolean): Receive = {
    case TurnOn if lit =>
      println("Light is already ON")

    case TurnOn =>
      println("Light turned ON")
      context.become(on)

    case TurnOff if lit =>
      println("Light turned OFF")
      context.become(off)

    case TurnOff =>
      println("Light is already OFF")

    case GetState =>
      sender() ! State(isOn = lit)
  }
}

val lightSwitch = system.actorOf(Props[LightSwitchActor](), "light-switch")

// Walk the switch through off -> on -> (already on) -> off, querying the state in between.
// The GetState messages are sent with `!`, so their State replies are not consumed here.
lightSwitch ! LightSwitch.GetState
lightSwitch ! LightSwitch.TurnOn
lightSwitch ! LightSwitch.GetState
lightSwitch ! LightSwitch.TurnOn
lightSwitch ! LightSwitch.TurnOff
lightSwitch ! LightSwitch.GetState

// Crude wait so the asynchronous output appears before the next section runs.
Thread.sleep(1000)

// Producer-Consumer pattern
/** Messages for the producer/consumer example. */
object ProducerConsumer {
  /** Tells the producer to emit `item`. */
  final case class Produce(item: String)

  /** Asks the consumer for its oldest queued item. */
  final case class Consume()

  /** Carries one item: from producer to consumer, and from consumer back to a requester. */
  final case class Item(value: String)

  /** Reply to [[Consume]] when the consumer's queue is empty. */
  case object NoItems
}

/**
 * Turns every `Produce` command into an `Item` delivered to the given consumer.
 *
 * @param consumer actor that receives the produced items
 */
class ProducerActor(consumer: ActorRef) extends Actor {
  import ProducerConsumer.{Item, Produce}

  def receive: Receive = {
    case Produce(item) => publish(item)
  }

  /** Logs the item and hands it to the consumer. */
  private def publish(item: String): Unit = {
    println(s"Producing: $item")
    consumer ! Item(item)
  }
}

/**
 * Buffers incoming `Item`s in FIFO order and hands out the oldest one per `Consume()`
 * request; replies `NoItems` when the buffer is empty.
 */
class ConsumerActor extends Actor {
  import ProducerConsumer._

  // Vector instead of List: appending at the end and reading the size are both cheap,
  // whereas List makes each of them linear in the queue length.
  private var queue = Vector.empty[String]

  def receive: Receive = {
    case Item(value) =>
      queue = queue :+ value
      println(s"Received item: $value (queue size: ${queue.length})")

    case Consume() =>
      queue match {
        case head +: tail =>
          queue = tail
          println(s"Consumed: $head (queue size: ${queue.length})")
          sender() ! Item(head)
        case _ =>
          println("No items to consume")
          sender() ! NoItems
      }
  }
}

// The consumer is created first because the producer needs its ref as a constructor argument.
val consumer = system.actorOf(Props[ConsumerActor](), "consumer")
val producer = system.actorOf(Props(new ProducerActor(consumer)), "producer")

// Test producer-consumer
import ProducerConsumer._

producer ! Produce("Item1")
producer ! Produce("Item2")
producer ! Produce("Item3")

// Crude wait so the three items reach the consumer before they are requested.
Thread.sleep(500)

consumer ! Consume()
consumer ! Consume()
consumer ! Consume()
consumer ! Consume()  // Should return NoItems

Thread.sleep(2000)

Building Distributed Systems

Actor Selection and Remote Actors

// Configuration for distributed actors (would normally be in application.conf)
/*
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
}
*/

// For this example, we'll simulate distributed behavior with actor selection
import akka.actor.{ActorSelection, Identify, ActorIdentity}

// Distributed service actor
/**
 * Stand-in for a remotely reachable service.
 *
 *  - `"ping"` is answered with `"pong"`.
 *  - Any other `String` is answered with `"Processed: <msg>"`.
 *
 * `Identify` needs no case here: Akka treats it as an auto-received message and answers it
 * with `ActorIdentity(messageId, Some(self))` before `receive` is consulted, so a handler
 * for it in this partial function would never run.
 */
class DistributedServiceActor extends Actor {
  def receive: Receive = {
    case "ping" =>
      println(s"${self.path} received ping")
      sender() ! "pong"

    case msg: String =>
      println(s"Service ${self.path.name} processing: $msg")
      sender() ! s"Processed: $msg"
  }
}

// Create multiple service instances
// All three are top-level actors whose names start with "service", which is what the
// "/user/service*" selection in ClientActor matches.
val service1 = system.actorOf(Props[DistributedServiceActor](), "service1")
val service2 = system.actorOf(Props[DistributedServiceActor](), "service2")
val service3 = system.actorOf(Props[DistributedServiceActor](), "service3")

// Client actor that uses actor selection
/**
 * Discovers services through an actor selection and greets every one that answers.
 *
 * `"discover-services"` sends `Identify("discovery")` to all actors matching
 * `/user/service*`; each `ActorIdentity` reply carrying a ref triggers a greeting, and
 * every `String` reply is printed.
 */
class ClientActor extends Actor {
  import context.dispatcher

  def receive: Receive = {
    case "discover-services" =>
      val services: ActorSelection = context.actorSelection("/user/service*")
      services ! Identify("discovery")

    case ActorIdentity("discovery", found) =>
      found match {
        case Some(ref) => greet(ref)
        case None => println("Service not found")
      }

    case response: String =>
      println(s"Client received: $response")
  }

  /** Logs the discovered service and sends it the client's greeting. */
  private def greet(service: ActorRef): Unit = {
    println(s"Discovered service: ${service.path}")
    service ! "Hello from client"
  }
}

// Kick off discovery; the client then greets each service that identifies itself.
val client = system.actorOf(Props[ClientActor](), "client")
client ! "discover-services"

// Crude wait for the discovery round trip and the services' replies.
Thread.sleep(2000)

// Cluster-aware actor (conceptual)
/** Messages for the conceptual cluster manager. */
object ClusterMessages {
  // Membership commands
  final case class RegisterNode(nodeId: String, address: String)
  final case class UnregisterNode(nodeId: String)

  /** Query for the currently registered nodes. */
  final case class GetClusterStatus()

  /**
   * Asks the manager to pass `message` to a node.
   *
   * @param nodeId target node; `None` lets the manager choose one
   */
  final case class RouteMessage(message: Any, nodeId: Option[String] = None)

  // Replies
  /** Registered nodes as a map from node id to address. */
  final case class ClusterStatus(nodes: Map[String, String])
  final case class MessageRouted(nodeId: String, response: Any)
}

/**
 * Conceptual cluster manager: keeps a registry of node ids and addresses, creates one local
 * [[DistributedServiceActor]] per node as a stand-in for a remote ref, and routes messages
 * to a named node or to a randomly chosen one.
 */
class ClusterManagerActor extends Actor {
  import ClusterMessages._
  import akka.pattern.pipe
  // Future callbacks run on this actor's own dispatcher.
  import context.dispatcher

  private var nodes = Map.empty[String, String]
  private var nodeActors = Map.empty[String, ActorRef]

  def receive: Receive = {
    case RegisterNode(nodeId, address) =>
      nodes += nodeId -> address
      // In real implementation, would create remote actor ref
      // Registering a known id again only updates its address; creating a second child
      // under the same name would throw.
      if (!nodeActors.contains(nodeId)) {
        val nodeActor = context.actorOf(Props[DistributedServiceActor](), s"node-$nodeId")
        nodeActors += nodeId -> nodeActor
      }
      println(s"Registered node: $nodeId at $address")

    case UnregisterNode(nodeId) =>
      nodes -= nodeId
      nodeActors.get(nodeId).foreach(context.stop)
      nodeActors -= nodeId
      println(s"Unregistered node: $nodeId")

    case GetClusterStatus() =>
      sender() ! ClusterStatus(nodes)

    case RouteMessage(message, Some(nodeId)) =>
      nodeActors.get(nodeId) match {
        case Some(actor) =>
          implicit val timeout: Timeout = Timeout(5.seconds)
          // sender() is only valid while this message is being processed, so it is read
          // here and handed to pipeTo rather than being called from a Future callback.
          // A failed ask reaches the requester as Status.Failure.
          (actor ? message)
            .map(response => MessageRouted(nodeId, response))
            .pipeTo(sender())
        case None =>
          println(s"Node not found: $nodeId")
      }

    case RouteMessage(message, None) =>
      // Pick any available node at random
      if (nodeActors.nonEmpty) {
        val candidates = nodeActors.keys.toVector
        val nodeId = candidates(scala.util.Random.nextInt(candidates.size))
        // forward keeps the original requester as sender, so the MessageRouted reply
        // goes back to it instead of to this actor.
        self.forward(RouteMessage(message, Some(nodeId)))
      }
  }
}

val clusterManager = system.actorOf(Props[ClusterManagerActor](), "cluster-manager")

// Test cluster management
import ClusterMessages._

// Three simulated nodes; the addresses are only stored and printed.
clusterManager ! RegisterNode("node1", "127.0.0.1:2551")
clusterManager ! RegisterNode("node2", "127.0.0.1:2552")
clusterManager ! RegisterNode("node3", "127.0.0.1:2553")

clusterManager ! RouteMessage("task1", Some("node1"))
clusterManager ! RouteMessage("task2", None)  // Manager picks a node at random

// Ask for the registry and print every node id with its address.
val clusterStatusFuture = clusterManager ? GetClusterStatus()
clusterStatusFuture.foreach {
  case ClusterStatus(nodes) =>
    println("Cluster nodes:")
    nodes.foreach { case (nodeId, address) =>
      println(s"  $nodeId: $address")
    }
}

// Crude wait so the asynchronous output appears before the system is shut down.
Thread.sleep(3000)

// Graceful shutdown
// The result of terminate() is not awaited; the script simply continues.
println("Shutting down actor system...")
system.terminate()

Performance and Best Practices

Actor Performance Optimization

// Create a new actor system for performance examples
// NOTE(review): `system` and `ec` declared earlier in this file are implicit values of the
// same types. If this section shares their scope, an implicit ExecutionContext lookup can
// see both `ec` and `perfEc` — confirm there is no ambiguity when run as one script.
implicit val perfSystem: ActorSystem = ActorSystem("PerformanceExample")
implicit val perfEc: ExecutionContext = perfSystem.dispatcher

// High-throughput actor design
/**
 * Counts `"benchmark"` messages and prints the throughput every 100,000 messages;
 * `"count"` replies with the number of messages counted so far.
 *
 * The elapsed time is measured from the moment this actor instance was constructed.
 */
class HighThroughputActor extends Actor {
  private var messageCount = 0
  private val startTime = System.currentTimeMillis()

  def receive: Receive = {
    case "benchmark" =>
      messageCount += 1
      // Report throughput every 100,000 messages
      if (messageCount % 100000 == 0) reportThroughput()

    case "count" =>
      sender() ! messageCount
  }

  /** Prints the message count, the elapsed time and the derived messages per second. */
  private def reportThroughput(): Unit = {
    val elapsed = System.currentTimeMillis() - startTime
    val throughput = messageCount.toDouble / elapsed * 1000
    println(f"Processed $messageCount messages in ${elapsed}ms (${throughput}%.0f msg/sec)")
  }
}

// Benchmark actor throughput
/**
 * Sends one million `"benchmark"` messages to a fresh [[HighThroughputActor]] and prints the
 * overall throughput once the actor has answered a final `"count"` request.
 *
 * The `"count"` request is enqueued after all benchmark messages, so for this local actor
 * its reply marks the point at which the whole batch has been processed. Measuring at that
 * point keeps the reported rate free of any fixed waiting time.
 *
 * The reply is awaited on the calling thread. This is acceptable here because the function
 * runs at the edge of the script, and it avoids depending on which ExecutionContext is
 * implicit at this point.
 */
def benchmarkActorThroughput(): Unit = {
  import scala.concurrent.Await
  import scala.util.{Failure, Success, Try}

  val actor = perfSystem.actorOf(Props[HighThroughputActor](), "benchmark-actor")

  val numMessages = 1000000
  val startTime = System.currentTimeMillis()

  (1 to numMessages).foreach { _ =>
    actor ! "benchmark"
  }

  // Generous upper bound for draining the mailbox on a slow machine.
  implicit val timeout: Timeout = Timeout(30.seconds)

  Try(Await.result(actor ? "count", timeout.duration)) match {
    case Success(count) =>
      val elapsed = System.currentTimeMillis() - startTime
      val throughput = numMessages.toDouble / elapsed * 1000
      println(f"Final: $count messages in ${elapsed}ms (${throughput}%.0f msg/sec)")

    case Failure(error) =>
      println(s"Benchmark did not complete: ${error.getMessage}")
  }
}

benchmarkActorThroughput()

// Memory-efficient actor with object pooling
/**
 * Small pool of [[ObjectPool.PooledMessage]] instances.
 *
 * The pool is bounded: once `MaxPooled` messages are parked, further releases are dropped,
 * so the pool cannot grow without limit and pin an ever-increasing number of byte arrays.
 *
 * Note that `PooledMessage` is immutable, so `reset` (and therefore `acquire`) still
 * allocates a new instance; the pool demonstrates the acquire/release protocol rather than
 * true allocation-free reuse.
 */
object ObjectPool {
  case class PooledMessage(id: Int, data: Array[Byte]) {
    /** Returns a message carrying the new id and data; this instance is left unchanged. */
    def reset(newId: Int, newData: Array[Byte]): PooledMessage =
      copy(id = newId, data = newData)
  }

  /** Upper bound on the number of released messages kept around. */
  private val MaxPooled = 1024

  private val pool = new java.util.concurrent.ArrayBlockingQueue[PooledMessage](MaxPooled)

  /**
   * Obtains a message with the given id and data, taking an instance from the pool when
   * one is available.
   */
  def acquire(id: Int, data: Array[Byte]): PooledMessage =
    Option(pool.poll()) match {
      case Some(pooled) => pooled.reset(id, data)
      case None => PooledMessage(id, data)
    }

  /** Returns a message to the pool; it is silently discarded when the pool is full. */
  def release(message: PooledMessage): Unit = {
    // offer never blocks: it reports false when the queue is at capacity.
    pool.offer(message)
    ()
  }
}

/**
 * Processes pooled messages: replies with the sum of the payload bytes and hands the
 * message back to [[ObjectPool]].
 */
class MemoryEfficientActor extends Actor {
  import ObjectPool._

  def receive: Receive = {
    case msg: PooledMessage =>
      // Process message
      // Accumulate in Int: `sum` on an Array[Byte] yields a Byte and wraps around as soon
      // as the total leaves the range -128..127.
      val processed = msg.data.foldLeft(0)(_ + _)

      // Return to pool
      ObjectPool.release(msg)

      sender() ! processed
  }
}

// Actor best practices
/** Collection of small examples illustrating recommended actor practices. */
object ActorBestPractices {

  // 1. Use immutable messages
  case class ImmutableCommand(id: String, data: List[String])

  // 2. Avoid blocking operations in receive
  /**
   * `"blocking-work"` runs the slow part in a Future and replies `"Work completed"` when it
   * is done; `"cpu-work"` replies with the sum of 1..1,000,000.
   */
  class NonBlockingActor extends Actor {
    import context.dispatcher

    def receive: Receive = {
      case "blocking-work" =>
        // Wrong: blocking in receive
        // Thread.sleep(1000)

        // Right: delegate to Future
        // sender() is captured first because it must not be read from the callback.
        // The Future still occupies a thread of context.dispatcher while it sleeps; real
        // blocking calls are better run on a dispatcher dedicated to blocking work.
        val originalSender = sender()
        Future {
          Thread.sleep(1000)
          "Work completed"
        }.foreach(originalSender ! _)

      case "cpu-work" =>
        // CPU work is okay in receive
        // Summed as Long: the total (500,000,500,000) does not fit into an Int.
        val result = (1L to 1000000L).sum
        sender() ! result
    }
  }

  // 3. Use appropriate supervision strategies
  /** Supervisor whose decider lists the more specific exception first. */
  class RobustSupervisor extends Actor {
    override val supervisorStrategy = OneForOneStrategy(
      maxNrOfRetries = 3,
      withinTimeRange = 1.minute
    ) {
      case _: IllegalArgumentException => Stop
      case _: RuntimeException => Restart
      case _ => Escalate
    }

    def receive: Receive = {
      case "create-child" =>
        val child = context.actorOf(Props[NonBlockingActor]())
        sender() ! child
    }
  }

  // 4. Batch processing for high throughput
  /**
   * Collects `String` items and processes them in batches of `batchSize`; `"flush"`
   * processes whatever has been collected so far.
   */
  class BatchProcessingActor extends Actor {
    // Vector keeps arrival order and gives a cheap size check on every message.
    private var batch = Vector.empty[String]
    private val batchSize = 100

    def receive: Receive = {
      // Must come before the generic String case: "flush" is a String as well and would
      // otherwise be buffered as an ordinary item.
      case "flush" =>
        if (batch.nonEmpty) {
          processBatch(batch)
          batch = Vector.empty
        }

      case item: String =>
        batch = batch :+ item

        if (batch.size >= batchSize) {
          processBatch(batch)
          batch = Vector.empty
        }
    }

    private def processBatch(items: Seq[String]): Unit = {
      println(s"Processing batch of ${items.length} items")
      // Process all items together
    }
  }

  // 5. Use router for parallelism
  /**
   * Creates a round-robin pool with one routee per available processor.
   *
   * @param actorProps props used for every routee
   * @param name       actor name of the router; pass distinct names to create more than
   *                   one router, since actor names must be unique among siblings
   */
  def createOptimalRouter(actorProps: Props, name: String = "optimal-router"): ActorRef = {
    val nrOfInstances = Runtime.getRuntime.availableProcessors()
    perfSystem.actorOf(
      RoundRobinPool(nrOfInstances).props(actorProps),
      name
    )
  }
}

// Crude wait so outstanding benchmark output can appear before shutting down.
Thread.sleep(3000)

// Graceful shutdown
// As with the first system, the result of terminate() is not awaited.
perfSystem.terminate()

Summary

In this lesson, you've mastered the Actor model and Akka:

Actor Basics: Creating actors, sending messages, and handling responses
Message Protocols: Designing explicit message protocols with case classes and case objects
Actor Hierarchies: Building supervised actor systems
Advanced Patterns: Routers, event sourcing, and state machines
Distributed Systems: Actor selection and cluster concepts
Performance: Optimization techniques and best practices
Fault Tolerance: Supervision strategies and error handling

The Actor model provides a powerful foundation for building concurrent, distributed, and fault-tolerant systems that can scale from single machines to large clusters.

What's Next

In the next lesson, we'll explore advanced Scala features including implicits, type classes, and metaprogramming techniques that enable powerful library design and domain-specific languages.