Enterprise Integration Patterns: Messaging, APIs, and System Architecture

Modern enterprise applications require sophisticated integration strategies to connect disparate systems, services, and data sources. This comprehensive lesson covers enterprise integration patterns, messaging systems, API design principles, and architectural approaches for building scalable, maintainable distributed systems.

Message-Based Integration Patterns

Asynchronous Messaging with Akka

import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{KillSwitches, Materializer}
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import java.time.Instant
import java.util.UUID

// Message definitions for enterprise integration
object IntegrationMessages {

  // Base message types
  sealed trait Message {
    def messageId: String
    def timestamp: Instant
    def correlationId: Option[String]
    def headers: Map[String, String]
  }

  // Command messages (request actions)
  sealed trait Command extends Message

  case class ProcessOrder(
    messageId: String = UUID.randomUUID().toString,
    timestamp: Instant = Instant.now(),
    correlationId: Option[String] = None,
    headers: Map[String, String] = Map.empty,
    orderId: String,
    customerId: String,
    items: List[OrderItem],
    totalAmount: BigDecimal
  ) extends Command

  case class UpdateInventory(
    messageId: String = UUID.randomUUID().toString,
    timestamp: Instant = Instant.now(),
    correlationId: Option[String] = None,
    headers: Map[String, String] = Map.empty,
    productId: String,
    quantity: Int,
    operation: InventoryOperation
  ) extends Command

  case class SendNotification(
    messageId: String = UUID.randomUUID().toString,
    timestamp: Instant = Instant.now(),
    correlationId: Option[String] = None,
    headers: Map[String, String] = Map.empty,
    recipient: String,
    channel: NotificationChannel,
    content: String,
    priority: Priority
  ) extends Command

  // Event messages (notifications of state changes)
  sealed trait Event extends Message

  case class OrderProcessed(
    messageId: String = UUID.randomUUID().toString,
    timestamp: Instant = Instant.now(),
    correlationId: Option[String] = None,
    headers: Map[String, String] = Map.empty,
    orderId: String,
    status: OrderStatus,
    processedAt: Instant
  ) extends Event

  case class InventoryUpdated(
    messageId: String = UUID.randomUUID().toString,
    timestamp: Instant = Instant.now(),
    correlationId: Option[String] = None,
    headers: Map[String, String] = Map.empty,
    productId: String,
    previousQuantity: Int,
    newQuantity: Int,
    operation: InventoryOperation
  ) extends Event

  case class PaymentProcessed(
    messageId: String = UUID.randomUUID().toString,
    timestamp: Instant = Instant.now(),
    correlationId: Option[String] = None,
    headers: Map[String, String] = Map.empty,
    paymentId: String,
    orderId: String,
    amount: BigDecimal,
    status: PaymentStatus
  ) extends Event

  // Supporting types
  case class OrderItem(productId: String, quantity: Int, price: BigDecimal)

  sealed trait InventoryOperation
  case object Increase extends InventoryOperation
  case object Decrease extends InventoryOperation
  case object Reserve extends InventoryOperation
  case object Release extends InventoryOperation

  sealed trait NotificationChannel
  case object Email extends NotificationChannel
  case object SMS extends NotificationChannel
  case object Push extends NotificationChannel

  sealed trait Priority
  case object Low extends Priority
  case object Normal extends Priority
  case object High extends Priority
  case object Critical extends Priority

  sealed trait OrderStatus
  case object Pending extends OrderStatus
  case object Processing extends OrderStatus
  case object Confirmed extends OrderStatus
  case object Shipped extends OrderStatus
  case object Delivered extends OrderStatus
  case object Cancelled extends OrderStatus

  sealed trait PaymentStatus
  case object PaymentPending extends PaymentStatus
  case object PaymentSuccessful extends PaymentStatus
  case object PaymentFailed extends PaymentStatus
  case object PaymentRefunded extends PaymentStatus
}

// Message Bus implementation
class MessageBus(implicit system: ActorSystem[_]) {

  import IntegrationMessages._

  private val subscriptions = scala.collection.mutable.Map[Class[_], List[ActorRef[Message]]]()

  def publish[T <: Message](message: T): Unit = {
    val messageClass = message.getClass
    val subscribers = subscriptions.getOrElse(messageClass, List.empty)

    subscribers.foreach { subscriber =>
      subscriber ! message
    }

    // Also publish to wildcard subscribers
    val wildcardSubscribers = subscriptions.getOrElse(classOf[Message], List.empty)
    wildcardSubscribers.foreach { subscriber =>
      subscriber ! message
    }
  }

  def subscribe[T <: Message](messageClass: Class[T], subscriber: ActorRef[Message]): Unit = {
    val currentSubscribers = subscriptions.getOrElse(messageClass, List.empty)
    subscriptions(messageClass) = subscriber :: currentSubscribers
  }

  def unsubscribe[T <: Message](messageClass: Class[T], subscriber: ActorRef[Message]): Unit = {
    val currentSubscribers = subscriptions.getOrElse(messageClass, List.empty)
    subscriptions(messageClass) = currentSubscribers.filterNot(_ == subscriber)
  }
}

// Message Handler base trait
trait MessageHandler[T <: IntegrationMessages.Message] {
  def handle(message: T): Future[Unit]
  def canHandle(message: IntegrationMessages.Message): Boolean
}

// Order Processing Service
object OrderProcessingService {

  import IntegrationMessages._

  sealed trait Protocol
  case class HandleMessage(message: Message) extends Protocol
  case class ProcessOrderCommand(command: ProcessOrder) extends Protocol
  case class OrderProcessingComplete(orderId: String, success: Boolean) extends Protocol

