Microservices Architecture with Akka and Distributed Systems

Building scalable, resilient microservices requires careful consideration of distributed systems patterns and the right technology stack. Akka provides a powerful foundation for building distributed applications using the actor model, clustering, and event-driven architectures. In this lesson, we'll explore how to design and implement microservices using Akka and related technologies.

Understanding Microservices Architecture

Key Principles

  • Single Responsibility: Each service has one business purpose
  • Autonomous: Services can be developed, deployed, and scaled independently
  • Decentralized: Services manage their own data and business logic
  • Fault Tolerant: Failure in one service doesn't cascade to others
  • Technology Agnostic: Services can use different technologies

Akka Advantages for Microservices

  • Actor Model: Natural fit for distributed systems
  • Location Transparency: Actors can be local or remote
  • Fault Tolerance: Built-in supervision strategies
  • Clustering: Native support for forming and managing a cluster of nodes
  • Message-Driven: Asynchronous, non-blocking communication

Setting Up Akka for Microservices

Project Dependencies

// build.sbt
// Every version is declared exactly once so that modules released together
// cannot drift apart when one of them is upgraded.
val AkkaVersion = "2.7.0"
val AkkaHttpVersion = "10.4.0"
val AkkaManagementVersion = "1.2.0"
val AkkaProjectionVersion = "1.3.1"
val AkkaPersistenceJdbcVersion = "5.2.0"
val CinnamonVersion = "2.17.3"
val PostgresDriverVersion = "42.5.1"

libraryDependencies ++= Seq(
  // Core Akka
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,

  // HTTP and Serialization
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,

  // Persistence and Projections
  "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
  // The application configuration selects the "jdbc-journal" and
  // "jdbc-snapshot-store" plugins and the projection reads from
  // "jdbc-read-journal"; those plugins live in akka-persistence-jdbc and
  // need the query module plus a JDBC driver on the classpath.
  "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
  "com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion,
  "org.postgresql" % "postgresql" % PostgresDriverVersion,
  "com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion,
  "com.lightbend.akka" %% "akka-projection-jdbc" % AkkaProjectionVersion,

  // Cluster Management
  "com.lightbend.akka.management" %% "akka-management" % AkkaManagementVersion,
  "com.lightbend.akka.management" %% "akka-management-cluster-http" % AkkaManagementVersion,
  "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % AkkaManagementVersion,

  // Service Discovery
  "com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % AkkaManagementVersion,

  // Monitoring
  "com.lightbend.cinnamon" %% "cinnamon-akka" % CinnamonVersion,
  "com.lightbend.cinnamon" %% "cinnamon-prometheus" % CinnamonVersion,

  // Testing
  "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
  "com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion % Test
)

Basic Configuration

# application.conf
# Root Akka settings: actor provider and serialization, Artery remoting,
# cluster membership and sharding, Akka Management, and persistence plugins.
akka {
  actor {
    # Run the actor system as a cluster member.
    provider = cluster

    # Every message/event type that mixes in this marker trait is serialized
    # with the Jackson CBOR serializer.
    serialization-bindings {
      "com.example.CborSerializable" = jackson-cbor
    }
  }

  # Address this node binds to and advertises to its peers.
  # NOTE(review): loopback address and a fixed port — presumably local
  # development values; confirm they are overridden per node/pod.
  remote.artery {
    canonical.hostname = "127.0.0.1"
    canonical.port = 2551
  }

  cluster {
    # Static contact points; the system name must match the name passed to
    # ActorSystem(...) ("ClusterSystem" in ClusterApp).
    # NOTE(review): cluster.bootstrap is configured below as well. The Akka
    # Management documentation says seed-nodes must not be set when Cluster
    # Bootstrap is used — confirm which of the two mechanisms is intended.
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551",
      "akka://ClusterSystem@127.0.0.1:2552"
    ]

    # Downing of unreachable members is delegated to the Split Brain Resolver.
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"

    sharding {
      number-of-shards = 100
      # Entities that receive no message for this long are stopped.
      passivate-idle-entity-after = 2 minutes
    }
  }

  management {
    # HTTP endpoint of Akka Management.
    # NOTE(review): bound to loopback — confirm it is reachable by peers and
    # probes in the target environment.
    http {
      hostname = "127.0.0.1"
      port = 8558
    }

    # Peer discovery for cluster bootstrap through the Kubernetes API.
    cluster.bootstrap {
      contact-point-discovery {
        service-name = "my-service"
        discovery-method = kubernetes-api
      }
    }
  }

  # Journal and snapshot plugins; both ids refer to the top-level
  # "jdbc-journal" / "jdbc-snapshot-store" sections of this file.
  persistence {
    journal.plugin = "jdbc-journal"
    snapshot-store.plugin = "jdbc-snapshot-store"
  }
}

# Database configuration for persistence
# Write side: journal and snapshot store share one Slick database definition.
jdbc-journal {
  slick = ${slick}
}

jdbc-snapshot-store {
  slick = ${slick}
}

# Read side: the order projection queries events through the plugin id
# "jdbc-read-journal", so it must point at the same database as the journal.
jdbc-read-journal {
  slick = ${slick}
}

# Shared Slick database definition referenced by the JDBC persistence plugins.
slick {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    host = "localhost"
    port = 5432
    name = "eventstore"
    # The connection pool is configured through a JDBC URL; host, port and
    # name above are only building blocks for it.
    url = "jdbc:postgresql://"${slick.db.host}":"${slick.db.port}"/"${slick.db.name}
    user = "postgres"
    # Development default; the optional substitution below replaces it when
    # the DB_PASSWORD environment variable is set.
    password = "postgres"
    password = ${?DB_PASSWORD}
    driver = "org.postgresql.Driver"
    numThreads = 5
    maxConnections = 5
    minConnections = 1
  }
}

Implementing Domain Services

Order Service Domain

// Domain Model
package com.example.order

import akka.actor.typed.ActorRef
import akka.persistence.typed.PersistenceId
import com.example.CborSerializable
import java.time.Instant
import java.util.UUID

// Commands
/**
 * Protocol of the order entity.
 *
 * Every command carries the actor that receives the outcome, so the trait
 * exposes `replyTo` and handlers can answer any command without knowing its
 * concrete type.
 */
sealed trait OrderCommand extends CborSerializable {
  /** Actor that receives the [[OrderResponse]] for this command. */
  def replyTo: ActorRef[OrderResponse]
}

/** Creates the order with its initial items. */
final case class CreateOrder(
  customerId: String,
  items: List[OrderItem],
  replyTo: ActorRef[OrderResponse]
) extends OrderCommand

/** Appends one item to the order identified by `orderId`. */
final case class AddItem(
  orderId: String,
  item: OrderItem,
  replyTo: ActorRef[OrderResponse]
) extends OrderCommand

