Enterprise Integration Patterns: Messaging, APIs, and System Architecture
Modern enterprise applications require sophisticated integration strategies to connect disparate systems, services, and data sources. This comprehensive lesson covers enterprise integration patterns, messaging systems, API design principles, and architectural approaches for building scalable, maintainable distributed systems.
Message-Based Integration Patterns
Asynchronous Messaging with Akka
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{KillSwitches, Materializer}
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import java.time.Instant
import java.util.UUID
// Message definitions for enterprise integration
/** Message vocabulary shared by the services in this example.
  *
  * Every concrete message defaults `messageId` to a freshly generated random UUID string,
  * `timestamp` to `Instant.now()` evaluated at construction, `correlationId` to `None` and
  * `headers` to an empty map. Because those defaulted fields come first, callers pass the
  * payload fields by name.
  */
object IntegrationMessages {
// Base message types
/** Envelope fields exposed by every command and event. */
sealed trait Message {
def messageId: String
def timestamp: Instant
def correlationId: Option[String]
def headers: Map[String, String]
}
// Command messages (request actions)
/** Marker for messages that ask a service to do something. */
sealed trait Command extends Message
/** Asks for an order to be processed; carries the line items and the order total. */
case class ProcessOrder(
messageId: String = UUID.randomUUID().toString,
timestamp: Instant = Instant.now(),
correlationId: Option[String] = None,
headers: Map[String, String] = Map.empty,
orderId: String,
customerId: String,
items: List[OrderItem],
totalAmount: BigDecimal
) extends Command
/** Asks for the stock level of `productId` to be changed by `quantity` using `operation`. */
case class UpdateInventory(
messageId: String = UUID.randomUUID().toString,
timestamp: Instant = Instant.now(),
correlationId: Option[String] = None,
headers: Map[String, String] = Map.empty,
productId: String,
quantity: Int,
operation: InventoryOperation
) extends Command
/** Asks for `content` to be delivered to `recipient` over `channel` with the given priority. */
case class SendNotification(
messageId: String = UUID.randomUUID().toString,
timestamp: Instant = Instant.now(),
correlationId: Option[String] = None,
headers: Map[String, String] = Map.empty,
recipient: String,
channel: NotificationChannel,
content: String,
priority: Priority
) extends Command
// Event messages (notifications of state changes)
/** Marker for messages that report something that already happened. */
sealed trait Event extends Message
/** Reports the resulting status of an order and when it was processed. */
case class OrderProcessed(
messageId: String = UUID.randomUUID().toString,
timestamp: Instant = Instant.now(),
correlationId: Option[String] = None,
headers: Map[String, String] = Map.empty,
orderId: String,
status: OrderStatus,
processedAt: Instant
) extends Event
/** Reports a stock change, carrying the quantity before and after the operation. */
case class InventoryUpdated(
messageId: String = UUID.randomUUID().toString,
timestamp: Instant = Instant.now(),
correlationId: Option[String] = None,
headers: Map[String, String] = Map.empty,
productId: String,
previousQuantity: Int,
newQuantity: Int,
operation: InventoryOperation
) extends Event
/** Reports the outcome (`status`) of a payment or refund for `orderId`. */
case class PaymentProcessed(
messageId: String = UUID.randomUUID().toString,
timestamp: Instant = Instant.now(),
correlationId: Option[String] = None,
headers: Map[String, String] = Map.empty,
paymentId: String,
orderId: String,
amount: BigDecimal,
status: PaymentStatus
) extends Event
// Supporting types
/** One order line: a product, how many units, and the per-unit price. */
case class OrderItem(productId: String, quantity: Int, price: BigDecimal)
/** Kind of stock change requested by [[UpdateInventory]]; the consumer decides the arithmetic. */
sealed trait InventoryOperation
case object Increase extends InventoryOperation
case object Decrease extends InventoryOperation
case object Reserve extends InventoryOperation
case object Release extends InventoryOperation
/** Delivery channel for a [[SendNotification]]. */
sealed trait NotificationChannel
case object Email extends NotificationChannel
case object SMS extends NotificationChannel
case object Push extends NotificationChannel
/** Priority attached to a [[SendNotification]]. */
sealed trait Priority
case object Low extends Priority
case object Normal extends Priority
case object High extends Priority
case object Critical extends Priority
/** Lifecycle states an order can be reported in. */
sealed trait OrderStatus
case object Pending extends OrderStatus
case object Processing extends OrderStatus
case object Confirmed extends OrderStatus
case object Shipped extends OrderStatus
case object Delivered extends OrderStatus
case object Cancelled extends OrderStatus
/** Outcome states carried by [[PaymentProcessed]]. */
sealed trait PaymentStatus
case object PaymentPending extends PaymentStatus
case object PaymentSuccessful extends PaymentStatus
case object PaymentFailed extends PaymentStatus
case object PaymentRefunded extends PaymentStatus
}
// Message Bus implementation
/** In-process publish/subscribe bus that routes a message by its concrete runtime class.
  *
  * Subscribers registered for a message's exact class receive it first; subscribers registered
  * for `classOf[Message]` act as wildcards and receive every published message afterwards.
  *
  * The bus is handed to several actors, which call `subscribe` and `publish` from their own
  * threads. The registry is therefore an immutable map held in an `AtomicReference` and updated
  * with compare-and-set, so concurrent subscribes cannot lose updates and `publish` always
  * iterates over a consistent snapshot.
  *
  * @param system actor system the bus belongs to (kept for interface compatibility)
  */
class MessageBus(implicit system: ActorSystem[_]) {
  import IntegrationMessages._

  private type Registry = Map[Class[_], List[ActorRef[Message]]]

  private val subscriptions =
    new java.util.concurrent.atomic.AtomicReference[Registry](Map.empty)

  /** Delivers `message` to the subscribers of its runtime class, then to wildcard subscribers. */
  def publish[T <: Message](message: T): Unit = {
    // Read the registry once so both lookups see the same snapshot.
    val snapshot = subscriptions.get()
    snapshot.getOrElse(message.getClass, Nil).foreach(_ ! message)
    snapshot.getOrElse(classOf[Message], Nil).foreach(_ ! message)
  }

  /** Registers `subscriber` for messages whose runtime class is exactly `messageClass`.
    *
    * Subscribing the same reference twice results in two deliveries per message.
    */
  def subscribe[T <: Message](messageClass: Class[T], subscriber: ActorRef[Message]): Unit =
    update { registry =>
      registry.updated(messageClass, subscriber :: registry.getOrElse(messageClass, Nil))
    }

  /** Removes every registration of `subscriber` for `messageClass`; a no-op if there is none. */
  def unsubscribe[T <: Message](messageClass: Class[T], subscriber: ActorRef[Message]): Unit =
    update { registry =>
      registry.get(messageClass) match {
        case None => registry
        case Some(existing) =>
          existing.filterNot(_ == subscriber) match {
            case Nil       => registry - messageClass // do not keep empty entries around
            case remaining => registry.updated(messageClass, remaining)
          }
      }
    }

  /** Applies `change` atomically, retrying when another thread updated the registry first. */
  @scala.annotation.tailrec
  private def update(change: Registry => Registry): Unit = {
    val current = subscriptions.get()
    if (!subscriptions.compareAndSet(current, change(current))) update(change)
  }
}
// Message Handler base trait
/** Contract for a component that processes one family of integration messages.
  *
  * @tparam T the message type this handler accepts
  */
trait MessageHandler[T <: IntegrationMessages.Message] {
/** Processes `message` asynchronously; the returned future completes when handling is done. */
def handle(message: T): Future[Unit]
/** Reports whether this handler is willing to process the given message. */
def canHandle(message: IntegrationMessages.Message): Boolean
}
// Order Processing Service
/** Actor that turns `ProcessOrder` commands into inventory reservations and a payment request,
  * then publishes an `OrderProcessed` event on the message bus.
  */
object OrderProcessingService {
  import IntegrationMessages._

  /** Messages this actor understands. */
  sealed trait Protocol
  /** A bus message wrapped for delivery to this actor. */
  case class HandleMessage(message: Message) extends Protocol
  /** A direct request to process an order. */
  case class ProcessOrderCommand(command: ProcessOrder) extends Protocol
  /** Internal notification that processing of `orderId` finished. */
  case class OrderProcessingComplete(orderId: String, success: Boolean) extends Protocol