  def apply(messageBus: MessageBus, inventoryService: ActorRef[InventoryService.Protocol], paymentService: ActorRef[PaymentService.Protocol]): Behavior[Protocol] = {
    Behaviors.setup { context =>
      // Subscribe to relevant messages
      messageBus.subscribe(classOf[ProcessOrder], context.self.asInstanceOf[ActorRef[Message]])

      active(messageBus, inventoryService, paymentService)
    }
  }

  private def active(
    messageBus: MessageBus,
    inventoryService: ActorRef[InventoryService.Protocol],
    paymentService: ActorRef[PaymentService.Protocol]
  ): Behavior[Protocol] = {
    Behaviors.receive { (context, message) =>
      message match {
        case HandleMessage(msg: ProcessOrder) =>
          processOrder(context, messageBus, inventoryService, paymentService, msg)
          Behaviors.same

        case ProcessOrderCommand(command) =>
          processOrder(context, messageBus, inventoryService, paymentService, command)
          Behaviors.same

        case OrderProcessingComplete(orderId, success) =>
          if (success) {
            val event = OrderProcessed(
              orderId = orderId,
              status = Confirmed,
              processedAt = Instant.now()
            )
            messageBus.publish(event)
          } else {
            val event = OrderProcessed(
              orderId = orderId,
              status = Cancelled,
              processedAt = Instant.now()
            )
            messageBus.publish(event)
          }
          Behaviors.same

        case _ => Behaviors.unhandled
      }
    }
  }

  private def processOrder(
    context: ActorContext[Protocol],
    messageBus: MessageBus,
    inventoryService: ActorRef[InventoryService.Protocol],
    paymentService: ActorRef[PaymentService.Protocol],
    order: ProcessOrder
  ): Unit = {

    context.log.info(s"Processing order ${order.orderId}")

    // Step 1: Reserve inventory
    order.items.foreach { item =>
      val reserveCommand = UpdateInventory(
        correlationId = Some(order.orderId),
        productId = item.productId,
        quantity = item.quantity,
        operation = Reserve
      )
      inventoryService ! InventoryService.HandleCommand(reserveCommand)
    }

    // Step 2: Process payment (simplified)
    paymentService ! PaymentService.ProcessPayment(order.orderId, order.totalAmount)

    // In a real implementation, you would coordinate these steps
    // and handle failures appropriately
    context.self ! OrderProcessingComplete(order.orderId, success = true)
  }
}

// Inventory Service
object InventoryService {

  import IntegrationMessages._

  sealed trait Protocol
  case class HandleCommand(command: UpdateInventory) extends Protocol
  case class CheckInventory(productId: String, requiredQuantity: Int, replyTo: ActorRef[InventoryCheckResult]) extends Protocol

  case class InventoryCheckResult(productId: String, available: Boolean, currentQuantity: Int)

  def apply(messageBus: MessageBus): Behavior[Protocol] = {
    Behaviors.setup { context =>
      messageBus.subscribe(classOf[UpdateInventory], context.self.asInstanceOf[ActorRef[Message]])

      // Initialize with some sample inventory
      val initialInventory = Map(
        "product-1" -> 100,
        "product-2" -> 50,
        "product-3" -> 200
      )

      active(messageBus, initialInventory)
    }
  }

  private def active(messageBus: MessageBus, inventory: Map[String, Int]): Behavior[Protocol] = {
    Behaviors.receive { (context, message) =>
      message match {
        case HandleCommand(command) =>
          val currentQuantity = inventory.getOrElse(command.productId, 0)

          val newQuantity = command.operation match {
            case Increase => currentQuantity + command.quantity
            case Decrease => math.max(0, currentQuantity - command.quantity)
            case Reserve => 
              if (currentQuantity >= command.quantity) {
                currentQuantity - command.quantity
              } else {
                context.log.warn(s"Insufficient inventory for product ${command.productId}")
                currentQuantity
              }
            case Release => currentQuantity + command.quantity
          }

          val updatedInventory = inventory + (command.productId -> newQuantity)

          // Publish inventory updated event
          val event = InventoryUpdated(
            correlationId = command.correlationId,
            productId = command.productId,
            previousQuantity = currentQuantity,
            newQuantity = newQuantity,
            operation = command.operation
          )
          messageBus.publish(event)

          active(messageBus, updatedInventory)

        case CheckInventory(productId, requiredQuantity, replyTo) =>
          val currentQuantity = inventory.getOrElse(productId, 0)
          val available = currentQuantity >= requiredQuantity
          replyTo ! InventoryCheckResult(productId, available, currentQuantity)
          Behaviors.same
      }
    }
  }
}

// Payment Service
object PaymentService {

  import IntegrationMessages._

  sealed trait Protocol
  case class ProcessPayment(orderId: String, amount: BigDecimal) extends Protocol
  case class RefundPayment(paymentId: String, amount: BigDecimal) extends Protocol

  def apply(messageBus: MessageBus): Behavior[Protocol] = {
    Behaviors.setup { context =>
      active(messageBus)
    }
  }

  private def active(messageBus: MessageBus): Behavior[Protocol] = {
    Behaviors.receive { (context, message) =>
      message match {
        case ProcessPayment(orderId, amount) =>
          // Simulate payment processing
          val paymentId = UUID.randomUUID().toString

          // In a real implementation, you would integrate with payment gateways
          val success = scala.util.Random.nextBoolean() // Random success/failure for demo

          val status = if (success) PaymentSuccessful else PaymentFailed

          val event = PaymentProcessed(
            paymentId = paymentId,
            orderId = orderId,
            amount = amount,
            status = status
          )

          messageBus.publish(event)
          Behaviors.same

        case RefundPayment(paymentId, amount) =>
          // Handle refund logic
          val event = PaymentProcessed(
            paymentId = paymentId,
            orderId = "", // Would need to track original order
            amount = amount,
            status = PaymentRefunded
          )

          messageBus.publish(event)
          Behaviors.same
      }
    }
  }
}