/** Requests confirmation of the order identified by `orderId`. */
final case class ConfirmOrder(
  orderId: String,
  replyTo: ActorRef[OrderResponse]
) extends OrderCommand

/** Reads the current state of the order identified by `orderId`. */
final case class GetOrder(
  orderId: String,
  replyTo: ActorRef[OrderResponse]
) extends OrderCommand

// Events
/** Facts about an order; every event names the order it belongs to. */
sealed trait OrderEvent extends CborSerializable { def orderId: String }

/** An order was created for `customerId` with the given initial items. */
case class OrderCreated(orderId: String, customerId: String, items: List[OrderItem], timestamp: Instant = Instant.now())
  extends OrderEvent

/** One item was appended to the order. */
case class ItemAdded(orderId: String, item: OrderItem, timestamp: Instant = Instant.now())
  extends OrderEvent

/** The order was confirmed. */
case class OrderConfirmed(orderId: String, timestamp: Instant = Instant.now())
  extends OrderEvent

// State
/** One order line: a product, how many units, and the price per unit. */
case class OrderItem(productId: String, quantity: Int, price: BigDecimal)
  extends CborSerializable

/**
 * Lifecycle states of an order.
 *
 * NOTE(review): these are case objects serialized through the Jackson
 * binding of CborSerializable — confirm that singleton identity survives a
 * round trip, since handlers compare states with `==`.
 */
sealed trait OrderStatus extends CborSerializable

case object Draft     extends OrderStatus
case object Confirmed extends OrderStatus
case object Shipped   extends OrderStatus
case object Delivered extends OrderStatus

/** Current state of one order. */
case class Order(
  id: String,
  customerId: String,
  items: List[OrderItem],
  status: OrderStatus,
  createdAt: Instant,
  updatedAt: Instant
) extends CborSerializable {

  /** Sum of `price * quantity` over all items; zero for an empty order. */
  def total: BigDecimal =
    items.foldLeft(BigDecimal(0)) { (runningTotal, line) =>
      runningTotal + line.price * line.quantity
    }

  /** Copy with `item` appended and `updatedAt` set to the current instant. */
  def addItem(item: OrderItem): Order = {
    val extendedItems = items :+ item
    copy(items = extendedItems, updatedAt = Instant.now())
  }

  /** Copy in the Confirmed status with `updatedAt` set to the current instant. */
  def confirm(): Order = {
    val confirmedAt = Instant.now()
    copy(status = Confirmed, updatedAt = confirmedAt)
  }
}

// Responses
/** Replies sent by the order entity to the `replyTo` of a command. */
sealed trait OrderResponse extends CborSerializable

/** The order was created; carries the new state. */
case class OrderCreatedResponse(order: Order) extends OrderResponse

/** The order was changed; carries the state after the change. */
case class OrderUpdatedResponse(order: Order) extends OrderResponse

/** Result of a read; `None` when the order does not exist. */
case class OrderRetrievedResponse(order: Option[Order]) extends OrderResponse

/** The command was rejected; `message` says why. */
case class OrderErrorResponse(message: String) extends OrderResponse

Event Sourced Order Actor

// OrderActor.scala
package com.example.order

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{EventSourcedBehavior, Effect}
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import java.time.Instant

/**
 * Event-sourced order entity.
 *
 * State is `None` until an `OrderCreated` event has been applied and
 * `Some(order)` afterwards. Every command is answered on its own `replyTo`,
 * so a caller using `ask` never has to wait for a timeout to learn that a
 * command was rejected.
 */
object OrderActor {

  val TypeKey: EntityTypeKey[OrderCommand] = EntityTypeKey[OrderCommand]("Order")

  private type State = Option[Order]
  private type OrderEffect = Effect[OrderEvent, State]

  /**
   * Behavior of the order with the given id.
   *
   * @param orderId entity id; also part of the persistence id and written
   *                into every event this entity persists
   */
  def apply(orderId: String): Behavior[OrderCommand] =
    EventSourcedBehavior[OrderCommand, OrderEvent, Option[Order]](
      persistenceId = PersistenceId(TypeKey.name, orderId),
      emptyState = None,
      commandHandler = commandHandler(orderId),
      eventHandler = eventHandler
    ).withTagger(_ => Set("order")) // tag consumed by the read-side projection

  private def commandHandler(orderId: String): (Option[Order], OrderCommand) => Effect[OrderEvent, Option[Order]] = {
    case (None, command)        => onMissingOrder(orderId, command)
    case (Some(order), command) => onExistingOrder(orderId, order, command)
  }

  /** Commands received before the order exists: only creation and reads succeed. */
  private def onMissingOrder(orderId: String, command: OrderCommand): OrderEffect =
    command match {
      case CreateOrder(customerId, items, replyTo) =>
        persistThenReply(OrderCreated(orderId, customerId, items), replyTo)(OrderCreatedResponse(_))

      case GetOrder(_, replyTo) =>
        Effect.reply(replyTo)(OrderRetrievedResponse(None))

      // Reply explicitly instead of leaving the command unhandled, which
      // would make the asker wait for its timeout.
      case AddItem(_, _, replyTo) =>
        reject(replyTo, "Order not found")

      case ConfirmOrder(_, replyTo) =>
        reject(replyTo, "Order not found")
    }

  /** Commands received for an existing order; changes are allowed in Draft only. */
  private def onExistingOrder(orderId: String, order: Order, command: OrderCommand): OrderEffect =
    command match {
      case CreateOrder(_, _, replyTo) =>
        reject(replyTo, "Order already exists")

      case AddItem(_, item, replyTo) if order.status == Draft =>
        persistThenReply(ItemAdded(orderId, item), replyTo)(OrderUpdatedResponse(_))

      case ConfirmOrder(_, replyTo) if order.status == Draft =>
        persistThenReply(OrderConfirmed(orderId), replyTo)(OrderUpdatedResponse(_))

      case GetOrder(_, replyTo) =>
        Effect.reply(replyTo)(OrderRetrievedResponse(Some(order)))

      // Same commands outside the Draft status. The match is exhaustive on
      // purpose: a new command type becomes a compiler warning here.
      case AddItem(_, _, replyTo) =>
        reject(replyTo, "Invalid command for current state")

      case ConfirmOrder(_, replyTo) =>
        reject(replyTo, "Invalid command for current state")
    }

  /** Persists `event` and replies with the state produced by the event handler. */
  private def persistThenReply(event: OrderEvent, replyTo: ActorRef[OrderResponse])(
    toResponse: Order => OrderResponse
  ): OrderEffect =
    Effect
      .persist[OrderEvent, State](event)
      .thenReply(replyTo) { updated =>
        // The event handler yields Some(order) for every event persisted
        // here; the fallback only avoids an unchecked Option.get.
        updated.fold[OrderResponse](OrderErrorResponse("Order state unavailable after update"))(toResponse)
      }