  /** Creates the behavior and subscribes it to `ProcessOrder` messages on the bus.
    *
    * @param messageBus       bus to subscribe to and publish results on
    * @param inventoryService target for inventory reservations
    * @param paymentService   target for the payment request
    */
  def apply(
    messageBus: MessageBus,
    inventoryService: ActorRef[InventoryService.Protocol],
    paymentService: ActorRef[PaymentService.Protocol]
  ): Behavior[Protocol] =
    Behaviors.setup { context =>
      // The bus sends raw `Message` values. Casting `context.self` to ActorRef[Message] would
      // deliver objects that are not part of `Protocol`, so they would only ever reach the
      // catch-all `unhandled` case. The adapter wraps them in `HandleMessage` instead.
      val busAdapter: ActorRef[Message] =
        context.messageAdapter[Message](message => HandleMessage(message))
      messageBus.subscribe(classOf[ProcessOrder], busAdapter)
      active(messageBus, inventoryService, paymentService)
    }

  /** Steady-state behavior: processes orders and publishes their outcome. */
  private def active(
    messageBus: MessageBus,
    inventoryService: ActorRef[InventoryService.Protocol],
    paymentService: ActorRef[PaymentService.Protocol]
  ): Behavior[Protocol] =
    Behaviors.receive { (context, message) =>
      message match {
        case HandleMessage(msg: ProcessOrder) =>
          processOrder(context, messageBus, inventoryService, paymentService, msg)
          Behaviors.same
        case ProcessOrderCommand(command) =>
          processOrder(context, messageBus, inventoryService, paymentService, command)
          Behaviors.same
        case OrderProcessingComplete(orderId, success) =>
          val status: OrderStatus = if (success) Confirmed else Cancelled
          messageBus.publish(
            OrderProcessed(orderId = orderId, status = status, processedAt = Instant.now())
          )
          Behaviors.same
        case _ =>
          // Bus messages other than ProcessOrder are not this actor's concern.
          Behaviors.unhandled
      }
    }

  /** Fires the reservation and payment requests for `order` and reports completion to self.
    *
    * The steps are fire-and-forget: completion is signalled as successful without waiting for
    * the inventory or payment outcome.
    */
  private def processOrder(
    context: ActorContext[Protocol],
    messageBus: MessageBus,
    inventoryService: ActorRef[InventoryService.Protocol],
    paymentService: ActorRef[PaymentService.Protocol],
    order: ProcessOrder
  ): Unit = {
    context.log.info(s"Processing order ${order.orderId}")
    // Step 1: Reserve inventory, one command per line item, correlated by order id.
    order.items.foreach { item =>
      val reserveCommand = UpdateInventory(
        correlationId = Some(order.orderId),
        productId = item.productId,
        quantity = item.quantity,
        operation = Reserve
      )
      inventoryService ! InventoryService.HandleCommand(reserveCommand)
    }
    // Step 2: Process payment (simplified)
    paymentService ! PaymentService.ProcessPayment(order.orderId, order.totalAmount)
    // In a real implementation, you would coordinate these steps
    // and handle failures appropriately
    context.self ! OrderProcessingComplete(order.orderId, success = true)
  }
}
// Inventory Service
/** Actor that keeps an in-memory stock table and applies `UpdateInventory` commands to it. */
object InventoryService {
  import IntegrationMessages._

  /** Messages this actor understands. */
  sealed trait Protocol
  /** Applies the wrapped inventory command. */
  case class HandleCommand(command: UpdateInventory) extends Protocol
  /** Asks whether at least `requiredQuantity` units of `productId` are in stock. */
  case class CheckInventory(productId: String, requiredQuantity: Int, replyTo: ActorRef[InventoryCheckResult]) extends Protocol
  /** Reply to [[CheckInventory]]. */
  case class InventoryCheckResult(productId: String, available: Boolean, currentQuantity: Int)

  /** Creates the behavior, subscribes it to `UpdateInventory` on the bus and seeds sample stock. */
  def apply(messageBus: MessageBus): Behavior[Protocol] =
    Behaviors.setup { context =>
      // The bus sends raw `UpdateInventory` values. Casting `context.self` would let them reach
      // the `Protocol` match below, which has no catch-all and would throw a MatchError. The
      // adapter wraps them in `HandleCommand`; the upcast is only needed because the bus API
      // asks for ActorRef[Message], and anything that is not an UpdateInventory has no
      // registered adapter and is reported as unhandled.
      val busAdapter: ActorRef[Message] =
        context
          .messageAdapter[UpdateInventory](command => HandleCommand(command))
          .unsafeUpcast[Message]
      messageBus.subscribe(classOf[UpdateInventory], busAdapter)
      // Initialize with some sample inventory
      val initialInventory = Map(
        "product-1" -> 100,
        "product-2" -> 50,
        "product-3" -> 200
      )
      active(messageBus, initialInventory)
    }

  /** Behavior holding the current stock table; each update returns a behavior with the new table. */
  private def active(messageBus: MessageBus, inventory: Map[String, Int]): Behavior[Protocol] =
    Behaviors.receive { (context, message) =>
      message match {
        case HandleCommand(command) =>
          // Unknown products are treated as having zero stock.
          val currentQuantity = inventory.getOrElse(command.productId, 0)
          val newQuantity = command.operation match {
            case Increase => currentQuantity + command.quantity
            case Decrease => math.max(0, currentQuantity - command.quantity) // never below zero
            case Reserve =>
              if (currentQuantity >= command.quantity) {
                currentQuantity - command.quantity
              } else {
                // Not enough stock: leave the quantity unchanged and log it.
                context.log.warn(s"Insufficient inventory for product ${command.productId}")
                currentQuantity
              }
            case Release => currentQuantity + command.quantity
          }
          val updatedInventory = inventory + (command.productId -> newQuantity)
          // Publish inventory updated event (also for a refused reservation, where the
          // previous and new quantities are equal).
          val event = InventoryUpdated(
            correlationId = command.correlationId,
            productId = command.productId,
            previousQuantity = currentQuantity,
            newQuantity = newQuantity,
            operation = command.operation
          )
          messageBus.publish(event)
          active(messageBus, updatedInventory)
        case CheckInventory(productId, requiredQuantity, replyTo) =>
          val currentQuantity = inventory.getOrElse(productId, 0)
          val available = currentQuantity >= requiredQuantity
          replyTo ! InventoryCheckResult(productId, available, currentQuantity)
          Behaviors.same
      }
    }
}
// Payment Service
/** Actor that simulates payment handling and announces every outcome on the message bus. */
object PaymentService {
  import IntegrationMessages._

  /** Messages this actor understands. */
  sealed trait Protocol
  /** Charge `amount` for the given order. */
  case class ProcessPayment(orderId: String, amount: BigDecimal) extends Protocol
  /** Refund `amount` of an earlier payment. */
  case class RefundPayment(paymentId: String, amount: BigDecimal) extends Protocol

  /** Creates the behavior; no bus subscription is made, requests arrive directly. */
  def apply(messageBus: MessageBus): Behavior[Protocol] =
    Behaviors.setup[Protocol](_ => active(messageBus))

  /** Maps each request to a `PaymentProcessed` event and publishes it. */
  private def active(messageBus: MessageBus): Behavior[Protocol] =
    Behaviors.receive { (_, request) =>
      val outcome = request match {
        case ProcessPayment(orderId, amount) =>
          // Simulated gateway: a fresh payment id and a coin flip for the result.
          val generatedId = UUID.randomUUID().toString
          val approved = scala.util.Random.nextBoolean()
          PaymentProcessed(
            paymentId = generatedId,
            orderId = orderId,
            amount = amount,
            status = if (approved) PaymentSuccessful else PaymentFailed
          )
        case RefundPayment(paymentId, amount) =>
          // The original order is not tracked here, so the order id is left blank.
          PaymentProcessed(
            paymentId = paymentId,
            orderId = "",
            amount = amount,
            status = PaymentRefunded
          )
      }
      messageBus.publish(outcome)
      Behaviors.same
    }
}
// Notification Service
/** Actor that delivers `SendNotification` commands over the requested channel. */
object NotificationService {
  import IntegrationMessages._

