Reactive Streams and Akka Streams: Building Resilient Data Pipelines

Reactive Streams is a standard for asynchronous stream processing with non-blocking backpressure, and Akka Streams implements that standard with a rich, composable API for building resilient, high-throughput data pipelines. This lesson covers stream fundamentals, flow composition, error recovery, and advanced patterns for scalable streaming applications.

Reactive Streams Fundamentals

Core Concepts and Backpressure

import akka.actor.{ActorRef, ActorSystem, Cancellable}
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.ByteString
import akka.{Done, NotUsed}
import scala.concurrent.duration._
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure, Random}
import java.nio.file.Paths
import java.time.Instant
import java.util.concurrent.TimeoutException

// Basic reactive streams setup
object ReactiveStreamsBasics {

  implicit val system: ActorSystem = ActorSystem("reactive-streams")
  implicit val materializer: Materializer = Materializer(system)
  implicit val ec: ExecutionContext = system.dispatcher

  // Simple source creation and consumption
  def basicSourceExample(): Future[Done] = {
    val source: Source[Int, NotUsed] = Source(1 to 100)

    source
      .map(_ * 2)
      .filter(_ % 4 == 0)
      .take(10)
      .runWith(Sink.foreach(println))
  }

  // Backpressure demonstration
  def backpressureExample(): Future[Done] = {
    val fastSource = Source.tick(0.millis, 1.millis, "fast-element")
    val slowSink = Sink.foreach[String] { element =>
      Thread.sleep(100) // Simulate slow processing
      println(s"Processed: $element")
    }

    fastSource
      .buffer(10, OverflowStrategy.dropHead) // Handle backpressure
      .take(50)
      .runWith(slowSink)
  }

  // Custom source with state
  def createCustomSource(): Source[String, NotUsed] = {
    Source.unfold(0) { state =>
      if (state < 100) {
        Some((state + 1, s"Item-$state"))
      } else {
        None
      }
    }
  }

  // Rate-limited processing
  def rateLimitedProcessing(): Future[Done] = {
    Source(1 to 1000)
      .throttle(10, 1.second) // 10 elements per second
      .map(processItem)
      .runWith(Sink.ignore)
  }

  private def processItem(item: Int): String = {
    // Simulate processing
    Thread.sleep(Random.nextInt(50))
    s"Processed-$item"
  }

  // Error handling in streams
  def errorHandlingExample(): Future[Done] = {
    Source(1 to 20)
      .map { i =>
        if (i % 7 == 0) throw new RuntimeException(s"Error at $i")
        else i * 2
      }
      .recover {
        // recover emits one fallback element after the failure and then completes the stream
        case _: RuntimeException => -1
      }
      .runWith(Sink.foreach(println))
  }

  // Supervision strategies
  def supervisionExample(): Future[Done] = {
    val decider: Supervision.Decider = {
      case _: ArithmeticException => Supervision.Resume
      case _: IllegalArgumentException => Supervision.Restart
      case _: RuntimeException => Supervision.Stop
      case _ => Supervision.Escalate
    }

    // Apply the supervision strategy as a stream attribute
    // (ActorMaterializerSettings is deprecated since Akka 2.6)
    Source(1 to 20)
      .map { i =>
        if (i % 5 == 0) throw new ArithmeticException("Division by zero")
        if (i % 7 == 0) throw new IllegalArgumentException("Invalid argument")
        i * 2
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.foreach(println))
  }
}
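
The sketch below shows one way to exercise these basics end to end; the runner object, its name, and the choice of examples are illustrative assumptions rather than part of any established API.

// Hypothetical runner for the examples above (names and sequencing are illustrative)
object ReactiveStreamsBasicsApp {
  import ReactiveStreamsBasics._

  def main(args: Array[String]): Unit = {
    val done = for {
      _ <- basicSourceExample()
      _ <- errorHandlingExample()
    } yield ()

    done.onComplete { result =>
      println(s"Examples finished: $result")
      system.terminate() // Release the ActorSystem once the streams have completed
    }
  }
}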

// Advanced source patterns
object AdvancedSources {

  implicit val system: ActorSystem = ActorSystem("advanced-sources")
  implicit val materializer: Materializer = Materializer(system)
  implicit val ec: ExecutionContext = system.dispatcher

  // Timer-based sources
  def timerBasedSources(): Source[String, Cancellable] = {
    Source.tick(
      initialDelay = 1.second,
      interval = 5.seconds,
      tick = "heartbeat"
    ).map(_ => s"Heartbeat at ${Instant.now()}")
  }

  // Queue-based source for dynamic data
  def queueBasedSource(): (SourceQueueWithComplete[String], Source[String, NotUsed]) = {
    val (queue, source) = Source.queue[String](
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.backpressure
    ).preMaterialize()

    // Simulate external data feeding
    system.scheduler.scheduleAtFixedRate(1.second, 2.seconds) { () =>
      queue.offer(s"External data at ${System.currentTimeMillis()}")
    }

    (queue, source)
  }

  // ActorRef-based source
  def actorRefSource(): Source[String, ActorRef] = {
    Source.actorRef[String](
      completionMatcher = {
        case "complete" => CompletionStrategy.immediately
      },
      failureMatcher = {
        case "fail" => new RuntimeException("Stream failed")
      },
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )
  }

  // File-based sources
  def fileSource(filePath: String): Source[ByteString, Future[IOResult]] = {
    FileIO.fromPath(Paths.get(filePath))
  }

  def csvFileSource(filePath: String): Source[Map[String, String], Future[IOResult]] = {
    FileIO.fromPath(Paths.get(filePath))
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192))
      .map(_.utf8String)
      .drop(1) // Skip header
      .map(parseCsvLine)
  }

  private def parseCsvLine(line: String): Map[String, String] = {
    val headers = Array("id", "name", "email", "age")
    val values = line.split(",").map(_.trim)
    headers.zip(values).toMap
  }

  // HTTP-based source
  def httpSource(): Source[String, NotUsed] = {
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._

    Source.repeat(HttpRequest(uri = "https://api.example.com/data"))
      .throttle(1, 5.seconds) // Rate limit API calls
      .mapAsync(1) { request =>
        Http().singleRequest(request).flatMap { response =>
          response.entity.toStrict(5.seconds).map(_.data.utf8String)
        }
      }
  }

  // Database source with pagination
  def databaseSource(): Source[DatabaseRecord, NotUsed] = {
    Source.unfoldAsync(0) { offset =>
      fetchDatabasePage(offset, pageSize = 100).map { records =>
        if (records.nonEmpty) {
          Some((offset + records.length, records))
        } else {
          None
        }
      }
    }.mapConcat(identity)
  }

  private def fetchDatabasePage(offset: Int, pageSize: Int): Future[List[DatabaseRecord]] = {
    // Simulate database query
    Future {
      if (offset < 1000) {
        (offset until Math.min(offset + pageSize, 1000)).map { i =>
          DatabaseRecord(s"id-$i", s"name-$i", s"email-$i@example.com")
        }.toList
      } else {
        List.empty
      }
    }
  }

  case class DatabaseRecord(id: String, name: String, email: String)
}
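
A brief usage sketch for the queue-based source follows; the consumer wiring and the handling of QueueOfferResult are assumptions added for illustration.

// Hypothetical consumer for the queue-based source
object QueueSourceUsage {
  import AdvancedSources._

  def run(): Unit = {
    val (queue, source) = queueBasedSource()

    // Print everything that flows out of the queue
    source.runWith(Sink.foreach(msg => println(s"Consumed: $msg")))

    // Each offer reports whether the element was enqueued, dropped, or rejected
    queue.offer("manual element").foreach {
      case QueueOfferResult.Enqueued    => println("Element enqueued")
      case QueueOfferResult.Dropped     => println("Element dropped")
      case QueueOfferResult.Failure(ex) => println(s"Offer failed: ${ex.getMessage}")
      case QueueOfferResult.QueueClosed => println("Queue already closed")
    }
  }
}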

Flow Composition and Transformation

// Complex flow compositions
object FlowComposition {

  implicit val system: ActorSystem = ActorSystem("flow-composition")
  implicit val materializer: Materializer = Materializer(system)
  implicit val ec: ExecutionContext = system.dispatcher

  // Data transformation pipeline
  case class RawEvent(id: String, timestamp: Long, data: String, eventType: String)
  case class ProcessedEvent(id: String, timestamp: Instant, payload: Map[String, Any], category: String, enriched: Boolean)
  case class AggregatedMetrics(category: String, count: Long, averageProcessingTime: Long, errorRate: Double)

  // Multi-stage processing flow
  def dataProcessingPipeline(): Flow[RawEvent, ProcessedEvent, NotUsed] = {
    Flow[RawEvent]
      .map(parseEvent)
      .collect { case Some(event) => event } // Drop events that fail to parse
      .via(enrichmentFlow)
      .via(validationFlow)
      .via(transformationFlow)
  }

  private def parseEvent(raw: RawEvent): Option[ProcessedEvent] = {
    try {
      val payload = parseJsonString(raw.data)
      Some(ProcessedEvent(
        id = raw.id,
        timestamp = Instant.ofEpochMilli(raw.timestamp),
        payload = payload,
        category = raw.eventType,
        enriched = false
      ))
    } catch {
      case _: Exception => None
    }
  }

  private def parseJsonString(json: String): Map[String, Any] = {
    // Simplified JSON parsing
    Map("raw" -> json, "parsed" -> true)
  }

  // Enrichment flow with external lookups
  private def enrichmentFlow: Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
    Flow[ProcessedEvent]
      .mapAsync(parallelism = 4) { event =>
        enrichWithExternalData(event)
      }
  }

  private def enrichWithExternalData(event: ProcessedEvent): Future[ProcessedEvent] = {
    // Simulate external API call
    Future {
      Thread.sleep(Random.nextInt(50))
      event.copy(
        payload = event.payload + ("enriched_data" -> s"enriched-${event.id}"),
        enriched = true
      )
    }
  }

  // Validation flow with error handling
  private def validationFlow: Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
    Flow[ProcessedEvent]
      .map { event =>
        if (isValidEvent(event)) event
        else throw new IllegalArgumentException(s"Invalid event: ${event.id}")
      }
      .recover {
        // Note: recover emits a single error placeholder and then completes the stream;
        // resume supervision (or mapping invalid events directly) keeps the stream running instead
        case _: IllegalArgumentException =>
          ProcessedEvent("error", Instant.now(), Map("error" -> true), "error", enriched = false)
      }
  }

  private def isValidEvent(event: ProcessedEvent): Boolean = {
    event.id.nonEmpty && event.payload.nonEmpty && event.category.nonEmpty
  }

  // Transformation flow with business logic
  private def transformationFlow: Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
    Flow[ProcessedEvent]
      .map(applyBusinessRules)
      .map(addMetadata)
  }

  private def applyBusinessRules(event: ProcessedEvent): ProcessedEvent = {
    val transformedPayload = event.payload.map {
      case (key, value: String) if key.contains("amount") => 
        (key, value.toDoubleOption.getOrElse(0.0))
      case (key, value) => (key, value)
    }

    event.copy(payload = transformedPayload)
  }

  private def addMetadata(event: ProcessedEvent): ProcessedEvent = {
    val metadata = Map(
      "processing_timestamp" -> Instant.now().toEpochMilli,
      "version" -> "1.0",
      "processor" -> "akka-streams"
    )

    event.copy(payload = event.payload ++ metadata)
  }

  // Branching and merging flows
  def branchingFlow(): Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Route events by category, process each branch, then merge back into one stream
      val partition = builder.add(Partition[ProcessedEvent](3, event => event.category match {
        case "user"  => 0
        case "order" => 1
        case _       => 2
      }))
      val merge = builder.add(Merge[ProcessedEvent](3))

      partition.out(0) ~> Flow[ProcessedEvent].map(processUserEvent)  ~> merge.in(0)
      partition.out(1) ~> Flow[ProcessedEvent].map(processOrderEvent) ~> merge.in(1)
      partition.out(2) ~> Flow[ProcessedEvent].map(processOtherEvent) ~> merge.in(2)

      FlowShape(partition.in, merge.out)
    })
  }

  private def processUserEvent(event: ProcessedEvent): ProcessedEvent = {
    if (event.category == "user") {
      event.copy(payload = event.payload + ("user_processed" -> true))
    } else event
  }

  private def processOrderEvent(event: ProcessedEvent): ProcessedEvent = {
    if (event.category == "order") {
      event.copy(payload = event.payload + ("order_processed" -> true))
    } else event
  }

  private def processOtherEvent(event: ProcessedEvent): ProcessedEvent = {
    if (event.category != "user" && event.category != "order") {
      event.copy(payload = event.payload + ("other_processed" -> true))
    } else event
  }

  // Aggregation flows with windowing
  def aggregationFlow(): Flow[ProcessedEvent, AggregatedMetrics, NotUsed] = {
    Flow[ProcessedEvent]
      .groupedWithin(100, 10.seconds)
      .map(aggregateEvents)
  }

  private def aggregateEvents(events: Seq[ProcessedEvent]): AggregatedMetrics = {
    val category = events.headOption.map(_.category).getOrElse("unknown")
    val count = events.length
    val errorCount = events.count(_.category == "error")
    val errorRate = if (count > 0) errorCount.toDouble / count else 0.0

    // Simulate processing time calculation
    val avgProcessingTime = Random.nextInt(100) + 50

    AggregatedMetrics(category, count, avgProcessingTime, errorRate)
  }

  // Conditional flows
  def conditionalFlow(): Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
    Flow[ProcessedEvent]
      .filter(_.enriched)
      .map { event =>
        if (event.payload.contains("priority")) {
          processPriorityEvent(event)
        } else {
          processNormalEvent(event)
        }
      }
  }

  private def processPriorityEvent(event: ProcessedEvent): ProcessedEvent = {
    event.copy(payload = event.payload + ("priority_processed" -> true))
  }

  private def processNormalEvent(event: ProcessedEvent): ProcessedEvent = {
    event.copy(payload = event.payload + ("normal_processed" -> true))
  }
}
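
To show how the stages above fit together, here is a minimal sketch that feeds synthetic RawEvent values through the processing pipeline and the windowed aggregation; the sample data is invented for illustration.

// Hypothetical end-to-end wiring of the flows defined above
object FlowCompositionUsage {
  import FlowComposition._

  def runPipeline(): Future[Done] = {
    val sampleEvents = Source(1 to 50).map { i =>
      RawEvent(
        id = s"event-$i",
        timestamp = System.currentTimeMillis(),
        data = s"""{"value": $i}""",
        eventType = if (i % 2 == 0) "user" else "order"
      )
    }

    sampleEvents
      .via(dataProcessingPipeline())
      .via(aggregationFlow())
      .runWith(Sink.foreach(metrics => println(s"Window metrics: $metrics")))
  }
}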

// Advanced flow patterns
object AdvancedFlowPatterns {

  implicit val system: ActorSystem = ActorSystem("advanced-flows")
  implicit val materializer: Materializer = Materializer(system)
  implicit val ec: ExecutionContext = system.dispatcher

  // Retry flow with exponential backoff
  def retryFlow[T](maxRetries: Int = 3): Flow[T, T, NotUsed] = {
    Flow[T].mapAsync(1) { element =>
      retryWithBackoff(maxRetries, 1.second) {
        processWithPossibleFailure(element)
      }
    }
  }

  private def retryWithBackoff[T](
    maxRetries: Int,
    initialDelay: FiniteDuration
  )(operation: => Future[T]): Future[T] = {

    def attempt(retriesLeft: Int, delay: FiniteDuration): Future[T] = {
      operation.recoverWith {
        case _: Exception if retriesLeft > 0 =>
          akka.pattern.after(delay, system.scheduler) {
            attempt(retriesLeft - 1, delay * 2)
          }
      }
    }

    attempt(maxRetries, initialDelay)
  }

  private def processWithPossibleFailure[T](element: T): Future[T] = {
    Future {
      if (Random.nextDouble() < 0.3) { // 30% failure rate
        throw new RuntimeException("Random failure")
      }
      element
    }
  }

  // Circuit breaker flow
  def circuitBreakerFlow[T](): Flow[T, T, NotUsed] = {
    import akka.pattern.CircuitBreaker

    val circuitBreaker = CircuitBreaker(
      scheduler = system.scheduler,
      maxFailures = 5,
      callTimeout = 10.seconds,
      resetTimeout = 1.minute
    )

    Flow[T].mapAsync(1) { element =>
      circuitBreaker.withCircuitBreaker {
        processWithCircuitBreaker(element)
      }
    }
  }

  private def processWithCircuitBreaker[T](element: T): Future[T] = {
    Future {
      if (Random.nextDouble() < 0.2) { // 20% failure rate
        throw new RuntimeException("Service unavailable")
      }
      Thread.sleep(Random.nextInt(100))
      element
    }
  }

  // Batching flow
  def batchingFlow[T](batchSize: Int, maxDuration: FiniteDuration): Flow[T, Seq[T], NotUsed] = {
    Flow[T]
      .groupedWithin(batchSize, maxDuration)
      .filter(_.nonEmpty)
  }

  // Deduplication flow
  def deduplicationFlow[T](keyExtractor: T => String): Flow[T, T, NotUsed] = {
    Flow[T]
      .statefulMapConcat { () =>
        val seen = scala.collection.mutable.Set[String]() // Note: grows without bound; cap or expire keys for long-running streams

        { element: T =>
          val key = keyExtractor(element)
          if (seen.contains(key)) {
            List.empty // Duplicate, filter out
          } else {
            seen.add(key)
            List(element)
          }
        }
      }
  }

  // Rate limiting flow
  def rateLimitingFlow[T](
    elementsPerSecond: Int,
    burstSize: Int = 10
  ): Flow[T, T, NotUsed] = {
    Flow[T]
      .throttle(
        elements = elementsPerSecond,
        per = 1.second,
        maximumBurst = burstSize,
        mode = ThrottleMode.Shaping
      )
  }

  // Load balancing flow
  def loadBalancingFlow[T](
    workerCount: Int
  )(processor: T => Future[T]): Flow[T, T, NotUsed] = {
    Flow[T]
      .mapAsyncUnordered(workerCount)(processor)
  }

  // Timeout flow
  def timeoutFlow[T](timeout: FiniteDuration): Flow[T, T, NotUsed] = {
    Flow[T]
      .idleTimeout(timeout)
      .mapError { // Translate the idle-timeout failure into a domain-level error
        case _: TimeoutException =>
          new RuntimeException("Flow timeout exceeded")
      }
  }

  // Monitoring flow
  def monitoringFlow[T](name: String): Flow[T, T, NotUsed] = {
    Flow[T]
      .map { element =>
        system.log.debug(s"Processing element in flow $name: $element")
        element
      }
      .wireTap(element => 
        system.log.info(s"Processed element in flow $name")
      )
  }

  // Sampling flow
  def samplingFlow[T](sampleRate: Double): Flow[T, T, NotUsed] = {
    Flow[T]
      .filter(_ => Random.nextDouble() < sampleRate)
  }

  // Checkpoint flow for error recovery
  def checkpointFlow[T](checkpointInterval: Int): Flow[T, T, NotUsed] = {
    Flow[T]
      .zipWithIndex
      .map { case (element, index) =>
        if (index % checkpointInterval == 0) {
          saveCheckpoint(index, element)
        }
        element
      }
  }

  private def saveCheckpoint[T](index: Long, element: T): Unit = {
    // Simulate checkpoint saving
    system.log.info(s"Checkpoint saved at index $index")
  }
}
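
These flows are ordinary Flow values, so they compose with via like any other stage. The sketch below chains a few of them; the ordering and parameter values are arbitrary choices for illustration.

// Hypothetical composition of the reusable flow patterns
object AdvancedFlowPatternsUsage {
  import AdvancedFlowPatterns._

  def composedPipeline(): Future[Done] = {
    Source(1 to 200)
      .via(rateLimitingFlow[Int](elementsPerSecond = 20))
      .via(samplingFlow[Int](sampleRate = 0.5)) // keep roughly half of the elements
      .via(monitoringFlow[Int]("demo-pipeline"))
      .via(batchingFlow[Int](batchSize = 10, maxDuration = 1.second))
      .runWith(Sink.foreach(batch => println(s"Batch of ${batch.size}: $batch")))
  }
}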

Advanced Sinks and Materialization

Custom Sinks and Complex Outputs

// Advanced sink patterns
object AdvancedSinks {

  implicit val system: ActorSystem = ActorSystem("advanced-sinks")
  implicit val materializer: Materializer = Materializer(system)
  implicit val ec: ExecutionContext = system.dispatcher

  // Database sink with batching
  def databaseSink[T](
    batchSize: Int = 100,
    flushInterval: FiniteDuration = 5.seconds
  )(
    writeFunction: Seq[T] => Future[Done]
  ): Sink[T, Future[Done]] = {

    Flow[T]
      .groupedWithin(batchSize, flushInterval)
      .mapAsync(1)(writeFunction)
      .toMat(Sink.ignore)(Keep.right)
  }

  // File sink with rotation (simplified: each chunk is appended to the currently active file)
  def rotatingFileSink(
    baseFileName: String,
    maxFileSize: Long = 10 * 1024 * 1024, // 10MB
    maxFiles: Int = 10
  ): Sink[ByteString, Future[Done]] = {

    import java.nio.file.StandardOpenOption.{APPEND, CREATE, WRITE}

    Flow[ByteString]
      .statefulMapConcat { () =>
        var currentFile = 0
        var currentSize = 0L

        { data: ByteString =>
          currentSize += data.length

          if (currentSize > maxFileSize) {
            currentFile = (currentFile + 1) % maxFiles
            currentSize = data.length
          }

          List((Paths.get(s"$baseFileName.$currentFile"), data))
        }
      }
      .mapAsync(1) { case (path, data) =>
        // Append each chunk to the file that is currently active
        Source.single(data).runWith(FileIO.toPath(path, Set(WRITE, CREATE, APPEND)))
      }
      .toMat(Sink.ignore)(Keep.right)
  }

  // Kafka sink
  def kafkaSink[T](
    topic: String,
    bootstrapServers: String
  )(
    serializer: T => String
  ): Sink[T, Future[Done]] = {

    import akka.kafka.ProducerSettings
    import akka.kafka.scaladsl.Producer
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer

    val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

    Flow[T]
      .map { element =>
        new ProducerRecord[String, String](topic, serializer(element))
      }
      .toMat(Producer.plainSink(producerSettings))(Keep.right) // Keep the producer's completion future
  }

  // Elasticsearch sink
  def elasticsearchSink[T](
    indexName: String,
    typeName: String
  )(
    documentId: T => String,
    serializer: T => String
  ): Sink[T, Future[Done]] = {

    Flow[T]
      .map { element =>
        ElasticsearchDocument(
          id = documentId(element),
          source = serializer(element)
        )
      }
      .groupedWithin(100, 5.seconds)
      .mapAsync(1)(bulkIndexToElasticsearch)
      .toMat(Sink.ignore)(Keep.right)
  }

  case class ElasticsearchDocument(id: String, source: String)

  private def bulkIndexToElasticsearch(documents: Seq[ElasticsearchDocument]): Future[Done] = {
    // Simulate Elasticsearch bulk indexing
    Future {
      println(s"Bulk indexing ${documents.length} documents to Elasticsearch")
      Thread.sleep(100)
      Done
    }
  }

  // HTTP sink
  import akka.http.scaladsl.Http
  import akka.http.scaladsl.model.HttpRequest

  def httpSink[T](
    endpoint: String,
    parallelism: Int = 4
  )(
    requestBuilder: T => HttpRequest
  ): Sink[T, Future[Done]] = {

    Flow[T]
      .map(requestBuilder)
      .mapAsync(parallelism) { request =>
        Http().singleRequest(request).map { response =>
          if (response.status.isSuccess()) {
            response.discardEntityBytes()
            Done
          } else {
            throw new RuntimeException(s"HTTP request failed with status: ${response.status}")
          }
        }
      }
      .toMat(Sink.ignore)(Keep.right)
  }

  // WebSocket sink
  def webSocketSink[T](
    url: String
  )(
    serializer: T => String
  ): Sink[T, Future[Done]] = {

    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.ws._
    import akka.stream.scaladsl.{Flow, Sink, Source}

    val wsFlow = Http().webSocketClientFlow(WebSocketRequest(url))

    Flow[T]
      .map(element => TextMessage(serializer(element)))
      .via(wsFlow)
      .toMat(Sink.ignore)(Keep.right) // Keep the completion future so callers can observe termination
  }

  // Multi-output sink (fan-out)
  def multiOutputSink[T](
    sinks: Sink[T, _]*
  ): Sink[T, NotUsed] = {
    Sink.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[T](sinks.length))

      sinks.zipWithIndex.foreach { case (sink, index) =>
        broadcast.out(index) ~> sink
      }

      SinkShape(broadcast.in)
    })
  }

  // Conditional sink
  def conditionalSink[T](
    predicate: T => Boolean,
    trueSink: Sink[T, _],
    falseSink: Sink[T, _]
  ): Sink[T, NotUsed] = {

    Sink.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val partition = builder.add(Partition[T](2, element => if (predicate(element)) 0 else 1))

      partition.out(0) ~> trueSink
      partition.out(1) ~> falseSink

      SinkShape(partition.in)
    })
  }

  // Metrics collection sink
  def metricsSink[T](
    metricName: String
  )(
    valueExtractor: T => Double
  ): Sink[T, Future[Done]] = {

    Flow[T]
      .map(valueExtractor)
      .scan((0.0, 0L, 0.0, Double.MaxValue, Double.MinValue)) {
        case ((sum, count, _, min, max), value) =>
          val newSum = sum + value
          val newCount = count + 1
          (newSum, newCount, newSum / newCount, Math.min(min, value), Math.max(max, value))
      }
      .throttle(1, 10.seconds)
      .map { case (sum, count, avg, min, max) =>
        println(s"Metrics for $metricName - Count: $count, Sum: $sum, Avg: $avg, Min: $min, Max: $max")
      }
      .toMat(Sink.ignore)(Keep.right)
  }

  // Logging sink with different levels
  def loggingSink[T](
    logLevel: String = "INFO"
  )(
    formatter: T => String
  ): Sink[T, Future[Done]] = {

    Sink.foreach[T] { element =>
      val message = formatter(element)
      logLevel.toUpperCase match {
        case "DEBUG" => system.log.debug(message)
        case "INFO" => system.log.info(message)
        case "WARN" => system.log.warning(message)
        case "ERROR" => system.log.error(message)
        case _ => println(message)
      }
    }
  }
}
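
As a usage sketch, the batching database sink can be driven with a simulated write function; the writeBatch helper below is invented for illustration.

// Hypothetical driver for the batching database sink
object AdvancedSinksUsage {
  import AdvancedSinks._

  // Simulated batch write; a real implementation would issue a bulk INSERT here
  private def writeBatch(rows: Seq[Int]): Future[Done] = Future {
    println(s"Writing batch of ${rows.size} rows")
    Done
  }

  def run(): Future[Done] = {
    Source(1 to 1000)
      .runWith(databaseSink[Int](batchSize = 50, flushInterval = 2.seconds)(writeBatch))
  }
}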

// Graph DSL and complex topologies
object GraphDSLPatterns {

  implicit val system: ActorSystem = ActorSystem("graph-dsl")
  implicit val materializer: Materializer = Materializer(system)
  implicit val ec: ExecutionContext = system.dispatcher

  // Fan-out and fan-in pattern
  def fanOutFanInGraph[T](): RunnableGraph[NotUsed] = {
    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val source = Source(1 to 100)
      val broadcast = builder.add(Broadcast[Int](3))
      val merge = builder.add(Merge[String](3))
      val sink = Sink.foreach[String](println)

      val flow1 = Flow[Int].map(i => s"Flow1: ${i * 2}")
      val flow2 = Flow[Int].map(i => s"Flow2: ${i * 3}")
      val flow3 = Flow[Int].map(i => s"Flow3: ${i * 4}")

      source ~> broadcast

      broadcast.out(0) ~> flow1 ~> merge.in(0)
      broadcast.out(1) ~> flow2 ~> merge.in(1)
      broadcast.out(2) ~> flow3 ~> merge.in(2)

      merge ~> sink

      ClosedShape
    })
  }

  // Priority merge pattern
  def priorityMergeGraph(): RunnableGraph[NotUsed] = {
    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val highPrioritySource = Source.tick(2.seconds, 2.seconds, "HIGH")
      val normalPrioritySource = Source.tick(1.second, 1.second, "NORMAL")
      val lowPrioritySource = Source.tick(500.millis, 500.millis, "LOW")

      val priorityMerge = builder.add(MergePreferred[String](2))
      val sink = Sink.foreach[String](message => println(s"Processing: $message"))

      highPrioritySource ~> priorityMerge.preferred
      normalPrioritySource ~> priorityMerge.in(0)
      lowPrioritySource ~> priorityMerge.in(1)

      priorityMerge ~> sink

      ClosedShape
    })
  }

  // Load balancer pattern
  def loadBalancerGraph[T](workerCount: Int): Graph[FlowShape[T, T], NotUsed] = {
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val balance = builder.add(Balance[T](workerCount))
      val merge = builder.add(Merge[T](workerCount))

      val workerFlow = Flow[T].map { element =>
        Thread.sleep(Random.nextInt(1000)) // Simulate work
        element
      }

      for (i <- 0 until workerCount) {
        balance.out(i) ~> workerFlow.async ~> merge.in(i)
      }

      FlowShape(balance.in, merge.out)
    }
  }

  // Cyclic graph with feedback
  def cyclicGraphWithFeedback(): RunnableGraph[NotUsed] = {
    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val source = Source(1 to 10)
      val merge = builder.add(Merge[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))
      val sink = Sink.foreach[Int](println)

      val processingFlow = Flow[Int].map(_ + 1)
      val feedbackFlow = Flow[Int]
        .filter(_ < 50) // Bound the feedback so the cycle eventually drains
        .map(_ * 2)
        .buffer(16, OverflowStrategy.dropHead) // A buffer (or MergePreferred) keeps the cycle live under backpressure

      source ~> merge ~> processingFlow ~> broadcast

      broadcast.out(0) ~> sink
      broadcast.out(1) ~> feedbackFlow ~> merge

      ClosedShape
    })
  }

  // Hub pattern for multiple consumers
  def hubPattern(): (Sink[String, NotUsed], Source[String, NotUsed]) = {
    val (sink, source) = MergeHub.source[String](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
      .run()

    (sink, source)
  }

  // Error handling graph with dead letter queue
  def errorHandlingGraph[T](): Graph[FlowShape[T, T], NotUsed] = {
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Wrap results in Either so failures can be routed to a dead letter path
      val processing = builder.add(Flow[T].map[Either[Throwable, T]] { element =>
        if (Random.nextDouble() < 0.1) // 10% error rate
          Left(new RuntimeException(s"Processing failed for $element"))
        else
          Right(element)
      })

      val partition = builder.add(Partition[Either[Throwable, T]](2, {
        case Left(_)  => 0 // Error path
        case Right(_) => 1 // Success path
      }))

      // Dead letter path: log failed elements and drop them
      val deadLetters = Flow[Either[Throwable, T]]
        .collect { case Left(error) => error }
        .to(Sink.foreach(error => println(s"Dead letter: ${error.getMessage}")))

      val successes = builder.add(Flow[Either[Throwable, T]].collect { case Right(value) => value })

      processing.out ~> partition.in
      partition.out(0) ~> deadLetters
      partition.out(1) ~> successes.in

      FlowShape(processing.in, successes.out)
    }
  }
}
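
Closed graphs are run directly, while partial graphs with a FlowShape can be wrapped with Flow.fromGraph and used like any other flow, as in the sketch below (the element counts are arbitrary).

// Hypothetical usage of the graphs defined above
object GraphDSLUsage {
  import GraphDSLPatterns._

  def run(): Unit = {
    // A ClosedShape graph is self-contained and can be materialized directly
    fanOutFanInGraph().run()

    // A FlowShape graph plugs into a linear stream via Flow.fromGraph
    Source(1 to 20)
      .via(Flow.fromGraph(loadBalancerGraph[Int](workerCount = 4)))
      .runWith(Sink.foreach(i => println(s"Balanced result: $i")))
  }
}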

Error Recovery and Resilience Patterns

Supervision and Recovery Strategies

object ResiliencePatterns {

  implicit val system: ActorSystem = ActorSystem("resilience")
  implicit val materializer: Materializer = Materializer(system)
  implicit val ec: ExecutionContext = system.dispatcher

  // Restart strategy for transient failures
  def restartOnFailureSource[T](
    sourceFactory: () => Source[T, _],
    minBackoff: FiniteDuration = 3.seconds,
    maxBackoff: FiniteDuration = 30.seconds,
    randomFactor: Double = 0.2
  ): Source[T, NotUsed] = {

    RestartSource.withBackoff(
      RestartSettings(minBackoff, maxBackoff, randomFactor)
    )(sourceFactory)
  }

  // Graceful degradation pattern
  def gracefulDegradationFlow[T, R](
    primaryProcessor: T => Future[R],
    fallbackProcessor: T => Future[R],
    timeout: FiniteDuration = 5.seconds
  ): Flow[T, R, NotUsed] = {

    Flow[T].mapAsync(1) { element =>
      val primaryFuture = primaryProcessor(element)
      val timeoutFuture = akka.pattern.after(timeout, system.scheduler) {
        Future.failed(new TimeoutException("Primary processor timeout"))
      }

      Future.firstCompletedOf(Seq(primaryFuture, timeoutFuture))
        .recoverWith {
          case _: Exception => fallbackProcessor(element)
        }
    }
  }

  // Bulkhead pattern for isolation
  def bulkheadPattern[T](
    criticalProcessor: T => Future[T],
    normalProcessor: T => Future[T],
    classifier: T => Boolean,
    criticalParallelism: Int = 2,
    normalParallelism: Int = 8
  ): Flow[T, T, NotUsed] = {

    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val partition = builder.add(Partition[T](2, element => if (classifier(element)) 0 else 1))
      val merge = builder.add(Merge[T](2))

      // Critical and normal traffic run in isolated async islands with independent parallelism
      partition.out(0) ~> Flow[T].mapAsync(criticalParallelism)(criticalProcessor).async ~> merge.in(0)
      partition.out(1) ~> Flow[T].mapAsync(normalParallelism)(normalProcessor).async ~> merge.in(1)

      FlowShape(partition.in, merge.out)
    })
  }

  // Health check and monitoring
  def healthCheckFlow[T](
    healthChecker: () => Future[Boolean],
    checkInterval: FiniteDuration = 30.seconds
  ): Flow[T, T, NotUsed] = {

    Flow[T]
      // Note: zip couples element throughput to the health-check ticks, so this suits low-volume control streams
      .zip(Source.tick(0.seconds, checkInterval, ()).mapAsync(1)(_ => healthChecker()))
      .map {
        case (element, isHealthy) =>
          if (!isHealthy) {
            system.log.warning("Health check failed")
            throw new RuntimeException("System unhealthy")
          }
          element
      }
  }

  // Dead letter queue pattern
  def deadLetterQueueSink[T](
    primarySink: Sink[T, Future[Done]],
    deadLetterSink: Sink[T, Future[Done]],
    maxRetries: Int = 3
  ): Sink[T, Future[Done]] = {

    Flow[T]
      .mapAsync(1) { element =>
        retryElement(element, primarySink, maxRetries)
          .recoverWith {
            case _ => 
              system.log.warning(s"Element failed after $maxRetries retries, sending to dead letter queue")
              Source.single(element).runWith(deadLetterSink)
          }
      }
      .toMat(Sink.ignore)(Keep.right)
  }

  private def retryElement[T](
    element: T,
    sink: Sink[T, Future[Done]],
    retriesLeft: Int
  ): Future[Done] = {
    Source.single(element).runWith(sink).recoverWith {
      case _ if retriesLeft > 0 =>
        akka.pattern.after(1.second, system.scheduler) {
          retryElement(element, sink, retriesLeft - 1)
        }
    }
  }

  // Kill switch for graceful shutdown
  def killSwitchExample(): Unit = {
    val (killSwitch, done) = Source.tick(0.seconds, 1.second, "tick")
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach(println))(Keep.both) // Keep both the kill switch and the completion future
      .run()

    // Shutdown after 10 seconds
    system.scheduler.scheduleOnce(10.seconds) {
      killSwitch.shutdown()
    }

    done.onComplete {
      case Success(_) => println("Stream completed gracefully")
      case Failure(exception) => println(s"Stream failed: $exception")
    }
  }

  // Shared kill switch for multiple streams
  def sharedKillSwitchExample(): Unit = {
    val sharedKillSwitch = KillSwitches.shared("my-streams")

    val stream1 = Source.tick(0.seconds, 1.second, "stream1")
      .via(sharedKillSwitch.flow)
      .to(Sink.foreach(println))
      .run()

    val stream2 = Source.tick(0.seconds, 2.seconds, "stream2")
      .via(sharedKillSwitch.flow)
      .to(Sink.foreach(println))
      .run()

    // Shutdown all streams after 15 seconds
    system.scheduler.scheduleOnce(15.seconds) {
      sharedKillSwitch.shutdown()
    }
  }
}
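
The resilience pieces combine naturally: a flaky source wrapped in restart-with-backoff feeding a flow that degrades gracefully to a fallback. The sketch below illustrates this; the flaky source and both processors are invented for the example.

// Hypothetical combination of the resilience patterns above
object ResiliencePatternsUsage {
  import ResiliencePatterns._

  // Invented flaky source: fails randomly so the restart wrapper has something to recover from
  private def flakySource(): Source[Int, NotUsed] =
    Source(1 to 100).map { i =>
      if (Random.nextDouble() < 0.05) throw new RuntimeException("transient failure")
      i
    }

  private def primary(i: Int): Future[String]  = Future(s"primary-$i")
  private def fallback(i: Int): Future[String] = Future.successful(s"fallback-$i")

  def run(): Future[Done] = {
    restartOnFailureSource(() => flakySource())
      .via(gracefulDegradationFlow(primary, fallback))
      .take(50)
      .runWith(Sink.foreach(println))
  }
}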

Conclusion

Reactive Streams and Akka Streams provide powerful abstractions for building resilient, high-throughput data processing systems. Key concepts include:

Core Principles:

  • Backpressure handling for flow control
  • Non-blocking asynchronous processing
  • Composable stream operations
  • Built-in error handling and supervision

Stream Components:

  • Sources for data generation and ingestion
  • Flows for transformation and processing
  • Sinks for data consumption and output
  • Graph DSL for complex topologies

Advanced Patterns:

  • Rate limiting and throttling
  • Circuit breakers and bulkheads
  • Retry mechanisms with exponential backoff
  • Dead letter queues for error handling

Resilience Features:

  • Supervision strategies for fault tolerance
  • Graceful degradation patterns
  • Health monitoring and alerting
  • Kill switches for controlled shutdown

Performance Optimization:

  • Parallelism and load balancing
  • Batching and buffering strategies
  • Resource isolation and management
  • Monitoring and metrics collection

Integration Capabilities:

  • File system and network I/O
  • Database and message queue connectivity
  • HTTP and WebSocket endpoints
  • External service integration

Best Practices:

  • Proper error handling and recovery
  • Resource management and cleanup
  • Testing strategies for stream applications
  • Monitoring and observability

Akka Streams enables developers to build sophisticated data processing pipelines that can handle varying loads, recover from failures, and maintain high throughput while preserving data integrity and system stability.