  /** Replies with an error and persists nothing. */
  private def reject(replyTo: ActorRef[OrderResponse], reason: String): OrderEffect =
    Effect.reply(replyTo)(OrderErrorResponse(reason))

  /** Applies an event; timestamps come from the event so replay is deterministic. */
  private val eventHandler: (Option[Order], OrderEvent) => Option[Order] = {
    (state, event) =>
      event match {
        case OrderCreated(orderId, customerId, items, timestamp) =>
          Some(Order(orderId, customerId, items, Draft, timestamp, timestamp))

        case ItemAdded(_, item, timestamp) =>
          state.map(_.addItem(item).copy(updatedAt = timestamp))

        case OrderConfirmed(_, timestamp) =>
          state.map(_.confirm().copy(updatedAt = timestamp))
      }
  }
}

HTTP API Layer

// OrderRoutes.scala
package com.example.order.api

import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.AskPattern._
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityRef}
import akka.http.scaladsl.model.{StatusCodes, HttpResponse}
import akka.http.scaladsl.server.{Directives, Route}
import akka.util.Timeout
import spray.json._
import com.example.order._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Failure}
import java.util.UUID

// JSON serialization
/**
 * spray-json formats for the order HTTP API.
 *
 * Mixes in SprayJsonSupport so the formats can be used directly as Akka HTTP
 * marshallers and unmarshallers (`complete(order)`, `entity(as[...])`).
 */
trait OrderJsonProtocol
  extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
  with DefaultJsonProtocol {

  /** Instants are written as ISO-8601 strings, e.g. "2024-01-01T00:00:00Z". */
  implicit val instantFormat: JsonFormat[java.time.Instant] = new JsonFormat[java.time.Instant] {
    def write(instant: java.time.Instant): JsValue = JsString(instant.toString)

    def read(json: JsValue): java.time.Instant = json match {
      case JsString(text) =>
        scala.util.Try(java.time.Instant.parse(text))
          .getOrElse(deserializationError(s"Invalid ISO-8601 instant: $text"))
      case other =>
        deserializationError(s"Expected an ISO-8601 string but got $other")
    }
  }

  /** Order statuses are written as their name. */
  implicit val orderStatusFormat: JsonFormat[OrderStatus] = new JsonFormat[OrderStatus] {
    def write(status: OrderStatus): JsValue = {
      val name = status match {
        case Draft     => "Draft"
        case Confirmed => "Confirmed"
        case Shipped   => "Shipped"
        case Delivered => "Delivered"
      }
      JsString(name)
    }

    def read(json: JsValue): OrderStatus = json match {
      case JsString("Draft")     => Draft
      case JsString("Confirmed") => Confirmed
      case JsString("Shipped")   => Shipped
      case JsString("Delivered") => Delivered
      case other                 => deserializationError(s"Unknown order status: $other")
    }
  }

  // Explicit result types keep implicit resolution independent of the order
  // in which the formats are declared.
  implicit val orderItemFormat: RootJsonFormat[OrderItem] = jsonFormat3(OrderItem.apply)
  // Needs the Instant and OrderStatus formats declared above.
  implicit val orderFormat: RootJsonFormat[Order] = jsonFormat6(Order.apply)

  implicit val createOrderRequestFormat: RootJsonFormat[CreateOrderRequest] =
    jsonFormat2(CreateOrderRequest.apply)
  implicit val addItemRequestFormat: RootJsonFormat[AddItemRequest] =
    jsonFormat1(AddItemRequest.apply)
}

/** Request body for creating an order. */
case class CreateOrderRequest(
  customerId: String,
  items: List[OrderItem]
)

/** Request body for adding one item to an order. */
case class AddItemRequest(
  item: OrderItem
)

/**
 * HTTP API of the order service.
 *
 * {{{
 * POST /orders                     create an order
 * GET  /orders/{orderId}           read an order
 * POST /orders/{orderId}/items     add an item
 * POST /orders/{orderId}/confirm   confirm an order
 * }}}
 *
 * Each request is forwarded to the sharded order entity with `ask` and the
 * reply is translated into an HTTP response.
 *
 * @param sharding cluster sharding extension used to look up order entities
 * @param system   actor system providing the execution context
 */
class OrderRoutes(sharding: ClusterSharding)(implicit system: ActorSystem[_])
  extends Directives
  with OrderJsonProtocol
  with akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport {

  implicit val timeout: Timeout = 3.seconds
  implicit val ec: ExecutionContext = system.executionContext

  def routes: Route =
    // "orders" is matched once here; the inner routes only match what
    // follows it, so the URLs are /orders/... and not /orders/orders/...
    pathPrefix("orders") {
      concat(
        // POST /orders - Create new order
        pathEnd {
          post {
            entity(as[CreateOrderRequest]) { request =>
              val orderId = UUID.randomUUID().toString
              val reply = entityFor(orderId).ask[OrderResponse] { replyTo =>
                CreateOrder(request.customerId, request.items, replyTo)
              }
              completeWith(reply) {
                case OrderCreatedResponse(order) => complete(StatusCodes.Created, order)
              }
            }
          }
        },

        // GET /orders/{orderId}
        path(Segment) { orderId =>
          get {
            val reply = entityFor(orderId).ask[OrderResponse](replyTo => GetOrder(orderId, replyTo))
            completeWith(reply) {
              case OrderRetrievedResponse(Some(order)) => complete(order)
              case OrderRetrievedResponse(None)        => errorRoute(StatusCodes.NotFound, "Order not found")
            }
          }
        },

        // POST /orders/{orderId}/items
        path(Segment / "items") { orderId =>
          post {
            entity(as[AddItemRequest]) { request =>
              val reply = entityFor(orderId).ask[OrderResponse] { replyTo =>
                AddItem(orderId, request.item, replyTo)
              }
              completeWith(reply) {
                case OrderUpdatedResponse(order) => complete(order)
              }
            }
          }
        },

        // POST /orders/{orderId}/confirm
        path(Segment / "confirm") { orderId =>
          post {
            val reply = entityFor(orderId).ask[OrderResponse](replyTo => ConfirmOrder(orderId, replyTo))
            completeWith(reply) {
              case OrderUpdatedResponse(order) => complete(order)
            }
          }
        }
      )
    }

  /** Reference to the sharded entity that owns `orderId`. */
  private def entityFor(orderId: String): EntityRef[OrderCommand] =
    sharding.entityRefFor(OrderActor.TypeKey, orderId)

  /**
   * Completes the request once `reply` is available.
   *
   * `onSuccess` handles the replies an endpoint expects. Everything else goes
   * through [[fallback]], so an unexpected reply produces an error response
   * instead of a MatchError.
   */
  private def completeWith(reply: Future[OrderResponse])(onSuccess: PartialFunction[OrderResponse, Route]): Route =
    onComplete(reply) {
      case Success(response) =>
        onSuccess.applyOrElse(response, (other: OrderResponse) => fallback(other))
      case Failure(ex) =>
        // getMessage may be null (e.g. for some timeouts); fall back to the class name.
        errorRoute(StatusCodes.InternalServerError, Option(ex.getMessage).getOrElse(ex.getClass.getName))
    }

  /** Rejections from the entity become 400; any other unexpected reply becomes 500. */
  private def fallback(response: OrderResponse): Route =
    response match {
      case OrderErrorResponse(message) => errorRoute(StatusCodes.BadRequest, message)
      case _ => errorRoute(StatusCodes.InternalServerError, "Unexpected response from order entity")
    }

  /** JSON error body of the form {"error": message}. */
  private def errorRoute(status: akka.http.scaladsl.model.StatusCode, message: String): Route =
    complete(status, Map("error" -> message))
}