  /** Messages this actor understands. */
  sealed trait Protocol
  /** Deliver the wrapped notification. */
  case class HandleNotification(notification: SendNotification) extends Protocol

  /** Creates the behavior and subscribes it to notification commands and order/payment events. */
  def apply(messageBus: MessageBus): Behavior[Protocol] =
    Behaviors.setup { context =>
      // The bus sends raw messages. Casting `context.self` to ActorRef[Message] would deliver
      // values that are not part of `Protocol`, so published SendNotification commands were
      // never sent. The adapter wraps them in `HandleNotification`; the upcast is needed only
      // because the bus API asks for ActorRef[Message].
      val busAdapter: ActorRef[Message] =
        context
          .messageAdapter[SendNotification](notification => HandleNotification(notification))
          .unsafeUpcast[Message]
      messageBus.subscribe(classOf[SendNotification], busAdapter)
      // Also subscribe to events to send automatic notifications.
      // NOTE(review): Protocol has no case for these events yet, so they have no registered
      // adapter and are reported as unhandled, as they were before this change.
      messageBus.subscribe(classOf[OrderProcessed], busAdapter)
      messageBus.subscribe(classOf[PaymentProcessed], busAdapter)
      active(messageBus)
    }

  /** Steady-state behavior: sends each requested notification. */
  private def active(messageBus: MessageBus): Behavior[Protocol] =
    Behaviors.receive { (context, message) =>
      message match {
        case HandleNotification(notification) =>
          sendNotification(context, notification)
          Behaviors.same
        case _ => Behaviors.unhandled
      }
    }

  /** Logs the notification and dispatches it to the sender for its channel. */
  private def sendNotification(context: ActorContext[Protocol], notification: SendNotification): Unit = {
    context.log.info(s"Sending ${notification.channel} notification to ${notification.recipient}: ${notification.content}")
    // In a real implementation, you would integrate with email services, SMS providers, etc.
    notification.channel match {
      case Email => sendEmail(notification.recipient, notification.content)
      case SMS   => sendSMS(notification.recipient, notification.content)
      case Push  => sendPushNotification(notification.recipient, notification.content)
    }
  }

  /** Placeholder: email delivery is not implemented. */
  private def sendEmail(recipient: String, content: String): Unit = {
    // Email implementation
  }

  /** Placeholder: SMS delivery is not implemented. */
  private def sendSMS(recipient: String, content: String): Unit = {
    // SMS implementation
  }

  /** Placeholder: push delivery is not implemented. */
  private def sendPushNotification(recipient: String, content: String): Unit = {
    // Push notification implementation
  }
}
Event Sourcing and CQRS Integration
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
import akka.persistence.typed.PersistenceId
import akka.actor.typed.Behavior
import spray.json._
import DefaultJsonProtocol._
// Event Sourcing for Order Management
/** Event-sourced aggregate for a single order.
  *
  * Commands are validated against the current state and turned into events; the state is
  * rebuilt purely from those events. The state is `None` until an `OrderCreated` event has
  * been applied.
  */
object OrderAggregate {
  // Local imports keep this snippet self-contained: the order vocabulary lives in
  // IntegrationMessages. Only the needed names are imported so that its `Command` and `Event`
  // traits do not clash with the ones declared below.
  import java.time.Instant
  import IntegrationMessages.{Cancelled, Confirmed, OrderItem, OrderStatus, Pending, Processing, Shipped}

  // Commands
  sealed trait Command
  case class CreateOrder(orderId: String, customerId: String, items: List[OrderItem]) extends Command
  case class AddItem(item: OrderItem) extends Command
  case class RemoveItem(productId: String) extends Command
  case class ConfirmOrder() extends Command
  case class CancelOrder(reason: String) extends Command
  case class ShipOrder(trackingNumber: String) extends Command

  // Events
  sealed trait Event

  /** Records that the order was created.
    *
    * `createdAt` is stored in the event so that replaying the journal always rebuilds the same
    * state; taking the time inside the event handler would give a different creation time on
    * every recovery. The default keeps three-argument construction working.
    */
  case class OrderCreated(
    orderId: String,
    customerId: String,
    items: List[OrderItem],
    createdAt: Instant = Instant.now()
  ) extends Event

  object OrderCreated {
    /** Keeps existing three-field patterns such as `case OrderCreated(id, customer, items)` valid. */
    def unapply(event: OrderCreated): Some[(String, String, List[OrderItem])] =
      Some((event.orderId, event.customerId, event.items))
  }

  case class ItemAdded(item: OrderItem) extends Event
  case class ItemRemoved(productId: String) extends Event
  case class OrderConfirmed(confirmedAt: Instant) extends Event
  case class OrderCancelled(reason: String, cancelledAt: Instant) extends Event
  case class OrderShipped(trackingNumber: String, shippedAt: Instant) extends Event

  /** Sum of `price * quantity` over all items; zero for an empty list. */
  private def totalOf(items: List[OrderItem]): BigDecimal =
    items.map(item => item.price * item.quantity).sum

  // State
  /** Snapshot of an order as rebuilt from its events. */
  case class OrderState(
    orderId: String,
    customerId: String,
    items: List[OrderItem],
    status: OrderStatus,
    totalAmount: BigDecimal,
    createdAt: Instant,
    confirmedAt: Option[Instant] = None,
    shippedAt: Option[Instant] = None,
    cancelledAt: Option[Instant] = None,
    trackingNumber: Option[String] = None
  ) {
    /** Adds `item`, replacing any existing line for the same product, and recomputes the total. */
    def addItem(item: OrderItem): OrderState = {
      val updatedItems = item :: items.filterNot(_.productId == item.productId)
      copy(items = updatedItems, totalAmount = totalOf(updatedItems))
    }

    /** Removes every line for `productId` and recomputes the total. */
    def removeItem(productId: String): OrderState = {
      val updatedItems = items.filterNot(_.productId == productId)
      copy(items = updatedItems, totalAmount = totalOf(updatedItems))
    }

    def canConfirm: Boolean = status == Pending && items.nonEmpty
    def canCancel: Boolean = status == Pending || status == Processing
    def canShip: Boolean = status == Confirmed
  }

  /** Creates the persistent behavior for the order with the given id. */
  def apply(orderId: String): Behavior[Command] = {
    EventSourcedBehavior[Command, Event, Option[OrderState]](
      persistenceId = PersistenceId.ofUniqueId(s"order-$orderId"),
      emptyState = None,
      commandHandler = commandHandler,
      eventHandler = eventHandler
    )
  }

  /** Persists `event` when `allowed`, otherwise marks the command as unhandled. */
  private def persistIf(allowed: Boolean)(event: => Event): Effect[Event, Option[OrderState]] =
    if (allowed) Effect.persist[Event, Option[OrderState]](event)
    else Effect.unhandled[Event, Option[OrderState]]

  /** Validates a command against the current state and decides which event, if any, to persist. */
  private def commandHandler: (Option[OrderState], Command) => Effect[Event, Option[OrderState]] = {
    (state, command) =>
      state match {
        case None =>
          // Nothing exists yet: only creation is meaningful.
          command match {
            case CreateOrder(orderId, customerId, items) =>
              Effect.persist(OrderCreated(orderId, customerId, items, Instant.now()))
            case _ =>
              Effect.unhandled
          }
        case Some(currentState) =>
          command match {
            case CreateOrder(_, _, _) =>
              Effect.unhandled // Order already exists
            case AddItem(item) =>
              persistIf(currentState.status == Pending)(ItemAdded(item))
            case RemoveItem(productId) =>
              persistIf(
                currentState.status == Pending && currentState.items.exists(_.productId == productId)
              )(ItemRemoved(productId))
            case ConfirmOrder() =>
              persistIf(currentState.canConfirm)(OrderConfirmed(Instant.now()))
            case CancelOrder(reason) =>
              persistIf(currentState.canCancel)(OrderCancelled(reason, Instant.now()))
            case ShipOrder(trackingNumber) =>
              persistIf(currentState.canShip)(OrderShipped(trackingNumber, Instant.now()))
          }
      }
  }