// Notification Service
object NotificationService {

  import IntegrationMessages._

  sealed trait Protocol
  case class HandleNotification(notification: SendNotification) extends Protocol

  def apply(messageBus: MessageBus): Behavior[Protocol] = {
    Behaviors.setup { context =>
      messageBus.subscribe(classOf[SendNotification], context.self.asInstanceOf[ActorRef[Message]])

      // Also subscribe to events to send automatic notifications
      messageBus.subscribe(classOf[OrderProcessed], context.self.asInstanceOf[ActorRef[Message]])
      messageBus.subscribe(classOf[PaymentProcessed], context.self.asInstanceOf[ActorRef[Message]])

      active(messageBus)
    }
  }

  private def active(messageBus: MessageBus): Behavior[Protocol] = {
    Behaviors.receive { (context, message) =>
      message match {
        case HandleNotification(notification) =>
          sendNotification(context, notification)
          Behaviors.same

        case _ => Behaviors.unhandled
      }
    }
  }

  private def sendNotification(context: ActorContext[Protocol], notification: SendNotification): Unit = {
    context.log.info(s"Sending ${notification.channel} notification to ${notification.recipient}: ${notification.content}")

    // In a real implementation, you would integrate with email services, SMS providers, etc.
    notification.channel match {
      case Email => sendEmail(notification.recipient, notification.content)
      case SMS => sendSMS(notification.recipient, notification.content)
      case Push => sendPushNotification(notification.recipient, notification.content)
    }
  }

  private def sendEmail(recipient: String, content: String): Unit = {
    // Email implementation
  }

  private def sendSMS(recipient: String, content: String): Unit = {
    // SMS implementation
  }

  private def sendPushNotification(recipient: String, content: String): Unit = {
    // Push notification implementation
  }
}

Event Sourcing and CQRS Integration

import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
import akka.persistence.typed.PersistenceId
import akka.actor.typed.Behavior
import spray.json._
import DefaultJsonProtocol._

// Event Sourcing for Order Management
object OrderAggregate {

  // Commands
  sealed trait Command
  case class CreateOrder(orderId: String, customerId: String, items: List[OrderItem]) extends Command
  case class AddItem(item: OrderItem) extends Command
  case class RemoveItem(productId: String) extends Command
  case class ConfirmOrder() extends Command
  case class CancelOrder(reason: String) extends Command
  case class ShipOrder(trackingNumber: String) extends Command

  // Events
  sealed trait Event
  case class OrderCreated(orderId: String, customerId: String, items: List[OrderItem]) extends Event
  case class ItemAdded(item: OrderItem) extends Event
  case class ItemRemoved(productId: String) extends Event
  case class OrderConfirmed(confirmedAt: Instant) extends Event
  case class OrderCancelled(reason: String, cancelledAt: Instant) extends Event
  case class OrderShipped(trackingNumber: String, shippedAt: Instant) extends Event

  // State
  case class OrderState(
    orderId: String,
    customerId: String,
    items: List[OrderItem],
    status: OrderStatus,
    totalAmount: BigDecimal,
    createdAt: Instant,
    confirmedAt: Option[Instant] = None,
    shippedAt: Option[Instant] = None,
    cancelledAt: Option[Instant] = None,
    trackingNumber: Option[String] = None
  ) {

    def addItem(item: OrderItem): OrderState = {
      val updatedItems = item :: items.filterNot(_.productId == item.productId)
      copy(
        items = updatedItems,
        totalAmount = updatedItems.map(i => i.price * i.quantity).sum
      )
    }

    def removeItem(productId: String): OrderState = {
      val updatedItems = items.filterNot(_.productId == productId)
      copy(
        items = updatedItems,
        totalAmount = updatedItems.map(i => i.price * i.quantity).sum
      )
    }

    def canConfirm: Boolean = status == Pending && items.nonEmpty
    def canCancel: Boolean = status == Pending || status == Processing
    def canShip: Boolean = status == Confirmed
  }

  def apply(orderId: String): Behavior[Command] = {
    EventSourcedBehavior[Command, Event, Option[OrderState]](
      persistenceId = PersistenceId.ofUniqueId(s"order-$orderId"),
      emptyState = None,
      commandHandler = commandHandler,
      eventHandler = eventHandler
    )
  }

  private def commandHandler: (Option[OrderState], Command) => Effect[Event, Option[OrderState]] = {
    (state, command) =>
      state match {
        case None =>
          command match {
            case CreateOrder(orderId, customerId, items) =>
              Effect.persist(OrderCreated(orderId, customerId, items))

            case _ =>
              Effect.unhandled
          }

        case Some(currentState) =>
          command match {
            case CreateOrder(_, _, _) =>
              Effect.unhandled // Order already exists

            case AddItem(item) =>
              if (currentState.status == Pending) {
                Effect.persist(ItemAdded(item))
              } else {
                Effect.unhandled
              }

            case RemoveItem(productId) =>
              if (currentState.status == Pending && currentState.items.exists(_.productId == productId)) {
                Effect.persist(ItemRemoved(productId))
              } else {
                Effect.unhandled
              }

            case ConfirmOrder() =>
              if (currentState.canConfirm) {
                Effect.persist(OrderConfirmed(Instant.now()))
              } else {
                Effect.unhandled
              }

            case CancelOrder(reason) =>
              if (currentState.canCancel) {
                Effect.persist(OrderCancelled(reason, Instant.now()))
              } else {
                Effect.unhandled
              }

            case ShipOrder(trackingNumber) =>
              if (currentState.canShip) {
                Effect.persist(OrderShipped(trackingNumber, Instant.now()))
              } else {
                Effect.unhandled
              }
          }
      }
  }