Event Sourcing and CQRS

Read Model Projections

// OrderProjection.scala
package com.example.order.projection

import akka.actor.typed.ActorSystem
import akka.projection.eventsourced.EventEnvelope
import akka.projection.scaladsl.Handler
import akka.projection.jdbc.scaladsl.{JdbcHandler, JdbcProjection}
import akka.projection.{ProjectionBehavior, ProjectionId}
import com.example.order._
import slick.jdbc.PostgresProfile.api._
import scala.concurrent.{ExecutionContext, Future}

// Read model tables
/**
 * Row of the `order_summary` read model.
 *
 * `createdAt` and `updatedAt` are stored as `Long`; the projection handler
 * fills them from `Instant.toEpochMilli`.
 */
case class OrderSummary(id: String, customerId: String, itemCount: Int, totalAmount: BigDecimal,
                        status: String, createdAt: Long, updatedAt: Long)

/** Slick mapping of the `order_summary` table; `id` is the primary key. */
class OrderSummaryTable(tag: Tag) extends Table[OrderSummary](tag, "order_summary") {
  def id          = column[String]("id", O.PrimaryKey)
  def customerId  = column[String]("customer_id")
  def itemCount   = column[Int]("item_count")
  def totalAmount = column[BigDecimal]("total_amount")
  def status      = column[String]("status")
  def createdAt   = column[Long]("created_at")
  def updatedAt   = column[Long]("updated_at")

  /** Default projection: all columns, in the field order of [[OrderSummary]]. */
  def * = {
    val allColumns = (id, customerId, itemCount, totalAmount, status, createdAt, updatedAt)
    allColumns <> ((OrderSummary.apply _).tupled, OrderSummary.unapply)
  }
}

/** Query root for the `order_summary` table. */
val orderSummaries = TableQuery[OrderSummaryTable]

// Projection handler
/**
 * Keeps the `order_summary` read model in sync with order events.
 *
 * NOTE(review): the supertype and the shape of `process` are kept exactly as
 * they were; confirm them against the handler API of the projection module
 * that is actually on the classpath.
 *
 * @param database Slick database holding the read model
 * @param ec       execution context used to compose database actions
 */
class OrderProjectionHandler(database: Database)(implicit ec: ExecutionContext)
  extends JdbcHandler[EventEnvelope[OrderEvent], Database]() {

  /**
   * Applies one event to the read model.
   *
   * @param database database the statements are run against
   * @param envelope envelope carrying the order event
   * @return a future completed with `Done` once the statement has run
   */
  override def process(database: Database, envelope: EventEnvelope[OrderEvent]): Future[akka.Done] = {
    envelope.event match {
      case OrderCreated(orderId, customerId, items, timestamp) =>
        val summary = OrderSummary(
          id = orderId,
          customerId = customerId,
          itemCount = items.length,
          totalAmount = items.map(i => i.price * i.quantity).sum,
          status = "Draft",
          createdAt = timestamp.toEpochMilli,
          updatedAt = timestamp.toEpochMilli
        )
        database.run(orderSummaries += summary).map(_ => akka.Done)

      case ItemAdded(orderId, item, timestamp) =>
        val row = orderSummaries.filter(_.id === orderId)
        val lineTotal = item.price * item.quantity

        // Add to the stored values instead of overwriting them: the event
        // carries only the new item, not the totals of the whole order.
        def addLine(current: Option[OrderSummary]): DBIO[Int] =
          current match {
            case Some(existing) =>
              row
                .map(o => (o.itemCount, o.totalAmount, o.updatedAt))
                .update((existing.itemCount + 1, existing.totalAmount + lineTotal, timestamp.toEpochMilli))
            case None =>
              // No summary row for this order: nothing to update.
              DBIO.successful(0)
          }

        // Read and write in one transaction so concurrent updates of the
        // same row cannot interleave between them.
        val updateAction = row.result.headOption.flatMap(addLine).transactionally
        database.run(updateAction).map(_ => akka.Done)

      case OrderConfirmed(orderId, timestamp) =>
        val updateAction = orderSummaries
          .filter(_.id === orderId)
          .map(o => (o.status, o.updatedAt))
          .update(("Confirmed", timestamp.toEpochMilli))
        database.run(updateAction).map(_ => akka.Done)
    }
  }
}

// Projection setup
/**
 * Wires the order read-model projection: events tagged "order" are read from
 * the "jdbc-read-journal" plugin and handed to [[OrderProjectionHandler]].
 */
object OrderProjection {
  /**
   * Builds the projection.
   *
   * @param system   actor system; its execution context is given to the handler
   * @param database database passed to the handler and returned by the session factory
   *
   * NOTE(review): the declared result type is `ProjectionBehavior.Command`,
   * while the body returns whatever `JdbcProjection.exactlyOnce` produces —
   * presumably a projection to be wrapped in `ProjectionBehavior(...)`;
   * confirm against the caller.
   */
  def create(system: ActorSystem[_], database: Database): ProjectionBehavior.Command = {
    // One handler instance is created here and reused by the factory below.
    val handler = new OrderProjectionHandler(database)(system.executionContext)

    JdbcProjection.exactlyOnce(
      // Name "OrderProjection", key "orders".
      projectionId = ProjectionId("OrderProjection", "orders"),
      // NOTE(review): EventSourcedProvider is not among the imports listed
      // for this file — confirm it is brought into scope.
      sourceProvider = EventSourcedProvider.eventsByTag[OrderEvent](
        system = system,
        readJournalPluginId = "jdbc-read-journal",
        // Same tag that the order entity attaches to its events.
        tag = "order"
      ),
      handler = () => handler,
      // NOTE(review): a Slick Database is returned as the session — verify
      // this matches the session type JdbcProjection expects.
      sessionFactory = () => database
    )(system)
  }
}