  /** Applies an event to the state. Uses only data carried by the event, so replay is repeatable. */
  private def eventHandler: (Option[OrderState], Event) => Option[OrderState] = {
    (state, event) =>
      event match {
        case created: OrderCreated =>
          Some(OrderState(
            orderId = created.orderId,
            customerId = created.customerId,
            items = created.items,
            status = Pending,
            totalAmount = totalOf(created.items),
            createdAt = created.createdAt
          ))
        case ItemAdded(item) =>
          state.map(_.addItem(item))
        case ItemRemoved(productId) =>
          state.map(_.removeItem(productId))
        case OrderConfirmed(confirmedAt) =>
          state.map(_.copy(status = Confirmed, confirmedAt = Some(confirmedAt)))
        case OrderCancelled(_, cancelledAt) =>
          state.map(_.copy(status = Cancelled, cancelledAt = Some(cancelledAt)))
        case OrderShipped(trackingNumber, shippedAt) =>
          state.map(_.copy(
            status = Shipped,
            trackingNumber = Some(trackingNumber),
            shippedAt = Some(shippedAt)
          ))
      }
  }
}
// Read Model for CQRS
/** Query-side view of orders; every lookup is asynchronous. */
trait OrderReadModel {
/** Summaries of the orders belonging to `customerId`. */
def findByCustomerId(customerId: String): Future[List[OrderSummary]]
/** Summaries of the orders currently in `status`. */
def findByStatus(status: OrderStatus): Future[List[OrderSummary]]
/** Full details of one order, or `None` when it is not known. */
def getOrderDetails(orderId: String): Future[Option[OrderDetails]]
/** The recorded events for one order. */
def getOrderHistory(orderId: String): Future[List[OrderEvent]]
}
/** Compact listing row for an order: identity, status, total and item count, without the items. */
case class OrderSummary(
orderId: String,
customerId: String,
status: OrderStatus,
totalAmount: BigDecimal,
itemCount: Int,
createdAt: Instant
)
/** Full read-side view of one order, including its items and optional lifecycle timestamps. */
case class OrderDetails(
orderId: String,
customerId: String,
items: List[OrderItem],
status: OrderStatus,
totalAmount: BigDecimal,
createdAt: Instant,
confirmedAt: Option[Instant],
shippedAt: Option[Instant],
trackingNumber: Option[String]
)
/** One history entry for an order: an event type label, when it happened, and string details. */
case class OrderEvent(
eventType: String,
timestamp: Instant,
details: Map[String, String]
)
// Read Model Implementation
/** Placeholder read model: every query completes immediately with an empty result.
  *
  * The `database` handle is accepted but not used yet; real queries would go through it.
  */
class OrderReadModelImpl(database: Database)(implicit ec: ExecutionContext) extends OrderReadModel {

  /** Stub: always an empty list. */
  def findByCustomerId(customerId: String): Future[List[OrderSummary]] =
    Future.successful(Nil)

  /** Stub: always an empty list. */
  def findByStatus(status: OrderStatus): Future[List[OrderSummary]] =
    Future.successful(Nil)

  /** Stub: always `None`. */
  def getOrderDetails(orderId: String): Future[Option[OrderDetails]] =
    Future.successful(Option.empty[OrderDetails])

  /** Stub: always an empty list. */
  def getOrderHistory(orderId: String): Future[List[OrderEvent]] =
    Future.successful(Nil)
}
// Event Handler for Read Model Updates
/** Actor that reacts to order events on behalf of the read model.
  *
  * It currently only logs creation, confirmation and shipping; `readModel` is not written to yet.
  */
object ReadModelUpdater {
  import OrderAggregate._

  /** Behavior that logs the events of interest and silently accepts all others. */
  def apply(readModel: OrderReadModel): Behavior[Event] =
    Behaviors.receive { (context, event) =>
      // Work out what to log first; every branch keeps the same behavior.
      val logLine: Option[String] = event match {
        case OrderCreated(orderId, _, _) =>
          Some(s"Updating read model for order created: $orderId")
        case _: OrderConfirmed =>
          Some("Updating read model for order confirmed")
        case OrderShipped(trackingNumber, _) =>
          Some(s"Updating read model for order shipped: $trackingNumber")
        case _ =>
          None
      }
      logLine.foreach(line => context.log.info(line))
      Behaviors.same
    }
}
RESTful API Design Patterns
Comprehensive API Implementation
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._
import DefaultJsonProtocol._
import scala.concurrent.{ExecutionContext, Future}
import java.time.Instant
// API Models
/** Request/response DTOs of the order HTTP API together with their spray-json formats. */
object ApiModels {
// Request/Response DTOs
/** Body of a create-order request. */
case class CreateOrderRequest(
customerId: String,
items: List[OrderItemRequest]
)
/** One requested order line. */
case class OrderItemRequest(
productId: String,
quantity: Int,
price: BigDecimal
)
/** Body of an update-order request; `items` is optional and defaults to `None`. */
case class UpdateOrderRequest(
items: Option[List[OrderItemRequest]] = None
)
/** Order representation returned to clients; `status` and `createdAt` are plain strings and
  * `links` carries related actions. */
case class OrderResponse(
orderId: String,
customerId: String,
items: List[OrderItemResponse],
status: String,
totalAmount: BigDecimal,
createdAt: String,
links: List[Link]
)
/** One order line in a response, including its subtotal. */
case class OrderItemResponse(
productId: String,
quantity: Int,
price: BigDecimal,
subtotal: BigDecimal
)
/** Hypermedia link: relation name, target URL and HTTP method. */
case class Link(
rel: String,
href: String,
method: String
)
/** Error payload returned to clients. */
case class ErrorResponse(
error: String,
code: String,
message: String,
timestamp: String,
path: String
)
/** One page of results plus its paging metadata. */
case class PaginatedResponse[T](
data: List[T],
pagination: PaginationInfo
)
/** Paging metadata: position, sizes, first/last flags and navigation links. */
case class PaginationInfo(
page: Int,
size: Int,
totalElements: Long,
totalPages: Int,
first: Boolean,
last: Boolean,
links: List[Link]
)
// JSON formats
// Each format is declared after the formats of the types it contains (for example `linkFormat`
// before `orderResponseFormat`). NOTE(review): keep that order when adding formats — these are
// plain vals, so a format that needs a later one would presumably see it uninitialized.
implicit val orderItemRequestFormat: RootJsonFormat[OrderItemRequest] = jsonFormat3(OrderItemRequest)
implicit val createOrderRequestFormat: RootJsonFormat[CreateOrderRequest] = jsonFormat2(CreateOrderRequest)
implicit val updateOrderRequestFormat: RootJsonFormat[UpdateOrderRequest] = jsonFormat1(UpdateOrderRequest)
implicit val orderItemResponseFormat: RootJsonFormat[OrderItemResponse] = jsonFormat4(OrderItemResponse)
implicit val linkFormat: RootJsonFormat[Link] = jsonFormat3(Link)
implicit val orderResponseFormat: RootJsonFormat[OrderResponse] = jsonFormat7(OrderResponse)
implicit val errorResponseFormat: RootJsonFormat[ErrorResponse] = jsonFormat5(ErrorResponse)
implicit val paginationInfoFormat: RootJsonFormat[PaginationInfo] = jsonFormat7(PaginationInfo)
// Generic format: available for any element type that itself has a JsonFormat.
implicit def paginatedResponseFormat[T: JsonFormat]: RootJsonFormat[PaginatedResponse[T]] = jsonFormat2(PaginatedResponse[T])
}
// API Service Layer
/** Application service behind the order HTTP API.
  *
  * Translates API DTOs to domain calls on `orderService` (writes) and `readModel` (reads), and
  * maps domain results back to API responses or [[ApiError]] values.
  *
  * @param orderService command-side service used for create/update/cancel/ship
  * @param readModel    query-side view used for lookups and listings
  */