  private def eventHandler: (Option[OrderState], Event) => Option[OrderState] = {
    (state, event) =>
      event match {
        case OrderCreated(orderId, customerId, items) =>
          val totalAmount = items.map(i => i.price * i.quantity).sum
          Some(OrderState(
            orderId = orderId,
            customerId = customerId,
            items = items,
            status = Pending,
            totalAmount = totalAmount,
            createdAt = Instant.now()
          ))

        case ItemAdded(item) =>
          state.map(_.addItem(item))

        case ItemRemoved(productId) =>
          state.map(_.removeItem(productId))

        case OrderConfirmed(confirmedAt) =>
          state.map(_.copy(status = Confirmed, confirmedAt = Some(confirmedAt)))

        case OrderCancelled(_, cancelledAt) =>
          state.map(_.copy(status = Cancelled, cancelledAt = Some(cancelledAt)))

        case OrderShipped(trackingNumber, shippedAt) =>
          state.map(_.copy(
            status = Shipped,
            trackingNumber = Some(trackingNumber),
            shippedAt = Some(shippedAt)
          ))
      }
  }
}

// Read Model for CQRS
trait OrderReadModel {
  def findByCustomerId(customerId: String): Future[List[OrderSummary]]
  def findByStatus(status: OrderStatus): Future[List[OrderSummary]]
  def getOrderDetails(orderId: String): Future[Option[OrderDetails]]
  def getOrderHistory(orderId: String): Future[List[OrderEvent]]
}

case class OrderSummary(
  orderId: String,
  customerId: String,
  status: OrderStatus,
  totalAmount: BigDecimal,
  itemCount: Int,
  createdAt: Instant
)

case class OrderDetails(
  orderId: String,
  customerId: String,
  items: List[OrderItem],
  status: OrderStatus,
  totalAmount: BigDecimal,
  createdAt: Instant,
  confirmedAt: Option[Instant],
  shippedAt: Option[Instant],
  trackingNumber: Option[String]
)

case class OrderEvent(
  eventType: String,
  timestamp: Instant,
  details: Map[String, String]
)

// Read Model Implementation
class OrderReadModelImpl(database: Database)(implicit ec: ExecutionContext) extends OrderReadModel {

  // Database schema would be defined here
  // This is a simplified implementation

  def findByCustomerId(customerId: String): Future[List[OrderSummary]] = {
    // Query implementation
    Future.successful(List.empty)
  }

  def findByStatus(status: OrderStatus): Future[List[OrderSummary]] = {
    // Query implementation
    Future.successful(List.empty)
  }

  def getOrderDetails(orderId: String): Future[Option[OrderDetails]] = {
    // Query implementation
    Future.successful(None)
  }

  def getOrderHistory(orderId: String): Future[List[OrderEvent]] = {
    // Query implementation
    Future.successful(List.empty)
  }
}

// Event Handler for Read Model Updates
object ReadModelUpdater {

  import OrderAggregate._

  def apply(readModel: OrderReadModel): Behavior[Event] = {
    Behaviors.receive { (context, event) =>
      event match {
        case OrderCreated(orderId, customerId, items) =>
          // Update read model
          context.log.info(s"Updating read model for order created: $orderId")
          Behaviors.same

        case OrderConfirmed(confirmedAt) =>
          // Update read model
          context.log.info(s"Updating read model for order confirmed")
          Behaviors.same

        case OrderShipped(trackingNumber, shippedAt) =>
          // Update read model
          context.log.info(s"Updating read model for order shipped: $trackingNumber")
          Behaviors.same

        case _ =>
          Behaviors.same
      }
    }
  }
}

RESTful API Design Patterns

Comprehensive API Implementation

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._
import DefaultJsonProtocol._
import scala.concurrent.{ExecutionContext, Future}
import java.time.Instant

// API Models
object ApiModels {

  // Request/Response DTOs
  case class CreateOrderRequest(
    customerId: String,
    items: List[OrderItemRequest]
  )

  case class OrderItemRequest(
    productId: String,
    quantity: Int,
    price: BigDecimal
  )

  case class UpdateOrderRequest(
    items: Option[List[OrderItemRequest]] = None
  )

  case class OrderResponse(
    orderId: String,
    customerId: String,
    items: List[OrderItemResponse],
    status: String,
    totalAmount: BigDecimal,
    createdAt: String,
    links: List[Link]
  )

  case class OrderItemResponse(
    productId: String,
    quantity: Int,
    price: BigDecimal,
    subtotal: BigDecimal
  )

  case class Link(
    rel: String,
    href: String,
    method: String
  )

  case class ErrorResponse(
    error: String,
    code: String,
    message: String,
    timestamp: String,
    path: String
  )

  case class PaginatedResponse[T](
    data: List[T],
    pagination: PaginationInfo
  )

  case class PaginationInfo(
    page: Int,
    size: Int,
    totalElements: Long,
    totalPages: Int,
    first: Boolean,
    last: Boolean,
    links: List[Link]
  )