Query Service

// OrderQueryService.scala
package com.example.order.query

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import com.example.order.projection.{OrderSummary, OrderSummaryTable}
import slick.jdbc.PostgresProfile.api._
import scala.concurrent.{ExecutionContext, Future}

// Query commands
/** Read-side requests; each one names the actor that receives the answer. */
sealed trait OrderQuery

/** Looks up the summary of a single order. */
case class GetOrderSummary(
  orderId: String,
  replyTo: ActorRef[OrderQueryResponse]
) extends OrderQuery

/** Lists the summaries of all orders of one customer. */
case class GetCustomerOrders(
  customerId: String,
  replyTo: ActorRef[OrderQueryResponse]
) extends OrderQuery

/** Lists the summaries of all orders in the given status. */
case class GetOrdersByStatus(
  status: String,
  replyTo: ActorRef[OrderQueryResponse]
) extends OrderQuery

// Query responses
/** Answers to an [[OrderQuery]]. */
sealed trait OrderQueryResponse

/** Result of a single-order lookup; `None` when no row matched. */
case class OrderSummaryResponse(
  summary: Option[OrderSummary]
) extends OrderQueryResponse

/** Result of a list query. */
case class OrderSummariesResponse(
  summaries: List[OrderSummary]
) extends OrderQueryResponse

/** The query could not be answered; `message` says why. */
case class QueryErrorResponse(
  message: String
) extends OrderQueryResponse

/**
 * Actor-based facade over the `order_summary` read model.
 *
 * Every query is answered exactly once: with the result when the database
 * call succeeds, or with a [[QueryErrorResponse]] when it fails, so callers
 * using `ask` do not have to wait for a timeout on database errors.
 *
 * @param database Slick database holding the read model
 * @param ec       execution context for the database callbacks
 */
class OrderQueryService(database: Database)(implicit ec: ExecutionContext) {

  private val orderSummaries = TableQuery[OrderSummaryTable]

  /** Behavior that serves [[OrderQuery]] messages; it keeps no state between messages. */
  def apply(): Behavior[OrderQuery] = {
    Behaviors.receiveMessage {
      case GetOrderSummary(orderId, replyTo) =>
        val lookup = database.run(
          orderSummaries.filter(_.id === orderId).result.headOption
        )
        answer(lookup, replyTo)(OrderSummaryResponse(_))

      case GetCustomerOrders(customerId, replyTo) =>
        val lookup = database.run(
          orderSummaries
            .filter(_.customerId === customerId)
            .sortBy(_.createdAt.desc)
            .result
        )
        answer(lookup, replyTo)(rows => OrderSummariesResponse(rows.toList))

      case GetOrdersByStatus(status, replyTo) =>
        val lookup = database.run(
          orderSummaries
            .filter(_.status === status)
            .sortBy(_.updatedAt.desc)
            .result
        )
        answer(lookup, replyTo)(rows => OrderSummariesResponse(rows.toList))
    }
  }

  /**
   * Sends the outcome of `lookup` to `replyTo` and keeps the current behavior.
   *
   * The reply is sent from the future's callback; only `replyTo` is touched
   * there and no actor state is accessed.
   */
  private def answer[A](lookup: Future[A], replyTo: ActorRef[OrderQueryResponse])(
    toResponse: A => OrderQueryResponse
  ): Behavior[OrderQuery] = {
    lookup
      .map(toResponse)
      .recover { case scala.util.control.NonFatal(error) =>
        QueryErrorResponse(Option(error.getMessage).getOrElse(error.getClass.getName))
      }
      .foreach(response => replyTo ! response)
    Behaviors.same
  }
}

Service Communication Patterns

Event-Driven Communication

// EventBus.scala
package com.example.messaging

import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.pubsub.Topic
import com.example.CborSerializable

// Domain events
/** Event published between services; identifies its aggregate and creation time. */
sealed trait DomainEvent extends CborSerializable {
  def aggregateId: String
  def timestamp: Long
}

/** An order was confirmed; `timestamp` defaults to the current time in milliseconds. */
case class OrderConfirmedEvent(aggregateId: String, customerId: String, totalAmount: BigDecimal,
                               timestamp: Long = System.currentTimeMillis())
  extends DomainEvent

/** A payment was processed for an order; `timestamp` defaults to the current time in milliseconds. */
case class PaymentProcessedEvent(aggregateId: String, orderId: String, amount: BigDecimal,
                                 timestamp: Long = System.currentTimeMillis())
  extends DomainEvent

// Event bus
/** Factory for the topic actor on which domain events are published. */
object EventBus {
  /**
   * Starts the "domain-events" topic as a system actor named "event-bus".
   *
   * @param system actor system that hosts the topic actor
   * @return reference accepting `Topic` commands for [[DomainEvent]]
   */
  def create(system: ActorSystem[_]): ActorRef[Topic.Command[DomainEvent]] = {
    val domainEventsTopic = Topic[DomainEvent]("domain-events")
    system.systemActorOf(domainEventsTopic, "event-bus")
  }
}

// Event listener
/**
 * Reacts to domain events published on the event bus.
 *
 * The match lists every [[DomainEvent]] explicitly and has no catch-all, so
 * adding an event type makes the compiler report the missing case here.
 */
class OrderEventListener {
  /** Behavior that logs each event through the actor's logger and stays unchanged. */
  def apply(): akka.actor.typed.Behavior[DomainEvent] = {
    Behaviors.receive[DomainEvent] { (context, event) =>
      event match {
        case OrderConfirmedEvent(orderId, customerId, amount, _) =>
          // Trigger payment processing
          context.log.info(s"Order $orderId confirmed for customer $customerId with amount $amount")
          // Could send command to payment service
          Behaviors.same

        case PaymentProcessedEvent(paymentId, orderId, amount, _) =>
          // Update order status to paid
          context.log.info(s"Payment $paymentId processed for order $orderId with amount $amount")
          Behaviors.same
      }
    }
  }
}

// Usage in order service
/**
 * Publishes order events on the event bus.
 *
 * @param eventBus topic actor that distributes domain events
 */
class OrderServiceWithEvents(eventBus: ActorRef[Topic.Command[DomainEvent]]) {

  /** Publishes an [[OrderConfirmedEvent]] built from the order's id, customer and total. */
  def publishOrderConfirmed(order: Order): Unit =
    eventBus ! Topic.Publish(
      OrderConfirmedEvent(order.id, order.customerId, order.total)
    )
}

Saga Pattern for Distributed Transactions

// OrderSaga.scala
package com.example.saga

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{EventSourcedBehavior, Effect}
import scala.concurrent.duration._