class OrderApiService(
  orderService: OrderService,
  readModel: OrderReadModel
)(implicit ec: ExecutionContext) {
  import ApiModels._

  /** Creates an order from the request; domain rejections become validation errors. */
  def createOrder(request: CreateOrderRequest): Future[Either[ApiError, OrderResponse]] = {
    val items = request.items.map(toOrderItem)
    orderService.createOrder(request.customerId, items).map {
      case Right(order) => Right(orderToResponse(order))
      case Left(error)  => Left(ApiError.ValidationError(error))
    }
  }

  /** Looks up one order in the read model; an unknown id yields `NotFound`. */
  def getOrder(orderId: String): Future[Either[ApiError, OrderResponse]] = {
    readModel.getOrderDetails(orderId).map {
      case Some(order) => Right(orderDetailsToResponse(order))
      case None        => Left(ApiError.NotFound(s"Order $orderId not found"))
    }
  }

  /** Replaces the items of an order; a request without items is rejected as a validation error. */
  def updateOrder(orderId: String, request: UpdateOrderRequest): Future[Either[ApiError, OrderResponse]] = {
    request.items match {
      case Some(items) =>
        orderService.updateOrderItems(orderId, items.map(toOrderItem)).map {
          case Right(order) => Right(orderToResponse(order))
          case Left(error)  => Left(ApiError.ValidationError(error))
        }
      case None =>
        Future.successful(Left(ApiError.ValidationError("No updates provided")))
    }
  }

  /** Lists orders, optionally filtered by customer and/or status, one page at a time.
    *
    * @param customerId optional customer filter
    * @param status     optional status filter; an unrecognised status yields an empty page
    * @param page       zero-based page index; must not be negative
    * @param size       page size; must be positive
    * @return the requested page, or a `ValidationError` for an invalid `page`/`size`
    */
  def listOrders(
    customerId: Option[String],
    status: Option[String],
    page: Int,
    size: Int
  ): Future[Either[ApiError, PaginatedResponse[OrderResponse]]] = {
    if (page < 0 || size <= 0) {
      // A non-positive size would divide by zero below; a negative page has no meaning.
      Future.successful(Left(ApiError.ValidationError("page must be >= 0 and size must be > 0")))
    } else {
      val futureOrders = (customerId, status) match {
        case (Some(cId), Some(st)) =>
          // Honour both filters: narrow the customer's orders to the requested status.
          OrderStatus.fromString(st) match {
            case Some(orderStatus) =>
              readModel.findByCustomerId(cId).map(_.filter(_.status == orderStatus))
            case None => Future.successful(List.empty)
          }
        case (Some(cId), None) =>
          readModel.findByCustomerId(cId)
        case (None, Some(st)) =>
          OrderStatus.fromString(st) match {
            case Some(orderStatus) => readModel.findByStatus(orderStatus)
            case None              => Future.successful(List.empty)
          }
        case (None, None) =>
          // Get all orders (with pagination)
          Future.successful(List.empty) // Simplified
      }
      futureOrders.map { orders =>
        val totalElements = orders.length.toLong
        // Integer ceiling division; computed in Long so large sizes cannot overflow.
        val totalPages = ((totalElements + size - 1) / size).toInt
        // Clamp the offset to the list length before narrowing back to Int.
        val startIndex = math.min(page.toLong * size, totalElements).toInt
        val responses = orders.drop(startIndex).take(size).map(orderSummaryToResponse)
        val pagination = PaginationInfo(
          page = page,
          size = size,
          totalElements = totalElements,
          totalPages = totalPages,
          first = page == 0,
          last = page >= totalPages - 1,
          links = createPaginationLinks(page, totalPages, size, customerId, status)
        )
        Right(PaginatedResponse(responses, pagination))
      }
    }
  }

  /** Cancels an order; domain rejections become validation errors. */
  def cancelOrder(orderId: String, reason: String): Future[Either[ApiError, Unit]] = {
    orderService.cancelOrder(orderId, reason).map {
      case Right(_)    => Right(())
      case Left(error) => Left(ApiError.ValidationError(error))
    }
  }

  /** Marks an order as shipped; domain rejections become validation errors. */
  def shipOrder(orderId: String, trackingNumber: String): Future[Either[ApiError, OrderResponse]] = {
    orderService.shipOrder(orderId, trackingNumber).map {
      case Right(order) => Right(orderToResponse(order))
      case Left(error)  => Left(ApiError.ValidationError(error))
    }
  }

  /** Converts a request line into a domain order item. */
  private def toOrderItem(item: OrderItemRequest): OrderItem =
    OrderItem(item.productId, item.quantity, item.price)

  /** Builds the API response for a domain order. */
  private def orderToResponse(order: Order): OrderResponse = {
    OrderResponse(
      orderId = order.orderId,
      customerId = order.customerId,
      items = order.items.map(itemToResponse),
      status = order.status.toString,
      totalAmount = order.totalAmount,
      createdAt = order.createdAt.toString,
      links = createOrderLinks(order.orderId)
    )
  }

  /** Builds the API response for a read-model detail record. */
  private def orderDetailsToResponse(details: OrderDetails): OrderResponse = {
    OrderResponse(
      orderId = details.orderId,
      customerId = details.customerId,
      items = details.items.map(itemToResponse),
      status = details.status.toString,
      totalAmount = details.totalAmount,
      createdAt = details.createdAt.toString,
      links = createOrderLinks(details.orderId)
    )
  }

  /** Builds the API response for a listing row; summaries carry no items. */
  private def orderSummaryToResponse(summary: OrderSummary): OrderResponse = {
    OrderResponse(
      orderId = summary.orderId,
      customerId = summary.customerId,
      items = List.empty, // Summary doesn't include items
      status = summary.status.toString,
      totalAmount = summary.totalAmount,
      createdAt = summary.createdAt.toString,
      links = createOrderLinks(summary.orderId)
    )
  }

  /** Converts a domain item into a response line with its subtotal. */
  private def itemToResponse(item: OrderItem): OrderItemResponse = {
    OrderItemResponse(
      productId = item.productId,
      quantity = item.quantity,
      price = item.price,
      subtotal = item.price * item.quantity
    )
  }

  /** Links to the actions available on a single order. */
  private def createOrderLinks(orderId: String): List[Link] = {
    List(
      Link("self", s"/api/v1/orders/$orderId", "GET"),
      Link("update", s"/api/v1/orders/$orderId", "PUT"),
      Link("cancel", s"/api/v1/orders/$orderId/cancel", "POST"),
      Link("ship", s"/api/v1/orders/$orderId/ship", "POST")
    )
  }

  /** Navigation links for a listing page, preserving the active filters and page size.
    *
    * Filter values are URL-encoded because they come straight from the caller's query string,
    * and the "last" link never points below page 0 when there are no results.
    */
  private def createPaginationLinks(
    currentPage: Int,
    totalPages: Int,
    size: Int,
    customerId: Option[String],
    status: Option[String]
  ): List[Link] = {
    def encode(value: String): String = java.net.URLEncoder.encode(value, "UTF-8")

    val baseQuery = List(
      customerId.map(id => "customerId=" + encode(id)),
      status.map(st => "status=" + encode(st)),
      Some(s"size=$size")
    ).flatten.mkString("&")

    def pageLink(rel: String, targetPage: Int): Link =
      Link(rel, s"/api/v1/orders?page=$targetPage&$baseQuery", "GET")

    val lastPage = math.max(totalPages - 1, 0)
    List(
      Some(pageLink("first", 0)),
      if (currentPage > 0) Some(pageLink("prev", currentPage - 1)) else None,
      if (currentPage < lastPage) Some(pageLink("next", currentPage + 1)) else None,
      Some(pageLink("last", lastPage))
    ).flatten
  }
}
// Error handling
/** Failure reported by the API layer: a human-readable message plus a stable machine code. */
sealed trait ApiError {
  def message: String
  def code: String
}

/** The concrete API failures; each one fixes its own `code`. */
object ApiError {

  /** The request was understood but is not acceptable. */
  case class ValidationError(message: String) extends ApiError {
    override val code: String = "VALIDATION_ERROR"
  }

  /** The addressed resource does not exist. */
  case class NotFound(message: String) extends ApiError {
    override val code: String = "NOT_FOUND"
  }

  /** The request conflicts with the current state of the resource. */
  case class Conflict(message: String) extends ApiError {
    override val code: String = "CONFLICT"
  }