  // JSON formats
  implicit val orderItemRequestFormat: RootJsonFormat[OrderItemRequest] = jsonFormat3(OrderItemRequest)
  implicit val createOrderRequestFormat: RootJsonFormat[CreateOrderRequest] = jsonFormat2(CreateOrderRequest)
  implicit val updateOrderRequestFormat: RootJsonFormat[UpdateOrderRequest] = jsonFormat1(UpdateOrderRequest)
  implicit val orderItemResponseFormat: RootJsonFormat[OrderItemResponse] = jsonFormat4(OrderItemResponse)
  implicit val linkFormat: RootJsonFormat[Link] = jsonFormat3(Link)
  implicit val orderResponseFormat: RootJsonFormat[OrderResponse] = jsonFormat7(OrderResponse)
  implicit val errorResponseFormat: RootJsonFormat[ErrorResponse] = jsonFormat5(ErrorResponse)
  implicit val paginationInfoFormat: RootJsonFormat[PaginationInfo] = jsonFormat7(PaginationInfo)
  implicit def paginatedResponseFormat[T: JsonFormat]: RootJsonFormat[PaginatedResponse[T]] = jsonFormat2(PaginatedResponse[T])
}

// API Service Layer
class OrderApiService(
  orderService: OrderService,
  readModel: OrderReadModel
)(implicit ec: ExecutionContext) {

  import ApiModels._

  def createOrder(request: CreateOrderRequest): Future[Either[ApiError, OrderResponse]] = {
    val items = request.items.map(item => OrderItem(item.productId, item.quantity, item.price))

    orderService.createOrder(request.customerId, items).map {
      case Right(order) => Right(orderToResponse(order))
      case Left(error) => Left(ApiError.ValidationError(error))
    }
  }

  def getOrder(orderId: String): Future[Either[ApiError, OrderResponse]] = {
    readModel.getOrderDetails(orderId).map {
      case Some(order) => Right(orderDetailsToResponse(order))
      case None => Left(ApiError.NotFound(s"Order $orderId not found"))
    }
  }

  def updateOrder(orderId: String, request: UpdateOrderRequest): Future[Either[ApiError, OrderResponse]] = {
    request.items match {
      case Some(items) =>
        val orderItems = items.map(item => OrderItem(item.productId, item.quantity, item.price))
        orderService.updateOrderItems(orderId, orderItems).map {
          case Right(order) => Right(orderToResponse(order))
          case Left(error) => Left(ApiError.ValidationError(error))
        }
      case None =>
        Future.successful(Left(ApiError.ValidationError("No updates provided")))
    }
  }

  def listOrders(
    customerId: Option[String],
    status: Option[String],
    page: Int,
    size: Int
  ): Future[Either[ApiError, PaginatedResponse[OrderResponse]]] = {

    val futureOrders = (customerId, status) match {
      case (Some(cId), Some(st)) =>
        // In a real implementation, you'd have a method that filters by both
        readModel.findByCustomerId(cId)
      case (Some(cId), None) =>
        readModel.findByCustomerId(cId)
      case (None, Some(st)) =>
        OrderStatus.fromString(st) match {
          case Some(orderStatus) => readModel.findByStatus(orderStatus)
          case None => Future.successful(List.empty)
        }
      case (None, None) =>
        // Get all orders (with pagination)
        Future.successful(List.empty) // Simplified
    }

    futureOrders.map { orders =>
      val totalElements = orders.length.toLong
      val totalPages = math.ceil(totalElements.toDouble / size).toInt
      val startIndex = page * size
      val endIndex = math.min(startIndex + size, orders.length)
      val pageData = orders.slice(startIndex, endIndex)

      val responses = pageData.map(orderSummaryToResponse)

      val pagination = PaginationInfo(
        page = page,
        size = size,
        totalElements = totalElements,
        totalPages = totalPages,
        first = page == 0,
        last = page >= totalPages - 1,
        links = createPaginationLinks(page, totalPages, size, customerId, status)
      )

      Right(PaginatedResponse(responses, pagination))
    }
  }

  def cancelOrder(orderId: String, reason: String): Future[Either[ApiError, Unit]] = {
    orderService.cancelOrder(orderId, reason).map {
      case Right(_) => Right(())
      case Left(error) => Left(ApiError.ValidationError(error))
    }
  }

  def shipOrder(orderId: String, trackingNumber: String): Future[Either[ApiError, OrderResponse]] = {
    orderService.shipOrder(orderId, trackingNumber).map {
      case Right(order) => Right(orderToResponse(order))
      case Left(error) => Left(ApiError.ValidationError(error))
    }
  }

  private def orderToResponse(order: Order): OrderResponse = {
    OrderResponse(
      orderId = order.orderId,
      customerId = order.customerId,
      items = order.items.map(itemToResponse),
      status = order.status.toString,
      totalAmount = order.totalAmount,
      createdAt = order.createdAt.toString,
      links = createOrderLinks(order.orderId)
    )
  }

  private def orderDetailsToResponse(details: OrderDetails): OrderResponse = {
    OrderResponse(
      orderId = details.orderId,
      customerId = details.customerId,
      items = details.items.map(itemToResponse),
      status = details.status.toString,
      totalAmount = details.totalAmount,
      createdAt = details.createdAt.toString,
      links = createOrderLinks(details.orderId)
    )
  }

  private def orderSummaryToResponse(summary: OrderSummary): OrderResponse = {
    OrderResponse(
      orderId = summary.orderId,
      customerId = summary.customerId,
      items = List.empty, // Summary doesn't include items
      status = summary.status.toString,
      totalAmount = summary.totalAmount,
      createdAt = summary.createdAt.toString,
      links = createOrderLinks(summary.orderId)
    )
  }

  private def itemToResponse(item: OrderItem): OrderItemResponse = {
    OrderItemResponse(
      productId = item.productId,
      quantity = item.quantity,
      price = item.price,
      subtotal = item.price * item.quantity
    )
  }

  private def createOrderLinks(orderId: String): List[Link] = {
    List(
      Link("self", s"/api/v1/orders/$orderId", "GET"),
      Link("update", s"/api/v1/orders/$orderId", "PUT"),
      Link("cancel", s"/api/v1/orders/$orderId/cancel", "POST"),
      Link("ship", s"/api/v1/orders/$orderId/ship", "POST")
    )
  }

  private def createPaginationLinks(
    currentPage: Int,
    totalPages: Int,
    size: Int,
    customerId: Option[String],
    status: Option[String]
  ): List[Link] = {
    val baseQuery = List(
      customerId.map("customerId=" + _),
      status.map("status=" + _),
      Some(s"size=$size")
    ).flatten.mkString("&")

    List(
      Some(Link("first", s"/api/v1/orders?page=0&$baseQuery", "GET")),
      if (currentPage > 0) Some(Link("prev", s"/api/v1/orders?page=${currentPage - 1}&$baseQuery", "GET")) else None,
      if (currentPage < totalPages - 1) Some(Link("next", s"/api/v1/orders?page=${currentPage + 1}&$baseQuery", "GET")) else None,
      Some(Link("last", s"/api/v1/orders?page=${totalPages - 1}&$baseQuery", "GET"))
    ).flatten
  }
}