// Saga commands
/** Messages understood by the order saga. */
sealed trait SagaCommand

/** Starts the saga for an order. */
case class StartOrderSaga(
  orderId: String,
  customerId: String,
  amount: BigDecimal
) extends SagaCommand

/** The payment step succeeded. */
case class PaymentSucceeded(orderId: String) extends SagaCommand

/** The payment step failed for the given reason. */
case class PaymentFailed(
  orderId: String,
  reason: String
) extends SagaCommand

/** The inventory step succeeded. */
case class InventoryReserved(orderId: String) extends SagaCommand

/** The inventory step failed for the given reason. */
case class InventoryReservationFailed(
  orderId: String,
  reason: String
) extends SagaCommand

/** Sent by the saga's own timer when the saga has taken too long. */
case object SagaTimeout extends SagaCommand

// Saga events
/**
 * Facts persisted by the order saga.
 *
 * Events extend the serialization marker bound to Jackson CBOR in the
 * application configuration so that they can be written to the journal.
 */
sealed trait SagaEvent extends com.example.CborSerializable
final case class SagaStarted(orderId: String, customerId: String, amount: BigDecimal) extends SagaEvent
final case class PaymentRequested(orderId: String) extends SagaEvent
final case class InventoryRequested(orderId: String) extends SagaEvent
/** The payment step reported success. */
final case class PaymentCompleted(orderId: String) extends SagaEvent
/** The inventory step reported success. */
final case class InventoryReservationCompleted(orderId: String) extends SagaEvent
final case class SagaCompleted(orderId: String) extends SagaEvent
final case class SagaFailed(orderId: String, reason: String) extends SagaEvent
final case class CompensationStarted(orderId: String) extends SagaEvent

// Saga state
sealed trait SagaState
case object Idle extends SagaState
case object PaymentPending extends SagaState
case object InventoryPending extends SagaState
case object Completed extends SagaState
case object Failed extends SagaState
case object Compensating extends SagaState

/**
 * State of one saga instance.
 *
 * @param paymentCompleted  set once a `PaymentCompleted` event was applied
 * @param inventoryReserved set once an `InventoryReservationCompleted` event was applied
 */
case class OrderSagaData(
  orderId: String,
  customerId: String,
  amount: BigDecimal,
  state: SagaState,
  paymentCompleted: Boolean = false,
  inventoryReserved: Boolean = false
)

/**
 * Event-sourced saga that coordinates payment and inventory for one order.
 *
 * Both steps are requested when the saga starts. The saga completes when
 * both have succeeded, and fails when either step fails or the timeout
 * fires. On failure every step already known to have succeeded is
 * compensated, and a step whose success arrives after the failure is
 * compensated when it arrives. Step outcomes are persisted as events, so
 * progress survives a restart of the entity.
 */
object OrderSaga {

  private val TimeoutTimerKey = "saga-timeout"
  private val SagaTimeoutAfter: FiniteDuration = 30.seconds

  private type State = Option[OrderSagaData]

  /**
   * Behavior of the saga with the given id.
   *
   * @param sagaId           id used for the persistence id
   * @param paymentService   receives `ProcessPayment` and `RefundPayment`
   * @param inventoryService receives `ReserveInventory` and `ReleaseInventory`
   */
  def apply(
    sagaId: String,
    paymentService: ActorRef[PaymentCommand],
    inventoryService: ActorRef[InventoryCommand]
  ): Behavior[SagaCommand] = {

    Behaviors.withTimers[SagaCommand] { timers =>
      EventSourcedBehavior[SagaCommand, SagaEvent, Option[OrderSagaData]](
        persistenceId = PersistenceId("OrderSaga", sagaId),
        emptyState = None,
        commandHandler = commandHandler(paymentService, inventoryService, timers),
        eventHandler = eventHandler
      ).receiveSignal {
        // Timers are not persisted: when an unfinished saga is recovered
        // (restart, rebalance) the timeout is armed again.
        case (Some(data), akka.persistence.typed.RecoveryCompleted) if !isFinished(data.state) =>
          timers.startSingleTimer(TimeoutTimerKey, SagaTimeout, SagaTimeoutAfter)
      }
    }
  }

  /** True for states in which the saga accepts no further progress. */
  private def isFinished(state: SagaState): Boolean =
    state match {
      case Completed | Failed | Compensating       => true
      case Idle | PaymentPending | InventoryPending => false
    }

  /** Persists the given events in order. */
  private def persist(events: SagaEvent*): akka.persistence.typed.scaladsl.EffectBuilder[SagaEvent, State] =
    Effect.persist[SagaEvent, State](events.toList)

  private def commandHandler(
    paymentService: ActorRef[PaymentCommand],
    inventoryService: ActorRef[InventoryCommand],
    timers: TimerScheduler[SagaCommand]
  ): (Option[OrderSagaData], SagaCommand) => Effect[SagaEvent, Option[OrderSagaData]] = {

    (state, command) =>
      state match {
        case None =>
          command match {
            case StartOrderSaga(orderId, customerId, amount) =>
              // All three events are stored atomically before any message is
              // sent, so the requests are never issued for a saga that was
              // not recorded.
              persist(
                SagaStarted(orderId, customerId, amount),
                PaymentRequested(orderId),
                InventoryRequested(orderId)
              ).thenRun { _ =>
                timers.startSingleTimer(TimeoutTimerKey, SagaTimeout, SagaTimeoutAfter)
                // Start payment processing
                paymentService ! ProcessPayment(orderId, customerId, amount)
                // Reserve inventory
                inventoryService ! ReserveInventory(orderId)
              }

            case _ => Effect.unhandled
          }

        case Some(data) =>
          val finished = isFinished(data.state)

          command match {
            case PaymentSucceeded(orderId) if data.orderId == orderId && !data.paymentCompleted =>
              if (finished) {
                // The saga already failed: record the late success and undo it.
                persist(PaymentCompleted(orderId))
                  .thenRun(_ => paymentService ! RefundPayment(orderId))
              } else if (data.inventoryReserved) {
                persist(PaymentCompleted(orderId), SagaCompleted(orderId))
                  .thenRun(_ => timers.cancel(TimeoutTimerKey))
              } else {
                persist(PaymentCompleted(orderId))
              }

            case InventoryReserved(orderId) if data.orderId == orderId && !data.inventoryReserved =>
              if (finished) {
                // The saga already failed: record the late success and undo it.
                persist(InventoryReservationCompleted(orderId))
                  .thenRun(_ => inventoryService ! ReleaseInventory(orderId))
              } else if (data.paymentCompleted) {
                persist(InventoryReservationCompleted(orderId), SagaCompleted(orderId))
                  .thenRun(_ => timers.cancel(TimeoutTimerKey))
              } else {
                persist(InventoryReservationCompleted(orderId))
              }

            case PaymentFailed(orderId, reason) if data.orderId == orderId && !finished =>
              persist(SagaFailed(orderId, reason)).thenRun { _ =>
                timers.cancel(TimeoutTimerKey)
                // Start compensation
                if (data.inventoryReserved) {
                  inventoryService ! ReleaseInventory(orderId)
                }
              }

            case InventoryReservationFailed(orderId, reason) if data.orderId == orderId && !finished =>
              persist(SagaFailed(orderId, reason)).thenRun { _ =>
                timers.cancel(TimeoutTimerKey)
                // Start compensation
                if (data.paymentCompleted) {
                  paymentService ! RefundPayment(orderId)
                }
              }

            case SagaTimeout if !finished =>
              persist(SagaFailed(data.orderId, "Saga timeout")).thenRun { _ =>
                // Compensate all completed steps
                if (data.paymentCompleted) {
                  paymentService ! RefundPayment(data.orderId)
                }
                if (data.inventoryReserved) {
                  inventoryService ! ReleaseInventory(data.orderId)
                }
              }

            case SagaTimeout =>
              // A timeout that was already queued when the saga finished
              // must not turn a finished saga into a failed one.
              Effect.none

            case _ => Effect.unhandled
          }
      }
  }