  /** Something went wrong on the server side. */
  case class InternalError(message: String) extends ApiError {
    override val code: String = "INTERNAL_ERROR"
  }
}
// HTTP Routes
/** Akka HTTP routes for the order API under `/api/v1/orders`.
  *
  * Every endpoint follows the same shape: call the API service, then answer with the success
  * payload, the mapped [[ApiError]], or a generic 500 body when the future itself failed.
  *
  * @param apiService service that performs the actual work
  */
class OrderRoutes(apiService: OrderApiService)(implicit ec: ExecutionContext) {
  import ApiModels._
  // `onComplete` yields a Try; the patterns below need these names in scope.
  import scala.util.{Failure, Success}

  val routes: Route =
    pathPrefix("api" / "v1" / "orders") {
      concat(
        // POST /api/v1/orders
        pathEnd {
          post {
            entity(as[CreateOrderRequest]) { request =>
              onComplete(apiService.createOrder(request)) {
                case Success(Right(order)) => complete(StatusCodes.Created, order)
                case Success(Left(error))  => rejected(error)
                case Failure(exception)    => unexpectedFailure(exception)
              }
            }
          }
        },
        // GET /api/v1/orders
        pathEnd {
          get {
            parameters("customerId".optional, "status".optional, "page".as[Int] ? 0, "size".as[Int] ? 20) {
              (customerId, status, page, size) =>
                onComplete(apiService.listOrders(customerId, status, page, size)) {
                  case Success(Right(orders)) => complete(StatusCodes.OK, orders)
                  case Success(Left(error))   => rejected(error)
                  case Failure(exception)     => unexpectedFailure(exception)
                }
            }
          }
        },
        // GET /api/v1/orders/{orderId}
        path(Segment) { orderId =>
          get {
            onComplete(apiService.getOrder(orderId)) {
              case Success(Right(order)) => complete(StatusCodes.OK, order)
              case Success(Left(error))  => rejected(error)
              case Failure(exception)    => unexpectedFailure(exception)
            }
          }
        },
        // PUT /api/v1/orders/{orderId}
        path(Segment) { orderId =>
          put {
            entity(as[UpdateOrderRequest]) { request =>
              onComplete(apiService.updateOrder(orderId, request)) {
                case Success(Right(order)) => complete(StatusCodes.OK, order)
                case Success(Left(error))  => rejected(error)
                case Failure(exception)    => unexpectedFailure(exception)
              }
            }
          }
        },
        // POST /api/v1/orders/{orderId}/cancel — the raw request body is the cancellation reason
        path(Segment / "cancel") { orderId =>
          post {
            entity(as[String]) { reason =>
              onComplete(apiService.cancelOrder(orderId, reason)) {
                case Success(Right(_))    => complete(StatusCodes.NoContent)
                case Success(Left(error)) => rejected(error)
                case Failure(exception)   => unexpectedFailure(exception)
              }
            }
          }
        },
        // POST /api/v1/orders/{orderId}/ship — the raw request body is the tracking number
        path(Segment / "ship") { orderId =>
          post {
            entity(as[String]) { trackingNumber =>
              onComplete(apiService.shipOrder(orderId, trackingNumber)) {
                case Success(Right(order)) => complete(StatusCodes.OK, order)
                case Success(Left(error))  => rejected(error)
                case Failure(exception)    => unexpectedFailure(exception)
              }
            }
          }
        }
      )
    }

  /** Completes the request with the status and body that correspond to `error`. */
  private def rejected(error: ApiError): Route =
    complete(errorToHttpResponse(error))

  /** Completes the request with a 500 and a generic error body for a failed future. */
  private def unexpectedFailure(exception: Throwable): Route =
    complete(StatusCodes.InternalServerError, createErrorResponse(exception))

  /** Maps an [[ApiError]] to its HTTP status and response body. */
  private def errorToHttpResponse(error: ApiError): (StatusCode, ErrorResponse) = {
    val statusCode = error match {
      case _: ApiError.ValidationError => StatusCodes.BadRequest
      case _: ApiError.NotFound        => StatusCodes.NotFound
      case _: ApiError.Conflict        => StatusCodes.Conflict
      case _: ApiError.InternalError   => StatusCodes.InternalServerError
    }
    val errorResponse = ErrorResponse(
      error = error.getClass.getSimpleName,
      code = error.code,
      message = error.message,
      timestamp = Instant.now().toString,
      path = "" // Would be populated by middleware
    )
    (statusCode, errorResponse)
  }

  /** Builds the body for an unexpected failure; the exception message is deliberately not exposed. */
  private def createErrorResponse(exception: Throwable): ErrorResponse = {
    ErrorResponse(
      error = exception.getClass.getSimpleName,
      code = "INTERNAL_ERROR",
      message = "An unexpected error occurred",
      timestamp = Instant.now().toString,
      path = ""
    )
  }
}
Service Mesh and Microservices Patterns
Circuit Breaker and Resilience Patterns
import akka.pattern.CircuitBreaker
import akka.actor.{ActorSystem, Scheduler}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
// Circuit Breaker Implementation
/** Wraps asynchronous calls in an Akka circuit breaker and logs every state transition.
  *
  * @param name         label used in the log lines
  * @param maxFailures  failures tolerated before the breaker opens
  * @param callTimeout  time after which a call counts as failed
  * @param resetTimeout time the breaker stays open before a trial call is allowed
  */
class ResilientService(
  name: String,
  maxFailures: Int = 5,
  callTimeout: FiniteDuration = 10.seconds,
  resetTimeout: FiniteDuration = 1.minute
)(implicit system: ActorSystem, ec: ExecutionContext) {

  private val circuitBreaker: CircuitBreaker = {
    val breaker = new CircuitBreaker(
      scheduler = system.scheduler,
      maxFailures = maxFailures,
      callTimeout = callTimeout,
      resetTimeout = resetTimeout
    )
    // Listeners are registered on the breaker itself, one transition at a time.
    breaker.onOpen(system.log.warning(s"Circuit breaker for $name is OPEN"))
    breaker.onHalfOpen(system.log.info(s"Circuit breaker for $name is HALF-OPEN"))
    breaker.onClose(system.log.info(s"Circuit breaker for $name is CLOSED"))
    breaker
  }

  /** Runs `operation` through the breaker; the by-name argument is only started by the breaker. */
  def withCircuitBreaker[T](operation: => Future[T]): Future[T] =
    circuitBreaker.withCircuitBreaker(operation)

  /** True while the breaker is open. */
  def isOpen: Boolean = circuitBreaker.isOpen

  /** True while the breaker is half-open. */
  def isHalfOpen: Boolean = circuitBreaker.isHalfOpen

  /** True while the breaker is closed. */
  def isClosed: Boolean = circuitBreaker.isClosed
}
// Retry Logic
/** Retries a failing asynchronous operation with capped exponential backoff.
  *
  * Only connection failures and timeouts are retried; any other failure is returned as is.
  *
  * @param maxRetries        number of retries after the first attempt
  * @param baseDelay         delay before the first retry
  * @param maxDelay          upper bound for any single delay
  * @param backoffMultiplier factor applied to the delay for each further retry
  */