// Error handling
sealed trait ApiError {
  def message: String
  def code: String
}

object ApiError {
  case class ValidationError(message: String) extends ApiError {
    val code = "VALIDATION_ERROR"
  }

  case class NotFound(message: String) extends ApiError {
    val code = "NOT_FOUND"
  }

  case class Conflict(message: String) extends ApiError {
    val code = "CONFLICT"
  }

  case class InternalError(message: String) extends ApiError {
    val code = "INTERNAL_ERROR"
  }
}

// HTTP Routes
class OrderRoutes(apiService: OrderApiService)(implicit ec: ExecutionContext) {

  import ApiModels._

  val routes: Route = 
    pathPrefix("api" / "v1" / "orders") {
      concat(
        // POST /api/v1/orders
        pathEnd {
          post {
            entity(as[CreateOrderRequest]) { request =>
              onComplete(apiService.createOrder(request)) {
                case Success(Right(order)) =>
                  complete(StatusCodes.Created, order)
                case Success(Left(error)) =>
                  complete(errorToHttpResponse(error))
                case Failure(exception) =>
                  complete(StatusCodes.InternalServerError, createErrorResponse(exception))
              }
            }
          }
        },

        // GET /api/v1/orders
        pathEnd {
          get {
            parameters("customerId".optional, "status".optional, "page".as[Int] ? 0, "size".as[Int] ? 20) {
              (customerId, status, page, size) =>
                onComplete(apiService.listOrders(customerId, status, page, size)) {
                  case Success(Right(orders)) =>
                    complete(StatusCodes.OK, orders)
                  case Success(Left(error)) =>
                    complete(errorToHttpResponse(error))
                  case Failure(exception) =>
                    complete(StatusCodes.InternalServerError, createErrorResponse(exception))
                }
            }
          }
        },

        // GET /api/v1/orders/{orderId}
        path(Segment) { orderId =>
          get {
            onComplete(apiService.getOrder(orderId)) {
              case Success(Right(order)) =>
                complete(StatusCodes.OK, order)
              case Success(Left(error)) =>
                complete(errorToHttpResponse(error))
              case Failure(exception) =>
                complete(StatusCodes.InternalServerError, createErrorResponse(exception))
            }
          }
        },

        // PUT /api/v1/orders/{orderId}
        path(Segment) { orderId =>
          put {
            entity(as[UpdateOrderRequest]) { request =>
              onComplete(apiService.updateOrder(orderId, request)) {
                case Success(Right(order)) =>
                  complete(StatusCodes.OK, order)
                case Success(Left(error)) =>
                  complete(errorToHttpResponse(error))
                case Failure(exception) =>
                  complete(StatusCodes.InternalServerError, createErrorResponse(exception))
              }
            }
          }
        },

        // POST /api/v1/orders/{orderId}/cancel
        path(Segment / "cancel") { orderId =>
          post {
            entity(as[String]) { reason =>
              onComplete(apiService.cancelOrder(orderId, reason)) {
                case Success(Right(_)) =>
                  complete(StatusCodes.NoContent)
                case Success(Left(error)) =>
                  complete(errorToHttpResponse(error))
                case Failure(exception) =>
                  complete(StatusCodes.InternalServerError, createErrorResponse(exception))
              }
            }
          }
        },

        // POST /api/v1/orders/{orderId}/ship
        path(Segment / "ship") { orderId =>
          post {
            entity(as[String]) { trackingNumber =>
              onComplete(apiService.shipOrder(orderId, trackingNumber)) {
                case Success(Right(order)) =>
                  complete(StatusCodes.OK, order)
                case Success(Left(error)) =>
                  complete(errorToHttpResponse(error))
                case Failure(exception) =>
                  complete(StatusCodes.InternalServerError, createErrorResponse(exception))
              }
            }
          }
        }
      )
    }