  private val eventHandler: (Option[OrderSagaData], SagaEvent) => Option[OrderSagaData] = {
    (state, event) =>
      event match {
        case SagaStarted(orderId, customerId, amount) =>
          Some(OrderSagaData(orderId, customerId, amount, PaymentPending))

        case PaymentRequested(_) =>
          state.map(_.copy(state = PaymentPending))

        case InventoryRequested(_) =>
          state.map(_.copy(state = InventoryPending))

        case PaymentCompleted(_) =>
          state.map(_.copy(paymentCompleted = true))

        case InventoryReservationCompleted(_) =>
          state.map(_.copy(inventoryReserved = true))

        case SagaCompleted(_) =>
          state.map(_.copy(state = Completed))

        case SagaFailed(_, _) =>
          state.map(_.copy(state = Failed))

        case CompensationStarted(_) =>
          state.map(_.copy(state = Compensating))
      }
  }
}

// Supporting service commands
/** Messages the saga sends to the payment service. */
sealed trait PaymentCommand

/** Charge `amount` to the customer for the order. */
case class ProcessPayment(
  orderId: String,
  customerId: String,
  amount: BigDecimal
) extends PaymentCommand

/** Undo the payment of the order. */
case class RefundPayment(orderId: String) extends PaymentCommand

/** Messages the saga sends to the inventory service. */
sealed trait InventoryCommand

/** Reserve stock for the order. */
case class ReserveInventory(orderId: String) extends InventoryCommand

/** Release stock previously reserved for the order. */
case class ReleaseInventory(orderId: String) extends InventoryCommand

Clustering and High Availability

Cluster Setup

// ClusterApp.scala
package com.example

import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed.{Cluster, Join}
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.http.scaladsl.Http
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import com.example.order.{OrderActor, OrderRoutes}
import scala.concurrent.duration._
import scala.util.{Success, Failure}

/**
 * Entry point of one cluster node: starts Akka Management and cluster
 * bootstrap, registers the order entity with cluster sharding, binds the
 * HTTP API on port 8080 and logs cluster membership events.
 */
object ClusterApp {

  /** Guardian behavior; it performs all start-up work and handles no messages itself. */
  def apply(): Behavior[Nothing] = {
    Behaviors.setup[Nothing] { context =>
      implicit val system: ActorSystem[Nothing] = context.system

      // Start Akka Management
      AkkaManagement(system).start()

      // Start cluster bootstrap
      ClusterBootstrap(system).start()

      // Initialize cluster sharding. No custom stop message is configured:
      // the order protocol declares none, so the default stop handling of
      // sharding is used.
      val sharding = ClusterSharding(system)
      sharding.init(
        akka.cluster.sharding.typed.scaladsl.Entity(OrderActor.TypeKey) { entityContext =>
          OrderActor(entityContext.entityId)
        }
      )

      // Start HTTP server.
      // NOTE(review): the routes are declared in package
      // com.example.order.api, while this file's import list names
      // com.example.order.OrderRoutes — the import needs the same correction.
      val orderRoutes = new com.example.order.api.OrderRoutes(sharding)
      Http()
        .newServerAt("0.0.0.0", 8080)
        .bind(orderRoutes.routes)
        .onComplete {
          // This callback runs outside the actor, so the system logger is
          // used rather than the actor context's logger.
          case Success(binding) =>
            system.log.info(s"HTTP server started at ${binding.localAddress}")
          case Failure(ex) =>
            system.log.error("Failed to start HTTP server", ex)
        }(system.executionContext)

      // Cluster event handling: the guardian accepts no messages, so a
      // dedicated child actor receives the cluster events.
      val clusterListener = context.spawn(clusterEventListener, "cluster-event-listener")
      Cluster(system).subscriptions ! akka.cluster.typed.Subscribe(
        clusterListener,
        classOf[akka.cluster.ClusterEvent.ClusterDomainEvent]
      )

      Behaviors.empty
    }
  }

  /** Logs every cluster domain event it receives. */
  private def clusterEventListener: Behavior[akka.cluster.ClusterEvent.ClusterDomainEvent] =
    Behaviors.receive[akka.cluster.ClusterEvent.ClusterDomainEvent] { (listenerContext, event) =>
      listenerContext.log.info(s"Cluster event: $event")
      Behaviors.same
    }

  /** Wrapper for a cluster event; retained for code that refers to it by name. */
  case class ClusterEventAdapter(event: akka.cluster.ClusterEvent.ClusterDomainEvent)

  def main(args: Array[String]): Unit = {
    // The name must match the system name used in the configured seed-node addresses.
    ActorSystem(ClusterApp(), "ClusterSystem")
  }
}

Service Discovery and Load Balancing

// ServiceDiscovery.scala
package com.example.discovery

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.discovery.{Discovery, ServiceDiscovery}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

/**
 * HTTP client that resolves its target through Akka Discovery.
 *
 * @param system actor system providing discovery, the HTTP extension, the
 *               scheduler and the execution context
 */
class ServiceClient(system: ActorSystem[_]) {
  implicit val sys: ActorSystem[_] = system
  implicit val ec: ExecutionContext = system.executionContext

  private val discovery: ServiceDiscovery = Discovery(system).discovery