class RetryPolicy(
  maxRetries: Int = 3,
  baseDelay: FiniteDuration = 1.second,
  maxDelay: FiniteDuration = 30.seconds,
  backoffMultiplier: Double = 2.0
)(implicit ec: ExecutionContext, scheduler: Scheduler) {

  /** Runs `operation`, re-running it after a delay while it keeps failing with a retryable error.
    *
    * @return the first successful result, or the last failure once retries are exhausted or a
    *         non-retryable error occurs
    */
  def retry[T](operation: => Future[T]): Future[T] = {
    def attemptWithRetry(attempt: Int): Future[T] =
      startSafely(operation).recoverWith {
        case exception if attempt < maxRetries && isRetryable(exception) =>
          akka.pattern.after(calculateDelay(attempt), scheduler) {
            attemptWithRetry(attempt + 1)
          }
        // Anything else falls through: the future keeps its original failure.
      }
    attemptWithRetry(0)
  }

  /** Evaluates the by-name operation, turning an exception thrown while starting it into a
    * failed future so that it goes through the same retry logic instead of escaping `retry`.
    */
  private def startSafely[T](operation: => Future[T]): Future[T] =
    try operation
    catch { case scala.util.control.NonFatal(error) => Future.failed(error) }

  /** Transient network-style failures are worth retrying; everything else is not. */
  private def isRetryable(exception: Throwable): Boolean = {
    exception match {
      case _: java.net.ConnectException             => true
      case _: java.net.SocketTimeoutException       => true
      case _: java.util.concurrent.TimeoutException => true
      case _                                        => false
    }
  }

  /** Delay before retry number `attempt` (zero-based): `baseDelay * multiplier^attempt`,
    * capped at `maxDelay`.
    *
    * The product is computed as a plain Double and capped before a duration is built, because
    * multiplying a duration by a very large factor throws instead of saturating.
    */
  private def calculateDelay(attempt: Int): FiniteDuration = {
    val capMillis = maxDelay.toMillis
    val rawMillis = baseDelay.toNanos.toDouble * math.pow(backoffMultiplier, attempt.toDouble) / 1e6
    val boundedMillis =
      if (rawMillis.isNaN || rawMillis >= capMillis.toDouble) capMillis
      else math.max(rawMillis.toLong, 0L)
    FiniteDuration(boundedMillis, MILLISECONDS)
  }
}
// Bulkhead Pattern
/** Bulkhead: a dedicated, bounded thread pool that isolates one class of work.
  *
  * The pool is a `ThreadPoolExecutor` with a 100-slot bounded queue and daemon
  * threads. With a bounded queue, `ThreadPoolExecutor` only grows past
  * `corePoolSize` once the queue is full, and rejects work when both the queue
  * and `maxPoolSize` are exhausted.
  *
  * @param name          prefix for the pool's thread names
  * @param corePoolSize  core number of pool threads
  * @param maxPoolSize   maximum number of pool threads
  * @param keepAliveTime idle keep-alive for extra threads, in seconds
  */
class BulkheadExecutor(
  name: String,
  corePoolSize: Int = 10,
  maxPoolSize: Int = 20,
  keepAliveTime: Long = 60
) {
  import java.util.concurrent.{LinkedBlockingQueue, RejectedExecutionException, ThreadPoolExecutor, TimeUnit}
  import java.util.concurrent.atomic.AtomicInteger
  import scala.concurrent.Promise
  import scala.util.control.NonFatal

  // Gives every pool thread a distinct name so thread dumps are readable.
  private val threadCounter = new AtomicInteger(0)

  private val executor = new ThreadPoolExecutor(
    corePoolSize,
    maxPoolSize,
    keepAliveTime,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable](100),
    (r: Runnable) => {
      val thread = new Thread(r, s"$name-bulkhead-thread-${threadCounter.incrementAndGet()}")
      thread.setDaemon(true)
      thread
    }
  )

  /** Execution context backed by the bulkhead pool. */
  implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)

  /** Starts `operation` on the bulkhead pool and returns its eventual result.
    *
    * The by-name `operation` is evaluated on a pool thread, so the work is
    * actually confined to this bulkhead. The returned Future fails if the pool
    * rejects the task (queue full or pool shut down) or if `operation` throws
    * while being started.
    */
  def execute[T](operation: => Future[T]): Future[T] = {
    val promise = Promise[T]()
    val task = new Runnable {
      def run(): Unit = {
        val started: Future[T] =
          try operation
          catch { case NonFatal(e) => Future.failed[T](e) }
        promise.completeWith(started)
        ()
      }
    }
    try {
      executor.execute(task)
    } catch {
      case rejected: RejectedExecutionException =>
        promise.tryFailure(rejected)
        ()
    }
    promise.future
  }

  /** Initiates an orderly shutdown; already submitted tasks still run. */
  def shutdown(): Unit = {
    executor.shutdown()
  }

  /** Approximate number of threads currently executing tasks. */
  def getActiveThreads: Int = executor.getActiveCount

  /** Number of tasks currently waiting in the queue. */
  def getQueueSize: Int = executor.getQueue.size()
}
// Service Discovery
/** Registry abstraction for locating service instances by service name.
  *
  * All operations are asynchronous. Whether unhealthy instances are filtered out
  * is left to the implementation; the in-memory implementation in this file
  * returns only instances whose health is `Healthy`.
  */
trait ServiceDiscovery {
/** Returns the instances known for `serviceName`. */
def discover(serviceName: String): Future[List[ServiceInstance]]
/** Adds `instance` to the registry. */
def register(instance: ServiceInstance): Future[Unit]
/** Removes the instance identified by `serviceId` from the registry. */
def deregister(serviceId: String): Future[Unit]
}
/** One addressable instance of a service.
  *
  * @param id       identifier of this instance; the in-memory registry uses it to
  *                 replace and remove entries
  * @param name     service name; the in-memory registry groups instances by it
  * @param host     host used by the client when building the request URI
  * @param port     port used by the client when building the request URI
  * @param metadata free-form key/value attributes, empty by default
  * @param health   health status, `Healthy` by default
  */
case class ServiceInstance(
id: String,
name: String,
host: String,
port: Int,
metadata: Map[String, String] = Map.empty,
health: ServiceHealth = ServiceHealth.Healthy
)
/** Health status of a [[ServiceInstance]]; a closed set of three values. */
sealed trait ServiceHealth
object ServiceHealth {
/** The only status that the in-memory discovery returns from `discover`. */
case object Healthy extends ServiceHealth
case object Unhealthy extends ServiceHealth
case object Unknown extends ServiceHealth
}
// Simple in-memory service discovery (for testing)
/** In-memory [[ServiceDiscovery]] intended for tests.
  *
  * The registry is an immutable Map held in an `AtomicReference`, so lookups and
  * updates can safely happen from different threads (for example from Future
  * callbacks) without corrupting the registry or losing updates.
  *
  * @param ec kept for interface compatibility; every operation completes
  *           synchronously and returns an already-completed Future
  */
class InMemoryServiceDiscovery(implicit ec: ExecutionContext) extends ServiceDiscovery {
  import java.util.concurrent.atomic.AtomicReference

  private type Registry = Map[String, List[ServiceInstance]]

  private val services = new AtomicReference[Registry](Map.empty)

  /** Applies `change` atomically, retrying if another thread updated the registry first. */
  @scala.annotation.tailrec
  private def modify(change: Registry => Registry): Unit = {
    val current = services.get()
    if (!services.compareAndSet(current, change(current))) modify(change)
  }

  /** Returns the healthy instances registered under `serviceName` (empty if none). */
  def discover(serviceName: String): Future[List[ServiceInstance]] = {
    val registered = services.get().getOrElse(serviceName, List.empty)
    Future.successful(registered.filter(_.health == ServiceHealth.Healthy))
  }

  /** Registers `instance` under its service name, replacing any entry with the same id. */
  def register(instance: ServiceInstance): Future[Unit] = {
    modify { registry =>
      val others = registry.getOrElse(instance.name, List.empty).filterNot(_.id == instance.id)
      registry.updated(instance.name, instance :: others)
    }
    Future.successful(())
  }

  /** Removes every instance whose id equals `serviceId`, across all service names. */
  def deregister(serviceId: String): Future[Unit] = {
    modify { registry =>
      registry.map { case (serviceName, instances) =>
        serviceName -> instances.filterNot(_.id == serviceId)
      }
    }
    Future.successful(())
  }
}
// Load Balancer
/** Strategy for choosing one instance out of a list of candidates. */
trait LoadBalancer {
/** Picks an instance from `instances`; the implementations in this file return
  * `None` when the list is empty.
  */
def selectInstance(instances: List[ServiceInstance]): Option[ServiceInstance]
}
/** Cycles through the candidate instances in order, one per call.
  *
  * The position is kept in an `AtomicInteger`, so concurrent callers do not lose
  * increments. `Math.floorMod` keeps the index non-negative after the counter
  * wraps past `Int.MaxValue`; a plain `%` would yield a negative index there and
  * fail with an `IndexOutOfBoundsException`.
  */
class RoundRobinLoadBalancer extends LoadBalancer {
  import java.util.concurrent.atomic.AtomicInteger

  private val counter = new AtomicInteger(0)