  private def errorToHttpResponse(error: ApiError): (StatusCode, ErrorResponse) = {
    val statusCode = error match {
      case _: ApiError.ValidationError => StatusCodes.BadRequest
      case _: ApiError.NotFound => StatusCodes.NotFound
      case _: ApiError.Conflict => StatusCodes.Conflict
      case _: ApiError.InternalError => StatusCodes.InternalServerError
    }

    val errorResponse = ErrorResponse(
      error = error.getClass.getSimpleName,
      code = error.code,
      message = error.message,
      timestamp = Instant.now().toString,
      path = "" // Would be populated by middleware
    )

    (statusCode, errorResponse)
  }

  private def createErrorResponse(exception: Throwable): ErrorResponse = {
    ErrorResponse(
      error = exception.getClass.getSimpleName,
      code = "INTERNAL_ERROR",
      message = "An unexpected error occurred",
      timestamp = Instant.now().toString,
      path = ""
    )
  }
}

Service Mesh and Microservices Patterns

Circuit Breaker and Resilience Patterns

import akka.pattern.CircuitBreaker
import akka.actor.{ActorSystem, Scheduler}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Circuit Breaker Implementation
class ResilientService(
  name: String,
  maxFailures: Int = 5,
  callTimeout: FiniteDuration = 10.seconds,
  resetTimeout: FiniteDuration = 1.minute
)(implicit system: ActorSystem, ec: ExecutionContext) {

  private val circuitBreaker = new CircuitBreaker(
    scheduler = system.scheduler,
    maxFailures = maxFailures,
    callTimeout = callTimeout,
    resetTimeout = resetTimeout
  ).onOpen {
    system.log.warning(s"Circuit breaker for $name is OPEN")
  }.onHalfOpen {
    system.log.info(s"Circuit breaker for $name is HALF-OPEN")
  }.onClose {
    system.log.info(s"Circuit breaker for $name is CLOSED")
  }

  def withCircuitBreaker[T](operation: => Future[T]): Future[T] = {
    circuitBreaker.withCircuitBreaker(operation)
  }

  def isOpen: Boolean = circuitBreaker.isOpen
  def isHalfOpen: Boolean = circuitBreaker.isHalfOpen
  def isClosed: Boolean = circuitBreaker.isClosed
}

// Retry Logic
class RetryPolicy(
  maxRetries: Int = 3,
  baseDelay: FiniteDuration = 1.second,
  maxDelay: FiniteDuration = 30.seconds,
  backoffMultiplier: Double = 2.0
)(implicit ec: ExecutionContext, scheduler: Scheduler) {

  def retry[T](operation: => Future[T]): Future[T] = {
    def attemptWithRetry(attempt: Int): Future[T] = {
      operation.recoverWith {
        case exception if attempt < maxRetries && isRetryable(exception) =>
          val delay = calculateDelay(attempt)
          akka.pattern.after(delay, scheduler) {
            attemptWithRetry(attempt + 1)
          }
        case exception =>
          Future.failed(exception)
      }
    }

    attemptWithRetry(0)
  }

  private def isRetryable(exception: Throwable): Boolean = {
    exception match {
      case _: java.net.ConnectException => true
      case _: java.net.SocketTimeoutException => true
      case _: java.util.concurrent.TimeoutException => true
      case _ => false
    }
  }

  private def calculateDelay(attempt: Int): FiniteDuration = {
    val delay = baseDelay * math.pow(backoffMultiplier, attempt)
    FiniteDuration(math.min(delay.toMillis, maxDelay.toMillis), MILLISECONDS)
  }
}

// Bulkhead Pattern
class BulkheadExecutor(
  name: String,
  corePoolSize: Int = 10,
  maxPoolSize: Int = 20,
  keepAliveTime: Long = 60
) {

  import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit}

  private val executor = new ThreadPoolExecutor(
    corePoolSize,
    maxPoolSize,
    keepAliveTime,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable](100),
    (r: Runnable) => {
      val thread = new Thread(r, s"$name-bulkhead-thread")
      thread.setDaemon(true)
      thread
    }
  )

  implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)

  def execute[T](operation: => Future[T]): Future[T] = {
    operation
  }

  def shutdown(): Unit = {
    executor.shutdown()
  }

  def getActiveThreads: Int = executor.getActiveCount
  def getQueueSize: Int = executor.getQueue.size()
}

// Service Discovery
trait ServiceDiscovery {
  def discover(serviceName: String): Future[List[ServiceInstance]]
  def register(instance: ServiceInstance): Future[Unit]
  def deregister(serviceId: String): Future[Unit]
}

case class ServiceInstance(
  id: String,
  name: String,
  host: String,
  port: Int,
  metadata: Map[String, String] = Map.empty,
  health: ServiceHealth = ServiceHealth.Healthy
)

sealed trait ServiceHealth
object ServiceHealth {
  case object Healthy extends ServiceHealth
  case object Unhealthy extends ServiceHealth
  case object Unknown extends ServiceHealth
}

// Simple in-memory service discovery (for testing)
class InMemoryServiceDiscovery(implicit ec: ExecutionContext) extends ServiceDiscovery {

  private val services = scala.collection.mutable.Map[String, List[ServiceInstance]]()

  def discover(serviceName: String): Future[List[ServiceInstance]] = {
    Future.successful(services.getOrElse(serviceName, List.empty).filter(_.health == ServiceHealth.Healthy))
  }

  def register(instance: ServiceInstance): Future[Unit] = {
    val currentInstances = services.getOrElse(instance.name, List.empty)
    val updatedInstances = instance :: currentInstances.filterNot(_.id == instance.id)
    services(instance.name) = updatedInstances
    Future.successful(())
  }

