Microservices Architecture with Akka and Distributed Systems
Building scalable, resilient microservices requires careful consideration of distributed systems patterns and the right technology stack. Akka provides a powerful foundation for building distributed applications using the actor model, clustering, and event-driven architectures. In this lesson, we'll explore how to design and implement microservices using Akka and related technologies.
Understanding Microservices Architecture
Key Principles
- Single Responsibility: Each service has one business purpose
- Autonomous: Services can be developed, deployed, and scaled independently
- Decentralized: Services manage their own data and business logic
- Fault Tolerant: Failure in one service doesn't cascade to others
- Technology Agnostic: Services can use different technologies
Akka Advantages for Microservices
- Actor Model: Natural fit for distributed systems
- Location Transparency: Actors can be local or remote
- Fault Tolerance: Built-in supervision strategies
- Clustering: Native support for distributed systems
- Message-Driven: Asynchronous, non-blocking communication
Setting Up Akka for Microservices
Project Dependencies
// build.sbt
val AkkaVersion = "2.7.0"
val AkkaHttpVersion = "10.4.0"
val AkkaManagementVersion = "1.2.0"
libraryDependencies ++= Seq(
// Core Akka
"com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
// HTTP and Serialization
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
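// Streams (akka-http expects akka-stream to be declared explicitly)
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
// Persistence and Projections
// The JDBC plugin and PostgreSQL driver back the jdbc-journal settings used in the configuration;
// the versions shown are assumed to match Akka 2.7, so check the current releases before use.
"com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
"com.lightbend.akka" %% "akka-persistence-jdbc" % "5.2.0",
"org.postgresql" % "postgresql" % "42.5.4",
"com.lightbend.akka" %% "akka-projection-eventsourced" % "1.3.1",
"com.lightbend.akka" %% "akka-projection-jdbc" % "1.3.1",
"com.lightbend.akka" %% "akka-projection-slick" % "1.3.1",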
// Cluster Management
"com.lightbend.akka.management" %% "akka-management" % AkkaManagementVersion,
"com.lightbend.akka.management" %% "akka-management-cluster-http" % AkkaManagementVersion,
"com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % AkkaManagementVersion,
// Service Discovery
"com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % AkkaManagementVersion,
// Monitoring
"com.lightbend.cinnamon" %% "cinnamon-akka" % "2.17.3",
"com.lightbend.cinnamon" %% "cinnamon-prometheus" % "2.17.3",
// Testing
"com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
"com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion % Test
)
Basic Configuration
# application.conf
akka {
actor {
provider = cluster
serialization-bindings {
"com.example.CborSerializable" = jackson-cbor
}
}
remote.artery {
canonical.hostname = "127.0.0.1"
canonical.port = 2551
}
cluster {
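# Static seed nodes suit local development. Remove this list when relying on
# Cluster Bootstrap (configured under management below): bootstrap is skipped
# when seed-nodes are set explicitly.
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551",
"akka://ClusterSystem@127.0.0.1:2552"
]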
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
sharding {
number-of-shards = 100
passivate-idle-entity-after = 2 minutes
}
}
management {
http {
hostname = "127.0.0.1"
port = 8558
}
cluster.bootstrap {
contact-point-discovery {
service-name = "my-service"
discovery-method = kubernetes-api
}
}
}
persistence {
journal.plugin = "jdbc-journal"
snapshot-store.plugin = "jdbc-snapshot-store"
}
}
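# Database configuration for persistence
jdbc-journal {
slick = ${slick}
}
jdbc-snapshot-store {
slick = ${slick}
}
# Read journal used by eventsByTag queries (for example, by projections)
jdbc-read-journal {
slick = ${slick}
}
slick {
profile = "slick.jdbc.PostgresProfile$"
db {
host = "localhost"
port = 5432
name = "eventstore"
# The JDBC URL is what the driver actually connects to; it is assembled from the keys above
url = "jdbc:postgresql://"${slick.db.host}":"${slick.db.port}"/"${slick.db.name}
user = "postgres"
password = "postgres"
driver = "org.postgresql.Driver"
numThreads = 5
maxConnections = 5
minConnections = 1
}
}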
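The `number-of-shards` setting in the configuration above fixes how many shards entity IDs are spread across. As a rough illustration of the general hash-and-modulo idea (a sketch under that assumption, not a copy of Akka's extractor code), the mapping from entity ID to shard can be written in plain Scala. `ShardMapping`, `shardFor`, and `distribution` are hypothetical names introduced only for this example.

```scala
object ShardMapping {
  // Hypothetical helper: map an entity ID onto one of `numberOfShards` shards.
  // floorMod keeps the result non-negative even when hashCode is negative.
  def shardFor(entityId: String, numberOfShards: Int): Int =
    Math.floorMod(entityId.hashCode, numberOfShards)

  // Count how many of the given IDs land in each shard, to see how evenly they spread.
  def distribution(entityIds: Seq[String], numberOfShards: Int): Map[Int, Int] =
    entityIds
      .groupBy(id => shardFor(id, numberOfShards))
      .map { case (shard, ids) => shard -> ids.size }
}
```

Because the shard count is part of the mapping, every node must use the same value, and changing it later means stopping the whole cluster rather than rolling the change out node by node.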
Implementing Domain Services
Order Service Domain
// Domain Model
package com.example.order
import akka.actor.typed.ActorRef
import akka.persistence.typed.PersistenceId
import com.example.CborSerializable
import java.time.Instant
import java.util.UUID
// Commands
sealed trait OrderCommand extends CborSerializable
case class CreateOrder(
customerId: String,
items: List[OrderItem],
replyTo: ActorRef[OrderResponse]
) extends OrderCommand
case class AddItem(
orderId: String,
item: OrderItem,
replyTo: ActorRef[OrderResponse]
) extends OrderCommand
case class ConfirmOrder(
orderId: String,
replyTo: ActorRef[OrderResponse]
) extends OrderCommand
case class GetOrder(
orderId: String,
replyTo: ActorRef[OrderResponse]
) extends OrderCommand
// Events
sealed trait OrderEvent extends CborSerializable {
def orderId: String
}
case class OrderCreated(
orderId: String,
customerId: String,
items: List[OrderItem],
timestamp: Instant = Instant.now()
) extends OrderEvent
case class ItemAdded(
orderId: String,
item: OrderItem,
timestamp: Instant = Instant.now()
) extends OrderEvent
case class OrderConfirmed(
orderId: String,
timestamp: Instant = Instant.now()
) extends OrderEvent
// State
case class OrderItem(
productId: String,
quantity: Int,
price: BigDecimal
) extends CborSerializable
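// Note: when a sealed trait such as OrderStatus appears as a field, akka-serialization-jackson
// needs type hints (for example @JsonTypeInfo/@JsonSubTypes, or a custom serializer) to
// deserialize it, and case objects need extra care; see the Akka serialization documentation.
sealed trait OrderStatus extends CborSerializable
case object Draft extends OrderStatus
case object Confirmed extends OrderStatus
case object Shipped extends OrderStatus
case object Delivered extends OrderStatus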
case class Order(
id: String,
customerId: String,
items: List[OrderItem],
status: OrderStatus,
createdAt: Instant,
updatedAt: Instant
) extends CborSerializable {
def total: BigDecimal = items.map(item => item.price * item.quantity).sum
def addItem(item: OrderItem): Order =
copy(items = items :+ item, updatedAt = Instant.now())
def confirm(): Order =
copy(status = Confirmed, updatedAt = Instant.now())
}
// Responses
sealed trait OrderResponse extends CborSerializable
case class OrderCreatedResponse(order: Order) extends OrderResponse
case class OrderUpdatedResponse(order: Order) extends OrderResponse
case class OrderRetrievedResponse(order: Option[Order]) extends OrderResponse
case class OrderErrorResponse(message: String) extends OrderResponse
Event Sourced Order Actor
// OrderActor.scala
package com.example.order
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{EventSourcedBehavior, Effect}
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import java.time.Instant
object OrderActor {
val TypeKey: EntityTypeKey[OrderCommand] = EntityTypeKey[OrderCommand]("Order")
def apply(orderId: String): Behavior[OrderCommand] = {
Behaviors.setup { context =>
EventSourcedBehavior[OrderCommand, OrderEvent, Option[Order]](
persistenceId = PersistenceId(TypeKey.name, orderId),
emptyState = None,
commandHandler = commandHandler(orderId),
eventHandler = eventHandler
).withTagger(_ => Set("order"))
}
}
private def commandHandler(orderId: String): (Option[Order], OrderCommand) => Effect[OrderEvent, Option[Order]] = {
(state, command) =>
state match {
case None =>
command match {
case CreateOrder(customerId, items, replyTo) =>
val event = OrderCreated(orderId, customerId, items)
Effect
.persist(event)
.thenReply(replyTo)(newState => OrderCreatedResponse(newState.get))
case GetOrder(_, replyTo) =>
Effect.reply(replyTo)(OrderRetrievedResponse(None))
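// Reply explicitly so that callers using ask get an error instead of a timeout
case AddItem(_, _, replyTo) =>
Effect.reply(replyTo)(OrderErrorResponse("Order does not exist"))
case ConfirmOrder(_, replyTo) =>
Effect.reply(replyTo)(OrderErrorResponse("Order does not exist"))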
}
case Some(order) =>
command match {
case CreateOrder(_, _, replyTo) =>
Effect.reply(replyTo)(OrderErrorResponse("Order already exists"))
case AddItem(_, item, replyTo) if order.status == Draft =>
val event = ItemAdded(orderId, item)
Effect
.persist(event)
.thenReply(replyTo)(newState => OrderUpdatedResponse(newState.get))
case ConfirmOrder(_, replyTo) if order.status == Draft =>
val event = OrderConfirmed(orderId)
Effect
.persist(event)
.thenReply(replyTo)(newState => OrderUpdatedResponse(newState.get))
case GetOrder(_, replyTo) =>
Effect.reply(replyTo)(OrderRetrievedResponse(Some(order)))
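// The OrderCommand trait itself has no replyTo member, so match each remaining command
case AddItem(_, _, replyTo) =>
Effect.reply(replyTo)(OrderErrorResponse("Invalid command for current state"))
case ConfirmOrder(_, replyTo) =>
Effect.reply(replyTo)(OrderErrorResponse("Invalid command for current state"))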
}
}
}
private val eventHandler: (Option[Order], OrderEvent) => Option[Order] = {
(state, event) =>
event match {
case OrderCreated(orderId, customerId, items, timestamp) =>
Some(Order(orderId, customerId, items, Draft, timestamp, timestamp))
case ItemAdded(_, item, timestamp) =>
state.map(_.addItem(item).copy(updatedAt = timestamp))
case OrderConfirmed(_, timestamp) =>
state.map(_.confirm().copy(updatedAt = timestamp))
}
}
}
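Because the event handler is a pure function of the previous state and an event, the current state of an order is simply a left fold over its event history, which is what recovery amounts to when an entity restarts. The following is a minimal, dependency-free sketch of that idea. The types are simplified stand-ins for the lesson's domain model, and every name in `EventReplaySketch` is illustrative rather than part of the lesson's code.

```scala
object EventReplaySketch {
  // Simplified stand-ins for the lesson's events and state
  sealed trait Event
  final case class Created(orderId: String, items: List[String]) extends Event
  final case class ItemAdded(item: String) extends Event
  case object Confirmed extends Event

  final case class OrderState(orderId: String, items: List[String], confirmed: Boolean)

  // Same shape as the actor's event handler: (state, event) => new state
  def applyEvent(state: Option[OrderState], event: Event): Option[OrderState] =
    event match {
      case Created(id, items) => Some(OrderState(id, items, confirmed = false))
      case ItemAdded(item)    => state.map(s => s.copy(items = s.items :+ item))
      case Confirmed          => state.map(_.copy(confirmed = true))
    }

  // Recovery: start from the empty state and apply every stored event in order
  def replay(events: Seq[Event]): Option[OrderState] =
    events.foldLeft(Option.empty[OrderState])(applyEvent)
}
```

Keeping the handler free of side effects and of calls such as `Instant.now()` is what makes this replay deterministic: the same events always rebuild the same state.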
HTTP API Layer
// OrderRoutes.scala
package com.example.order.api
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.AskPattern._
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityRef}
import akka.http.scaladsl.model.{StatusCodes, HttpResponse}
import akka.http.scaladsl.server.{Directives, Route}
import akka.util.Timeout
import spray.json._
import com.example.order._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Failure}
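import java.time.Instant
import java.util.UUID
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
// JSON serialization (SprayJsonSupport supplies the marshallers behind entity(as[...]) and complete(...))
trait OrderJsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
  // spray-json has no built-in formats for Instant or the OrderStatus ADT, so define them explicitly.
  // They must come before orderFormat, which needs both.
  implicit val instantFormat: JsonFormat[Instant] = new JsonFormat[Instant] {
    def write(value: Instant): JsValue = JsString(value.toString)
    def read(json: JsValue): Instant = json match {
      case JsString(text) => Instant.parse(text)
      case other => deserializationError(s"Expected an ISO-8601 timestamp but got $other")
    }
  }
  implicit val orderStatusFormat: JsonFormat[OrderStatus] = new JsonFormat[OrderStatus] {
    def write(value: OrderStatus): JsValue = JsString(value.toString)
    def read(json: JsValue): OrderStatus = json match {
      case JsString("Draft") => Draft
      case JsString("Confirmed") => Confirmed
      case JsString("Shipped") => Shipped
      case JsString("Delivered") => Delivered
      case other => deserializationError(s"Unknown order status: $other")
    }
  }
  implicit val orderItemFormat: RootJsonFormat[OrderItem] = jsonFormat3(OrderItem)
  implicit val orderFormat: RootJsonFormat[Order] = jsonFormat6(Order.apply)
  implicit val createOrderRequestFormat: RootJsonFormat[CreateOrderRequest] = jsonFormat2(CreateOrderRequest)
  implicit val addItemRequestFormat: RootJsonFormat[AddItemRequest] = jsonFormat1(AddItemRequest)
}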
case class CreateOrderRequest(customerId: String, items: List[OrderItem])
case class AddItemRequest(item: OrderItem)
class OrderRoutes(sharding: ClusterSharding)(implicit system: ActorSystem[_])
extends Directives with OrderJsonProtocol {
implicit val timeout: Timeout = 3.seconds
implicit val ec: ExecutionContext = system.executionContext
def routes: Route = {
pathPrefix("orders") {
concat(
// POST /orders - Create new order
pathEndOrSingleSlash { // the enclosing pathPrefix("orders") has already consumed "orders"
post {
entity(as[CreateOrderRequest]) { request =>
val orderId = UUID.randomUUID().toString
val orderRef: EntityRef[OrderCommand] = sharding.entityRefFor(OrderActor.TypeKey, orderId)
val future: Future[OrderResponse] = orderRef.ask(
CreateOrder(request.customerId, request.items, _)
)
onComplete(future) {
case Success(OrderCreatedResponse(order)) =>
complete(StatusCodes.Created, order)
case Success(OrderErrorResponse(message)) =>
complete(StatusCodes.BadRequest, Map("error" -> message))
case Failure(ex) =>
complete(StatusCodes.InternalServerError, Map("error" -> ex.getMessage))
}
}
}
},
// GET /orders/{orderId}
path(Segment) { orderId => // matched relative to pathPrefix("orders")
get {
val orderRef = sharding.entityRefFor(OrderActor.TypeKey, orderId)
val future: Future[OrderResponse] = orderRef.ask(GetOrder(orderId, _))
onComplete(future) {
case Success(OrderRetrievedResponse(Some(order))) =>
complete(order)
case Success(OrderRetrievedResponse(None)) =>
complete(StatusCodes.NotFound, Map("error" -> "Order not found"))
case Failure(ex) =>
complete(StatusCodes.InternalServerError, Map("error" -> ex.getMessage))
}
}
},
// POST /orders/{orderId}/items
path(Segment / "items") { orderId => // matched relative to pathPrefix("orders")
post {
entity(as[AddItemRequest]) { request =>
val orderRef = sharding.entityRefFor(OrderActor.TypeKey, orderId)
val future: Future[OrderResponse] = orderRef.ask(
AddItem(orderId, request.item, _)
)
onComplete(future) {
case Success(OrderUpdatedResponse(order)) =>
complete(order)
case Success(OrderErrorResponse(message)) =>
complete(StatusCodes.BadRequest, Map("error" -> message))
case Failure(ex) =>
complete(StatusCodes.InternalServerError, Map("error" -> ex.getMessage))
}
}
}
},
// POST /orders/{orderId}/confirm
path(Segment / "confirm") { orderId => // matched relative to pathPrefix("orders")
post {
val orderRef = sharding.entityRefFor(OrderActor.TypeKey, orderId)
val future: Future[OrderResponse] = orderRef.ask(ConfirmOrder(orderId, _))
onComplete(future) {
case Success(OrderUpdatedResponse(order)) =>
complete(order)
case Success(OrderErrorResponse(message)) =>
complete(StatusCodes.BadRequest, Map("error" -> message))
case Failure(ex) =>
complete(StatusCodes.InternalServerError, Map("error" -> ex.getMessage))
}
}
}
)
}
}
}
Event Sourcing and CQRS
Read Model Projections
// OrderProjection.scala
// The read model is defined with Slick tables, so this version uses the Slick flavour of
// Akka Projections (the akka-projection-slick module must be on the classpath).
package com.example.order.projection
import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.query.Offset
import akka.projection.ProjectionId
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.{ExactlyOnceProjection, SourceProvider}
import akka.projection.slick.{SlickHandler, SlickProjection}
import com.example.order._
import slick.basic.DatabaseConfig
import slick.jdbc.PostgresProfile
import slick.jdbc.PostgresProfile.api._
import scala.concurrent.ExecutionContext
// Read model tables
case class OrderSummary(
  id: String,
  customerId: String,
  itemCount: Int,
  totalAmount: BigDecimal,
  status: String,
  createdAt: Long,
  updatedAt: Long
)
class OrderSummaryTable(tag: Tag) extends Table[OrderSummary](tag, "order_summary") {
  def id = column[String]("id", O.PrimaryKey)
  def customerId = column[String]("customer_id")
  def itemCount = column[Int]("item_count")
  def totalAmount = column[BigDecimal]("total_amount")
  def status = column[String]("status")
  def createdAt = column[Long]("created_at")
  def updatedAt = column[Long]("updated_at")
  def * = (id, customerId, itemCount, totalAmount, status, createdAt, updatedAt) <> (OrderSummary.tupled, OrderSummary.unapply)
}
object OrderSummaryTable {
  // Scala 2 does not allow top-level vals, so the query lives in the companion object
  val orderSummaries = TableQuery[OrderSummaryTable]
}
// Projection handler: returns a DBIO so the update and the offset commit share one transaction
class OrderProjectionHandler(implicit ec: ExecutionContext)
    extends SlickHandler[EventEnvelope[OrderEvent]] {
  import OrderSummaryTable.orderSummaries
  override def process(envelope: EventEnvelope[OrderEvent]): DBIO[Done] = {
    envelope.event match {
      case OrderCreated(orderId, customerId, items, timestamp) =>
        val summary = OrderSummary(
          id = orderId,
          customerId = customerId,
          itemCount = items.length,
          totalAmount = items.map(i => i.price * i.quantity).sum,
          status = "Draft",
          createdAt = timestamp.toEpochMilli,
          updatedAt = timestamp.toEpochMilli
        )
        (orderSummaries += summary).map(_ => Done)
      case ItemAdded(orderId, item, timestamp) =>
        // Read the current row, then add the new line item to the running count and total
        val row = orderSummaries.filter(_.id === orderId)
        row.result.headOption.flatMap {
          case Some(current) =>
            row
              .map(o => (o.itemCount, o.totalAmount, o.updatedAt))
              .update((current.itemCount + 1, current.totalAmount + item.price * item.quantity, timestamp.toEpochMilli))
              .map(_ => Done)
          case None =>
            DBIO.successful(Done)
        }
      case OrderConfirmed(orderId, timestamp) =>
        orderSummaries
          .filter(_.id === orderId)
          .map(o => (o.status, o.updatedAt))
          .update(("Confirmed", timestamp.toEpochMilli))
          .map(_ => Done)
    }
  }
}
// Projection setup; run the result with ProjectionBehavior(projection), e.g. as a spawned actor
object OrderProjection {
  def create(
      system: ActorSystem[_],
      dbConfig: DatabaseConfig[PostgresProfile]
  ): ExactlyOnceProjection[Offset, EventEnvelope[OrderEvent]] = {
    val sourceProvider: SourceProvider[Offset, EventEnvelope[OrderEvent]] =
      EventSourcedProvider.eventsByTag[OrderEvent](
        system = system,
        readJournalPluginId = "jdbc-read-journal",
        tag = "order"
      )
    SlickProjection.exactlyOnce(
      projectionId = ProjectionId("OrderProjection", "orders"),
      sourceProvider,
      dbConfig,
      handler = () => new OrderProjectionHandler()(system.executionContext)
    )(system)
  }
}
Query Service
// OrderQueryService.scala
package com.example.order.query
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import com.example.order.projection.{OrderSummary, OrderSummaryTable}
import slick.jdbc.PostgresProfile.api._
import scala.concurrent.{ExecutionContext, Future}
// Query commands
sealed trait OrderQuery
case class GetOrderSummary(orderId: String, replyTo: ActorRef[OrderQueryResponse]) extends OrderQuery
case class GetCustomerOrders(customerId: String, replyTo: ActorRef[OrderQueryResponse]) extends OrderQuery
case class GetOrdersByStatus(status: String, replyTo: ActorRef[OrderQueryResponse]) extends OrderQuery
// Query responses
sealed trait OrderQueryResponse
case class OrderSummaryResponse(summary: Option[OrderSummary]) extends OrderQueryResponse
case class OrderSummariesResponse(summaries: List[OrderSummary]) extends OrderQueryResponse
case class QueryErrorResponse(message: String) extends OrderQueryResponse
class OrderQueryService(database: Database)(implicit ec: ExecutionContext) {
private val orderSummaries = TableQuery[OrderSummaryTable]
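  // Reply with the query result, or with QueryErrorResponse if the database call fails,
  // so that callers using ask do not simply time out.
  private def respond[A](result: Future[A], replyTo: ActorRef[OrderQueryResponse])(
      toResponse: A => OrderQueryResponse): Unit =
    result.onComplete {
      case scala.util.Success(value) => replyTo ! toResponse(value)
      case scala.util.Failure(ex) => replyTo ! QueryErrorResponse(ex.getMessage)
    }
  def apply(): Behavior[OrderQuery] = {
    Behaviors.receiveMessage {
      case GetOrderSummary(orderId, replyTo) =>
        val future = database.run(
          orderSummaries.filter(_.id === orderId).result.headOption
        )
        respond(future, replyTo)(OrderSummaryResponse(_))
        Behaviors.same
      case GetCustomerOrders(customerId, replyTo) =>
        val future = database.run(
          orderSummaries
            .filter(_.customerId === customerId)
            .sortBy(_.createdAt.desc)
            .result
        )
        respond(future, replyTo)(results => OrderSummariesResponse(results.toList))
        Behaviors.same
      case GetOrdersByStatus(status, replyTo) =>
        val future = database.run(
          orderSummaries
            .filter(_.status === status)
            .sortBy(_.updatedAt.desc)
            .result
        )
        respond(future, replyTo)(results => OrderSummariesResponse(results.toList))
        Behaviors.same
    }
  }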
}
Service Communication Patterns
Event-Driven Communication
// EventBus.scala
package com.example.messaging
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.pubsub.Topic
import com.example.CborSerializable
import com.example.order.Order
// Domain events
sealed trait DomainEvent extends CborSerializable {
def aggregateId: String
def timestamp: Long
}
case class OrderConfirmedEvent(
aggregateId: String,
customerId: String,
totalAmount: BigDecimal,
timestamp: Long = System.currentTimeMillis()
) extends DomainEvent
case class PaymentProcessedEvent(
aggregateId: String,
orderId: String,
amount: BigDecimal,
timestamp: Long = System.currentTimeMillis()
) extends DomainEvent
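// Event bus
// A listener only receives events after it has subscribed to the topic, for example:
//   eventBus ! Topic.Subscribe(listenerRef)   // listenerRef: ActorRef[DomainEvent]
object EventBus {
def create(system: ActorSystem[_]): ActorRef[Topic.Command[DomainEvent]] = {
system.systemActorOf(Topic[DomainEvent]("domain-events"), "event-bus")
}
}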
// Event listener
class OrderEventListener {
def apply(): Behavior[DomainEvent] = {
Behaviors.receiveMessage {
case OrderConfirmedEvent(orderId, customerId, amount, _) =>
// Trigger payment processing
println(s"Order $orderId confirmed for customer $customerId with amount $amount")
// Could send command to payment service
Behaviors.same
case PaymentProcessedEvent(paymentId, orderId, amount, _) =>
// Update order status to paid
println(s"Payment $paymentId processed for order $orderId with amount $amount")
Behaviors.same
case _ =>
Behaviors.same
}
}
}
// Usage in order service
class OrderServiceWithEvents(eventBus: ActorRef[Topic.Command[DomainEvent]]) {
def publishOrderConfirmed(order: Order): Unit = {
val event = OrderConfirmedEvent(
order.id,
order.customerId,
order.total
)
eventBus ! Topic.Publish(event)
}
}
Saga Pattern for Distributed Transactions
// OrderSaga.scala
package com.example.saga
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{EventSourcedBehavior, Effect}
import scala.concurrent.duration._
// Saga commands
sealed trait SagaCommand
case class StartOrderSaga(orderId: String, customerId: String, amount: BigDecimal) extends SagaCommand
case class PaymentSucceeded(orderId: String) extends SagaCommand
case class PaymentFailed(orderId: String, reason: String) extends SagaCommand
case class InventoryReserved(orderId: String) extends SagaCommand
case class InventoryReservationFailed(orderId: String, reason: String) extends SagaCommand
case object SagaTimeout extends SagaCommand
// Saga events
sealed trait SagaEvent
case class SagaStarted(orderId: String, customerId: String, amount: BigDecimal) extends SagaEvent
case class PaymentRequested(orderId: String) extends SagaEvent
case class InventoryRequested(orderId: String) extends SagaEvent
case class SagaCompleted(orderId: String) extends SagaEvent
case class SagaFailed(orderId: String, reason: String) extends SagaEvent
case class CompensationStarted(orderId: String) extends SagaEvent
// Saga state
sealed trait SagaState
case object Idle extends SagaState
case object PaymentPending extends SagaState
case object InventoryPending extends SagaState
case object Completed extends SagaState
case object Failed extends SagaState
case object Compensating extends SagaState
case class OrderSagaData(
orderId: String,
customerId: String,
amount: BigDecimal,
state: SagaState,
paymentCompleted: Boolean = false,
inventoryReserved: Boolean = false
)
object OrderSaga {
def apply(
sagaId: String,
paymentService: ActorRef[PaymentCommand],
inventoryService: ActorRef[InventoryCommand]
): Behavior[SagaCommand] = {
Behaviors.withTimers { timers =>
Behaviors.setup { context =>
EventSourcedBehavior[SagaCommand, SagaEvent, Option[OrderSagaData]](
persistenceId = PersistenceId("OrderSaga", sagaId),
emptyState = None,
commandHandler = commandHandler(paymentService, inventoryService, timers),
eventHandler = eventHandler
)
}
}
}
  // Progress events: without them the paymentCompleted / inventoryReserved flags are never
  // persisted and the saga can never complete. They may extend the sealed SagaEvent trait
  // because they are declared in the same source file.
  final case class PaymentConfirmed(orderId: String) extends SagaEvent
  final case class InventoryConfirmed(orderId: String) extends SagaEvent
  private val TimeoutKey = "saga-timeout"
  private def commandHandler(
      paymentService: ActorRef[PaymentCommand],
      inventoryService: ActorRef[InventoryCommand],
      timers: TimerScheduler[SagaCommand]
  ): (Option[OrderSagaData], SagaCommand) => Effect[SagaEvent, Option[OrderSagaData]] = {
    (state, command) =>
      state match {
        case None =>
          command match {
            case StartOrderSaga(orderId, customerId, amount) =>
              // Persist all start-up events atomically, then run the side effects
              Effect
                .persist(List[SagaEvent](
                  SagaStarted(orderId, customerId, amount),
                  PaymentRequested(orderId),
                  InventoryRequested(orderId)))
                .thenRun { _ =>
                  // Note: timers are not restored on recovery; a production saga would restart them
                  timers.startSingleTimer(TimeoutKey, SagaTimeout, 30.seconds)
                  paymentService ! ProcessPayment(orderId, customerId, amount)
                  inventoryService ! ReserveInventory(orderId)
                }
            case _ => Effect.unhandled
          }
        case Some(data) if data.state == Completed || data.state == Failed =>
          // A finished saga ignores late replies and the timeout
          Effect.none
        case Some(data) =>
          command match {
            case PaymentSucceeded(orderId) if data.orderId == orderId =>
              if (data.inventoryReserved)
                Effect
                  .persist(List[SagaEvent](PaymentConfirmed(orderId), SagaCompleted(orderId)))
                  .thenRun(_ => timers.cancel(TimeoutKey))
              else
                Effect.persist(PaymentConfirmed(orderId))
            case InventoryReserved(orderId) if data.orderId == orderId =>
              if (data.paymentCompleted)
                Effect
                  .persist(List[SagaEvent](InventoryConfirmed(orderId), SagaCompleted(orderId)))
                  .thenRun(_ => timers.cancel(TimeoutKey))
              else
                Effect.persist(InventoryConfirmed(orderId))
            case PaymentFailed(orderId, reason) if data.orderId == orderId =>
              Effect
                .persist(SagaFailed(orderId, reason))
                .thenRun { _ =>
                  timers.cancel(TimeoutKey)
                  // Compensate the step that already succeeded
                  if (data.inventoryReserved) inventoryService ! ReleaseInventory(orderId)
                }
            case InventoryReservationFailed(orderId, reason) if data.orderId == orderId =>
              Effect
                .persist(SagaFailed(orderId, reason))
                .thenRun { _ =>
                  timers.cancel(TimeoutKey)
                  // Compensate the step that already succeeded
                  if (data.paymentCompleted) paymentService ! RefundPayment(orderId)
                }
            case SagaTimeout =>
              Effect
                .persist(SagaFailed(data.orderId, "Saga timeout"))
                .thenRun { _ =>
                  // Compensate all completed steps
                  if (data.paymentCompleted) paymentService ! RefundPayment(data.orderId)
                  if (data.inventoryReserved) inventoryService ! ReleaseInventory(data.orderId)
                }
            case _ => Effect.unhandled
          }
      }
  }
  private val eventHandler: (Option[OrderSagaData], SagaEvent) => Option[OrderSagaData] = {
    (state, event) =>
      event match {
        case SagaStarted(orderId, customerId, amount) =>
          Some(OrderSagaData(orderId, customerId, amount, PaymentPending))
        case PaymentRequested(_) =>
          state.map(_.copy(state = PaymentPending))
        case InventoryRequested(_) =>
          state.map(_.copy(state = InventoryPending))
        case PaymentConfirmed(_) =>
          state.map(_.copy(paymentCompleted = true))
        case InventoryConfirmed(_) =>
          state.map(_.copy(inventoryReserved = true))
        case SagaCompleted(_) =>
          state.map(_.copy(state = Completed))
        case SagaFailed(_, _) =>
          state.map(_.copy(state = Failed))
        case CompensationStarted(_) =>
          state.map(_.copy(state = Compensating))
      }
  }
}
// Supporting service commands
sealed trait PaymentCommand
case class ProcessPayment(orderId: String, customerId: String, amount: BigDecimal) extends PaymentCommand
case class RefundPayment(orderId: String) extends PaymentCommand
sealed trait InventoryCommand
case class ReserveInventory(orderId: String) extends InventoryCommand
case class ReleaseInventory(orderId: String) extends InventoryCommand
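The compensation branches in the saga all follow one rule: undo only the steps that have already completed. A small pure-Scala sketch of that decision makes the rule easy to test in isolation. `SagaCompensationSketch`, `SagaProgress`, and `compensationsFor` are hypothetical names for this illustration, not part of the lesson's saga.

```scala
object SagaCompensationSketch {
  // The compensating actions a failed saga may need to issue
  sealed trait Compensation
  case object RefundPayment extends Compensation
  case object ReleaseInventory extends Compensation

  // Which forward steps have completed so far
  final case class SagaProgress(paymentCompleted: Boolean, inventoryReserved: Boolean) {
    def isComplete: Boolean = paymentCompleted && inventoryReserved
  }

  // On failure or timeout, compensate exactly the steps that completed
  def compensationsFor(progress: SagaProgress): List[Compensation] =
    List[Option[Compensation]](
      if (progress.paymentCompleted) Some(RefundPayment) else None,
      if (progress.inventoryReserved) Some(ReleaseInventory) else None
    ).flatten
}
```

Separating the decision from the message sending keeps the actor's `thenRun` blocks thin: they only translate each `Compensation` into a command for the owning service.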
Clustering and High Availability
Cluster Setup
// ClusterApp.scala
package com.example
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.typed.{Cluster, Subscribe}
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity}
import akka.http.scaladsl.Http
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import com.example.order.OrderActor
import com.example.order.api.OrderRoutes
import scala.util.{Success, Failure}
object ClusterApp {
  // The guardian handles cluster membership events, so its message type is MemberEvent
  def apply(): Behavior[MemberEvent] = {
    Behaviors.setup[MemberEvent] { context =>
      implicit val system: ActorSystem[Nothing] = context.system
      // Start Akka Management
      AkkaManagement(system).start()
      // Start cluster bootstrap
      ClusterBootstrap(system).start()
      // Initialize cluster sharding
      val sharding = ClusterSharding(system)
      sharding.init(Entity(OrderActor.TypeKey) { entityContext =>
        OrderActor(entityContext.entityId)
      })
      // Start HTTP server
      val orderRoutes = new OrderRoutes(sharding)
      Http()
        .newServerAt("0.0.0.0", 8080)
        .bind(orderRoutes.routes)
        .onComplete {
          // Use system.log here: the actor context must not be touched from a Future callback
          case Success(binding) =>
            system.log.info("HTTP server started at {}", binding.localAddress)
          case Failure(ex) =>
            system.log.error("Failed to start HTTP server", ex)
        }(system.executionContext)
      // Cluster event handling
      Cluster(system).subscriptions ! Subscribe(context.self, classOf[MemberEvent])
      Behaviors.receiveMessage { event =>
        context.log.info("Cluster member event: {}", event)
        Behaviors.same
      }
    }
  }
  def main(args: Array[String]): Unit = {
    ActorSystem[MemberEvent](ClusterApp(), "ClusterSystem")
  }
}
Service Discovery and Load Balancing
// ServiceDiscovery.scala
package com.example.discovery
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.discovery.{Discovery, ServiceDiscovery}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
class ServiceClient(system: ActorSystem[_]) {
implicit val sys = system
implicit val ec: ExecutionContext = system.executionContext
private val discovery: ServiceDiscovery = Discovery(system).discovery
def callService(serviceName: String, path: String): Future[HttpResponse] = {
for {
resolved <- discovery.lookup(serviceName, 3.seconds)
target = resolved.addresses.headOption.getOrElse(
throw new RuntimeException(s"No instances found for service $serviceName")
)
response <- Http().singleRequest(
HttpRequest(uri = s"http://${target.host}:${target.port.getOrElse(8080)}$path")
)
} yield response
}
// Circuit breaker pattern
private val circuitBreaker = new akka.pattern.CircuitBreaker(
scheduler = system.classicSystem.scheduler, // CircuitBreaker expects the classic scheduler
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 30.seconds
)(system.executionContext)
def callServiceWithCircuitBreaker(serviceName: String, path: String): Future[HttpResponse] = {
circuitBreaker.withCircuitBreaker(callService(serviceName, path))
}
}
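The numeric parameters passed to the circuit breaker control a small state machine: after `maxFailures` consecutive failures the breaker opens and fails fast, and once `resetTimeout` has elapsed it lets a single trial call through. The sketch below models only those transitions. It is a simplified, single-threaded illustration and not Akka's implementation; `BreakerSketch` and its members are hypothetical names, and the clock is passed in explicitly to keep the behaviour deterministic.

```scala
object BreakerSketch {
  sealed trait State
  case object Closed extends State                  // calls flow normally
  final case class Open(since: Long) extends State  // calls fail fast
  case object HalfOpen extends State                // one trial call is in flight

  final case class Breaker(
      maxFailures: Int,
      resetTimeoutMs: Long,
      state: State = Closed,
      failures: Int = 0) {

    // May a call be attempted at time nowMs? Also returns the (possibly updated) breaker.
    def allow(nowMs: Long): (Boolean, Breaker) = state match {
      case Closed   => (true, this)
      case HalfOpen => (false, this)
      case Open(since) =>
        if (nowMs - since >= resetTimeoutMs) (true, copy(state = HalfOpen))
        else (false, this)
    }

    // A successful call closes the breaker and clears the failure count
    def onSuccess: Breaker = copy(state = Closed, failures = 0)

    // A failed call counts toward maxFailures, or re-opens a half-open breaker
    def onFailure(nowMs: Long): Breaker = state match {
      case HalfOpen => copy(state = Open(nowMs))
      case Open(_)  => this
      case Closed =>
        val count = failures + 1
        if (count >= maxFailures) copy(state = Open(nowMs), failures = count)
        else copy(failures = count)
    }
  }
}
```

Read against this model, `maxFailures = 5` and `resetTimeout = 30.seconds` mean that five consecutive failures stop calls to the downstream service for thirty seconds before one probe request is allowed.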
Monitoring and Observability
Health Checks
// HealthCheck.scala
package com.example.health
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
import akka.http.scaladsl.server.{Directives, Route}
import akka.cluster.{Cluster, MemberStatus}
import spray.json._
case class HealthStatus(
status: String,
cluster: ClusterHealth,
services: Map[String, ServiceHealth]
)
case class ClusterHealth(
leader: Boolean,
members: Int,
unreachable: Int
)
case class ServiceHealth(
status: String,
lastCheck: Long
)
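// SprayJsonSupport provides the marshaller that complete(health) needs
trait HealthJsonProtocol extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport with DefaultJsonProtocol {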
implicit val serviceHealthFormat = jsonFormat2(ServiceHealth)
implicit val clusterHealthFormat = jsonFormat3(ClusterHealth)
implicit val healthStatusFormat = jsonFormat3(HealthStatus)
}
class HealthCheckRoutes(system: ActorSystem[_]) extends Directives with HealthJsonProtocol {
def routes: Route = {
path("health") {
get {
val cluster = Cluster(system)
val clusterHealth = ClusterHealth(
leader = cluster.state.leader.contains(cluster.selfAddress),
members = cluster.state.members.count(_.status == MemberStatus.Up),
unreachable = cluster.state.unreachable.size
)
val serviceHealth = Map(
"database" -> ServiceHealth("healthy", System.currentTimeMillis()),
"cache" -> ServiceHealth("healthy", System.currentTimeMillis())
)
val overallStatus = if (clusterHealth.unreachable == 0) "healthy" else "degraded"
val health = HealthStatus(overallStatus, clusterHealth, serviceHealth)
complete(health)
}
} ~
path("ready") {
get {
val cluster = Cluster(system)
if (cluster.selfMember.status == MemberStatus.Up) {
complete(StatusCodes.OK, "Ready")
} else {
complete(StatusCodes.ServiceUnavailable, "Not ready")
}
}
} ~
path("alive") {
get {
complete(StatusCodes.OK, "Alive")
}
}
}
}
Metrics and Tracing
// Metrics.scala
package com.example.metrics
import akka.actor.typed.ActorSystem
import kamon.Kamon
import java.util.concurrent.TimeUnit
// Requires kamon-core and kamon-prometheus on the classpath.
// Lightbend Telemetry (Cinnamon) is a separate, commercial option that is enabled through
// its Java agent and configuration rather than from application code.
object MetricsSetup {
  def initialize(system: ActorSystem[_]): Unit = {
    // Kamon.init() loads configuration and starts the reporter modules found on the
    // classpath; with kamon-prometheus present this includes the Prometheus reporter.
    Kamon.init()
    system.log.info("Kamon metrics initialized")
  }
  // Custom metrics: resolve the untagged instruments once and reuse them
  private val ordersCreated = Kamon.counter("orders.created").withoutTags()
  private val orderProcessingTime = Kamon.timer("orders.processing.time").withoutTags()
  def recordOrderCreated(): Unit = {
    ordersCreated.increment()
  }
  def recordOrderProcessingTime(durationMs: Long): Unit = {
    // Timers record nanoseconds by default, so state the unit explicitly
    orderProcessingTime.record(durationMs, TimeUnit.MILLISECONDS)
  }
  def recordError(errorType: String): Unit = {
    Kamon.counter("orders.errors").withTag("type", errorType).increment()
  }
}
Deployment and DevOps
Kubernetes Deployment
# kubernetes/order-service.yaml
apiVersion: apps/v1
kind: Deployment