  /** Returns the next instance in rotation, or `None` for an empty list.
    *
    * The counter only advances when an instance is actually selected.
    */
  def selectInstance(instances: List[ServiceInstance]): Option[ServiceInstance] =
    if (instances.isEmpty) {
      None
    } else {
      val index = Math.floorMod(counter.getAndIncrement(), instances.size)
      Some(instances(index))
    }
}
/** Picks a candidate instance at random.
  *
  * Despite the name, no weights are consulted: every instance is equally likely.
  */
class WeightedRandomLoadBalancer extends LoadBalancer {

  /** Returns a uniformly random instance, or `None` for an empty list. */
  def selectInstance(instances: List[ServiceInstance]): Option[ServiceInstance] =
    instances match {
      case Nil =>
        None
      case candidates =>
        // Simple random selection (could be enhanced with weights)
        val pick = scala.util.Random.nextInt(candidates.size)
        Some(candidates(pick))
    }
}
// Service Client with Resilience
/** HTTP client that resolves a service by name and calls it with retries inside
  * a circuit breaker.
  *
  * Each attempt re-runs discovery and instance selection, so a retry may hit a
  * different instance. The retries run inside the circuit breaker call, so the
  * breaker sees one call per `makeRequest`.
  *
  * NOTE(review): `Http()` requires an implicit ActorSystem (or
  * ClassicActorSystemProvider) in scope, and this class only declares an implicit
  * ExecutionContext; confirm one is provided by the enclosing scope.
  *
  * @param serviceName      logical name passed to service discovery
  * @param serviceDiscovery source of candidate instances
  * @param loadBalancer     picks one instance per attempt
  * @param resilientService circuit breaker wrapping the whole call
  * @param retryPolicy      retry/backoff applied to each request
  */
class ResilientServiceClient(
  serviceName: String,
  serviceDiscovery: ServiceDiscovery,
  loadBalancer: LoadBalancer,
  resilientService: ResilientService,
  retryPolicy: RetryPolicy
)(implicit ec: ExecutionContext) {
  import akka.http.scaladsl.Http
  import akka.http.scaladsl.model._

  /** Sends one request to `path` on an instance of the service.
    *
    * @param path   request path appended to `http://host:port`
    * @param method HTTP method, GET by default
    * @param entity request body, empty by default
    * @return the HTTP response; fails with `NoSuchElementException` when no
    *         instance is available
    */
  def makeRequest(path: String, method: HttpMethod = HttpMethods.GET, entity: RequestEntity = HttpEntity.Empty): Future[HttpResponse] = {
    resilientService.withCircuitBreaker {
      retryPolicy.retry {
        for {
          instances <- serviceDiscovery.discover(serviceName)
          instance <- chooseInstance(instances)
          uri = s"http://${instance.host}:${instance.port}$path"
          request = HttpRequest(method, uri, entity = entity)
          response <- Http().singleRequest(request)
        } yield response
      }
    }
  }

  /** Asks the load balancer for an instance, failing with a descriptive
    * `NoSuchElementException` instead of an opaque "None.get" when none is returned.
    */
  private def chooseInstance(instances: List[ServiceInstance]): Future[ServiceInstance] =
    loadBalancer.selectInstance(instances) match {
      case Some(instance) => Future.successful(instance)
      case None =>
        Future.failed(
          new NoSuchElementException(s"No available instance for service '$serviceName'")
        )
    }

  /** GET `path`. */
  def get(path: String): Future[HttpResponse] = makeRequest(path, HttpMethods.GET)

  /** POST `entity` to `path`. */
  def post(path: String, entity: RequestEntity): Future[HttpResponse] = makeRequest(path, HttpMethods.POST, entity)

  /** PUT `entity` to `path`. */
  def put(path: String, entity: RequestEntity): Future[HttpResponse] = makeRequest(path, HttpMethods.PUT, entity)

  /** DELETE `path`. */
  def delete(path: String): Future[HttpResponse] = makeRequest(path, HttpMethods.DELETE)
}
// Distributed Tracing
import brave.{Span, Tracing}
import brave.propagation.TraceContext
/** Thin convenience wrapper around Brave's current tracer.
  *
  * Every method that creates a span needs a `brave.Tracing` instance to have been
  * built beforehand; otherwise an `IllegalStateException` is thrown.
  */
class DistributedTracingService {
  import scala.util.control.NonFatal

  /** Resolves the current tracer; `Tracing.current()` may be null when tracing has
    * not been set up, so fail with a clear message instead of a NullPointerException.
    */
  private def tracer: brave.Tracer =
    Option(Tracing.current())
      .map(_.tracer())
      .getOrElse(
        throw new IllegalStateException("No brave.Tracing instance is available; build one before creating spans")
      )

  /** Creates and starts a span named `operationName` via `Tracer.nextSpan()`. */
  def createSpan(operationName: String): Span = {
    tracer.nextSpan().name(operationName).start()
  }

  /** Creates and starts a span that is an explicit child of `parent`.
    *
    * Uses `Tracer.newChild`, the Brave call that accepts a parent `TraceContext`.
    */
  def createChildSpan(parent: Span, operationName: String): Span = {
    tracer.newChild(parent.context()).name(operationName).start()
  }

  /** Adds a key/value tag to `span`. */
  def addTag(span: Span, key: String, value: String): Unit = {
    span.tag(key, value)
    ()
  }

  /** Records `message` on `span` as an annotation. */
  def addLog(span: Span, message: String): Unit = {
    span.annotate(message)
    ()
  }

  /** Finishes `span` so it is reported (Brave's `Span.finish()`). */
  def finishSpan(span: Span): Unit = {
    span.finish()
  }

  /** Runs `operation` inside a new span, finishing the span afterwards.
    *
    * A non-fatal exception thrown by `operation` is recorded on the span and then
    * rethrown. The span is finished when `operation` returns; if `T` is a Future,
    * that happens before the asynchronous work completes.
    */
  def withSpan[T](operationName: String)(operation: Span => T): T = {
    val span = createSpan(operationName)
    try {
      operation(span)
    } catch {
      case NonFatal(e) =>
        span.error(e)
        throw e
    } finally {
      finishSpan(span)
    }
  }
}
Conclusion
Enterprise integration patterns provide the foundation for building robust, scalable, and maintainable distributed systems. Key takeaways include:
Message-Based Integration:
- Asynchronous communication reduces coupling
- Event-driven architecture enables scalability
- Message patterns facilitate system evolution
- Proper error handling and retry mechanisms
API Design Excellence:
- RESTful principles with HATEOAS
- Comprehensive error handling and status codes
- Pagination and filtering for large datasets
- Versioning strategies for API evolution
Resilience Patterns:
- Circuit breakers prevent cascade failures
- Retry policies handle transient failures
- Bulkhead pattern isolates failures
- Timeout mechanisms prevent resource exhaustion
Event Sourcing and CQRS:
- Event sourcing provides audit trails
- CQRS optimizes read and write models
- Event replay enables system recovery
- Eventual consistency in distributed systems
Service Discovery and Load Balancing:
- Dynamic service registration and discovery
- Multiple load balancing strategies
- Health checking and failure detection
- Service mesh integration
Monitoring and Observability:
- Distributed tracing for request correlation
- Metrics collection for system health
- Structured logging for debugging
- Performance monitoring and alerting
Security Considerations:
- Authentication and authorization patterns
- API rate limiting and throttling
- Input validation and sanitization
- Secure communication channels
Testing Strategies:
- Contract testing for API compatibility
- Integration testing for system behavior
- Chaos engineering for resilience validation
- Performance testing for scalability
Deployment Patterns:
- Blue-green deployments for zero downtime
- Canary releases for risk mitigation
- Feature flags for controlled rollouts
- Infrastructure as code for consistency
Data Management:
- Distributed transaction patterns
- Saga pattern for long-running processes
- Data consistency strategies
- Event streaming for real-time processing
Enterprise integration is about building systems that can evolve, scale, and remain maintainable over time. The patterns and practices covered in this lesson provide a solid foundation for architecting modern distributed systems that meet enterprise requirements for reliability, performance, and operational excellence.
Success in enterprise integration requires balancing multiple concerns: performance, reliability, maintainability, security, and business agility. The key is to start simple, measure everything, and evolve the architecture based on real-world requirements and constraints.
Comments
Be the first to comment on this lesson!