  /**
   * Sends a GET request for `path` to one instance of `serviceName`.
   *
   * The instance is chosen at random among the resolved addresses so that
   * requests are spread over all instances instead of always hitting the
   * first one. Port 8080 is used when discovery returns no port.
   *
   * @return the HTTP response, or a failed future with a `RuntimeException`
   *         when the lookup returns no instance
   */
  def callService(serviceName: String, path: String): Future[HttpResponse] =
    discovery.lookup(serviceName, 3.seconds).flatMap { resolved =>
      val candidates = resolved.addresses
      if (candidates.isEmpty) {
        Future.failed(new RuntimeException(s"No instances found for service $serviceName"))
      } else {
        val chosen = java.util.concurrent.ThreadLocalRandom.current().nextInt(candidates.size)
        val target = candidates(chosen)
        Http().singleRequest(
          HttpRequest(uri = s"http://${target.host}:${target.port.getOrElse(8080)}$path")
        )
      }
    }

  // Circuit breaker pattern: opens after 5 failed or timed-out calls and
  // allows a trial call again after 30 seconds.
  private val circuitBreaker = new akka.pattern.CircuitBreaker(
    // CircuitBreaker takes the classic scheduler, not the typed one.
    scheduler = system.classicSystem.scheduler,
    maxFailures = 5,
    callTimeout = 10.seconds,
    resetTimeout = 30.seconds
  )(system.executionContext)

  /** Same as [[callService]], but fails fast while the circuit breaker is open. */
  def callServiceWithCircuitBreaker(serviceName: String, path: String): Future[HttpResponse] = {
    circuitBreaker.withCircuitBreaker(callService(serviceName, path))
  }
}

Monitoring and Observability

Health Checks

// HealthCheck.scala
package com.example.health

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
import akka.http.scaladsl.server.{Directives, Route}
import akka.cluster.{Cluster, MemberStatus}
import akka.management.cluster.ClusterHealthCheck
import spray.json._

/** Body of the health endpoint: overall status, cluster view and per-service status. */
case class HealthStatus(status: String, cluster: ClusterHealth, services: Map[String, ServiceHealth])

/**
 * Cluster view of this node.
 *
 * @param leader      whether this node is the cluster leader
 * @param members     number of members counted as up
 * @param unreachable number of members currently unreachable
 */
case class ClusterHealth(leader: Boolean, members: Int, unreachable: Int)

/** Status of one dependency and the time of the check in epoch milliseconds. */
case class ServiceHealth(status: String, lastCheck: Long)

/**
 * spray-json formats for the health endpoints.
 *
 * Mixes in SprayJsonSupport so `complete(health)` finds a marshaller; the
 * explicit result types keep implicit resolution independent of the order
 * in which the formats are declared.
 */
trait HealthJsonProtocol
  extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
  with DefaultJsonProtocol {
  implicit val serviceHealthFormat: RootJsonFormat[ServiceHealth] = jsonFormat2(ServiceHealth.apply)
  implicit val clusterHealthFormat: RootJsonFormat[ClusterHealth] = jsonFormat3(ClusterHealth.apply)
  // Uses the two formats above for its nested fields.
  implicit val healthStatusFormat: RootJsonFormat[HealthStatus] = jsonFormat3(HealthStatus.apply)
}

/**
 * Health, readiness and liveness endpoints.
 *
 * @param system actor system whose cluster state is reported
 */
class HealthCheckRoutes(system: ActorSystem[_]) extends Directives with HealthJsonProtocol {

  /** GET /health, GET /ready and GET /alive. */
  def routes: Route = concat(healthRoute, readinessRoute, livenessRoute)

  /** Cluster summary plus fixed "healthy" entries for database and cache. */
  private def healthRoute: Route =
    path("health") {
      get {
        val node = Cluster(system)
        val clusterView = ClusterHealth(
          leader = node.state.leader.contains(node.selfAddress),
          members = node.state.members.count(_.status == MemberStatus.Up),
          unreachable = node.state.unreachable.size
        )

        val dependencies = Map(
          "database" -> ServiceHealth("healthy", System.currentTimeMillis()),
          "cache" -> ServiceHealth("healthy", System.currentTimeMillis())
        )

        // Any unreachable member downgrades the overall status.
        val summary = if (clusterView.unreachable == 0) "healthy" else "degraded"

        complete(HealthStatus(summary, clusterView, dependencies))
      }
    }

  /** 200 once this node's member status is Up, 503 otherwise. */
  private def readinessRoute: Route =
    path("ready") {
      get {
        val node = Cluster(system)
        if (node.selfMember.status == MemberStatus.Up) {
          complete(StatusCodes.OK, "Ready")
        } else {
          complete(StatusCodes.ServiceUnavailable, "Not ready")
        }
      }
    }

  /** Always 200 while the HTTP server answers. */
  private def livenessRoute: Route =
    path("alive") {
      get {
        complete(StatusCodes.OK, "Alive")
      }
    }
}

Metrics and Tracing

// Metrics.scala
package com.example.metrics

import akka.actor.typed.ActorSystem
import kamon.Kamon
import kamon.prometheus.PrometheusReporter
import cinnamon.akka.typed._
import cinnamon.prometheus._

/** Metrics bootstrap and recording helpers for the order service. */
object MetricsSetup {

  // Metric names are declared once so recording call sites cannot diverge.
  private val OrdersCreated = "orders.created"
  private val OrderProcessingTime = "orders.processing.time"
  private val OrderErrors = "orders.errors"

  /**
   * Starts Kamon and registers the Prometheus reporter.
   *
   * @param system actor system the metrics reporter is attached to
   */
  def initialize(system: ActorSystem[_]): Unit = {
    // Initialize Kamon
    Kamon.init()

    // Add Prometheus reporter
    Kamon.registerModule("prometheus", new PrometheusReporter())

    // System metrics
    // NOTE(review): kept unchanged — confirm that `setMetricsReporter` exists
    // on the actor system in use; Cinnamon is presumably enabled through its
    // agent and configuration rather than through code.
    system.setMetricsReporter(CinnamonPrometheusReporter())
  }

  /** Counts one created order. */
  def recordOrderCreated(): Unit = {
    Kamon.counter(OrdersCreated).withoutTags().increment()
  }

  /**
   * Records how long an order took to process.
   *
   * @param durationMs elapsed time in milliseconds
   */
  def recordOrderProcessingTime(durationMs: Long): Unit = {
    // Timers record nanoseconds; convert so the value keeps its meaning.
    val elapsedNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(durationMs)
    Kamon.timer(OrderProcessingTime).withoutTags().record(elapsedNanos)
  }

  /** Counts one error, tagged with its type. */
  def recordError(errorType: String): Unit = {
    Kamon.counter(OrderErrors).withTag("type", errorType).increment()
  }
}

Deployment and DevOps

Kubernetes Deployment


# kubernetes/order-service.yaml
apiVersion: apps/v1
kind: Deployment