  def deregister(serviceId: String): Future[Unit] = {
    services.keys.foreach { serviceName =>
      val currentInstances = services(serviceName)
      services(serviceName) = currentInstances.filterNot(_.id == serviceId)
    }
    Future.successful(())
  }
}

// Load Balancer
trait LoadBalancer {
  def selectInstance(instances: List[ServiceInstance]): Option[ServiceInstance]
}

class RoundRobinLoadBalancer extends LoadBalancer {
  private var currentIndex = 0

  def selectInstance(instances: List[ServiceInstance]): Option[ServiceInstance] = {
    if (instances.isEmpty) {
      None
    } else {
      val instance = instances(currentIndex % instances.size)
      currentIndex += 1
      Some(instance)
    }
  }
}

class WeightedRandomLoadBalancer extends LoadBalancer {
  def selectInstance(instances: List[ServiceInstance]): Option[ServiceInstance] = {
    if (instances.isEmpty) {
      None
    } else {
      // Simple random selection (could be enhanced with weights)
      val randomIndex = scala.util.Random.nextInt(instances.size)
      Some(instances(randomIndex))
    }
  }
}

// Service Client with Resilience
class ResilientServiceClient(
  serviceName: String,
  serviceDiscovery: ServiceDiscovery,
  loadBalancer: LoadBalancer,
  resilientService: ResilientService,
  retryPolicy: RetryPolicy
)(implicit ec: ExecutionContext) {

  import akka.http.scaladsl.Http
  import akka.http.scaladsl.model._

  def makeRequest(path: String, method: HttpMethod = HttpMethods.GET, entity: RequestEntity = HttpEntity.Empty): Future[HttpResponse] = {
    resilientService.withCircuitBreaker {
      retryPolicy.retry {
        for {
          instances <- serviceDiscovery.discover(serviceName)
          instance <- Future.fromTry(scala.util.Try(loadBalancer.selectInstance(instances).get))
          uri = s"http://${instance.host}:${instance.port}$path"
          request = HttpRequest(method, uri, entity = entity)
          response <- Http().singleRequest(request)
        } yield response
      }
    }
  }

  def get(path: String): Future[HttpResponse] = makeRequest(path, HttpMethods.GET)
  def post(path: String, entity: RequestEntity): Future[HttpResponse] = makeRequest(path, HttpMethods.POST, entity)
  def put(path: String, entity: RequestEntity): Future[HttpResponse] = makeRequest(path, HttpMethods.PUT, entity)
  def delete(path: String): Future[HttpResponse] = makeRequest(path, HttpMethods.DELETE)
}

// Distributed Tracing
import brave.{Span, Tracing}
import brave.propagation.TraceContext

class DistributedTracingService {

  def createSpan(operationName: String): Span = {
    Tracing.current().tracer().nextSpan().name(operationName).start()
  }

  def createChildSpan(parent: Span, operationName: String): Span = {
    Tracing.current().tracer().nextSpan(parent.context()).name(operationName).start()
  }

  def addTag(span: Span, key: String, value: String): Unit = {
    span.tag(key, value)
  }

  def addLog(span: Span, message: String): Unit = {
    span.annotate(message)
  }

  def finishSpan(span: Span): Unit = {
    span.end()
  }

  def withSpan[T](operationName: String)(operation: Span => T): T = {
    val span = createSpan(operationName)
    try {
      operation(span)
    } finally {
      finishSpan(span)
    }
  }
}

Conclusion

Enterprise integration patterns provide the foundation for building robust, scalable, and maintainable distributed systems. Key takeaways include:

Message-Based Integration:

  • Asynchronous communication reduces coupling
  • Event-driven architecture enables scalability
  • Message patterns facilitate system evolution
  • Proper error handling and retry mechanisms

API Design Excellence:

  • RESTful principles with HATEOAS
  • Comprehensive error handling and status codes
  • Pagination and filtering for large datasets
  • Versioning strategies for API evolution

Resilience Patterns:

  • Circuit breakers prevent cascade failures
  • Retry policies handle transient failures
  • Bulkhead pattern isolates failures
  • Timeout mechanisms prevent resource exhaustion

Event Sourcing and CQRS:

  • Event sourcing provides audit trails
  • CQRS optimizes read and write models
  • Event replay enables system recovery
  • Eventual consistency in distributed systems

Service Discovery and Load Balancing:

  • Dynamic service registration and discovery
  • Multiple load balancing strategies
  • Health checking and failure detection
  • Service mesh integration

Monitoring and Observability:

  • Distributed tracing for request correlation
  • Metrics collection for system health
  • Structured logging for debugging
  • Performance monitoring and alerting

Security Considerations:

  • Authentication and authorization patterns
  • API rate limiting and throttling
  • Input validation and sanitization
  • Secure communication channels

Testing Strategies:

  • Contract testing for API compatibility
  • Integration testing for system behavior
  • Chaos engineering for resilience validation
  • Performance testing for scalability

Deployment Patterns:

  • Blue-green deployments for zero downtime
  • Canary releases for risk mitigation
  • Feature flags for controlled rollouts
  • Infrastructure as code for consistency

Data Management:

  • Distributed transaction patterns
  • Saga pattern for long-running processes
  • Data consistency strategies
  • Event streaming for real-time processing

Enterprise integration is about building systems that can evolve, scale, and remain maintainable over time. The patterns and practices covered in this lesson provide a solid foundation for architecting modern distributed systems that meet enterprise requirements for reliability, performance, and operational excellence.

Success in enterprise integration requires balancing multiple concerns: performance, reliability, maintainability, security, and business agility. The key is to start simple, measure everything, and evolve the architecture based on real-world requirements and constraints.