Reactive Streams and Akka Streams: Building Resilient Data Pipelines
Reactive Streams and Akka Streams provide powerful abstractions for building resilient, high-throughput data processing pipelines with built-in backpressure handling. This comprehensive lesson covers stream fundamentals, flow composition, error recovery, and advanced patterns for scalable streaming applications.
Reactive Streams Fundamentals
Core Concepts and Backpressure
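The Akka DSL used throughout this lesson sits on top of the Reactive Streams SPI (org.reactivestreams), where backpressure is expressed as explicit demand: a Subscriber signals how many elements it can handle via Subscription.request(n), and a compliant Publisher never emits more than that. The following is a minimal, illustrative sketch of that contract (the class name OneAtATimeSubscriber is only an example), showing a subscriber that requests one element at a time:
import org.reactivestreams.{Subscriber, Subscription}
// A subscriber that pulls exactly one element at a time: downstream demand,
// not publisher speed, drives the flow of data -- this is backpressure
class OneAtATimeSubscriber extends Subscriber[Int] {
  private var subscription: Subscription = _
  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    subscription.request(1) // signal initial demand of a single element
  }
  override def onNext(element: Int): Unit = {
    println(s"Received: $element")
    subscription.request(1) // ask for the next element only after handling this one
  }
  override def onError(t: Throwable): Unit = println(s"Stream failed: $t")
  override def onComplete(): Unit = println("Stream completed")
}
Akka Streams interoperates with any such SPI implementation, for example via Sink.fromSubscriber and Source.fromPublisher; the rest of this lesson stays at the level of the higher-level Akka DSL.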
import akka.actor.{ActorRef, ActorSystem, Cancellable}
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.ByteString
import akka.{Done, NotUsed}
import scala.concurrent.duration._
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure, Random}
import java.nio.file.Paths
import java.util.concurrent.TimeoutException
import java.time.{Instant, LocalDateTime}
// Basic reactive streams setup
object ReactiveStreamsBasics {
implicit val system: ActorSystem = ActorSystem("reactive-streams")
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
// Simple source creation and consumption
def basicSourceExample(): Future[Done] = {
val source: Source[Int, NotUsed] = Source(1 to 100)
source
.map(_ * 2)
.filter(_ % 4 == 0)
.take(10)
.runWith(Sink.foreach(println))
}
// Backpressure demonstration
def backpressureExample(): Future[Done] = {
val fastSource = Source.tick(0.millis, 1.millis, "fast-element")
val slowSink = Sink.foreach[String] { element =>
Thread.sleep(100) // Simulate slow processing
println(s"Processed: $element")
}
fastSource
.buffer(10, OverflowStrategy.dropHead) // Bounded buffer: drop the oldest elements when the slow sink falls behind
.take(50)
.runWith(slowSink)
}
// Custom source with state
def createCustomSource(): Source[String, NotUsed] = {
Source.unfold(0) { state =>
if (state < 100) {
Some((state + 1, s"Item-$state"))
} else {
None
}
}
}
// Rate-limited processing
def rateLimitedProcessing(): Future[Done] = {
Source(1 to 1000)
.throttle(10, 1.second) // 10 elements per second
.map(processItem)
.runWith(Sink.ignore)
}
private def processItem(item: Int): String = {
// Simulate processing
Thread.sleep(Random.nextInt(50))
s"Processed-$item"
}
// Error handling in streams
def errorHandlingExample(): Future[Done] = {
Source(1 to 20)
.map { i =>
if (i % 7 == 0) throw new RuntimeException(s"Error at $i")
else i * 2
}
.recover {
case _: RuntimeException => -1
}
.runWith(Sink.foreach(println))
}
// Supervision strategies
def supervisionExample(): Future[Done] = {
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _: IllegalArgumentException => Supervision.Restart
case _: RuntimeException => Supervision.Stop
case _ => Supervision.Escalate
}
// ActorMaterializerSettings is deprecated in Akka 2.6; apply the decider
// per-stream with ActorAttributes instead (see withAttributes below)
Source(1 to 20)
.map { i =>
if (i % 5 == 0) throw new ArithmeticException("Division by zero")
if (i % 7 == 0) throw new IllegalArgumentException("Invalid argument")
i * 2
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.foreach(println))
}
}
// Advanced source patterns
object AdvancedSources {
implicit val system: ActorSystem = ActorSystem("advanced-sources")
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
// Timer-based sources
def timerBasedSources(): Source[String, Cancellable] = {
Source.tick(
initialDelay = 1.second,
interval = 5.seconds,
tick = "heartbeat"
).map(_ => s"Heartbeat at ${Instant.now()}")
}
// Queue-based source for dynamic data
def queueBasedSource(): (SourceQueueWithComplete[String], Source[String, NotUsed]) = {
val (queue, source) = Source.queue[String](
bufferSize = 100,
overflowStrategy = OverflowStrategy.backpressure
).preMaterialize()
// Simulate external data feeding
system.scheduler.scheduleAtFixedRate(1.second, 2.seconds) { () =>
queue.offer(s"External data at ${System.currentTimeMillis()}")
}
(queue, source)
}
// ActorRef-based source
def actorRefSource(): Source[String, ActorRef] = {
Source.actorRef[String](
completionMatcher = {
case "complete" => CompletionStrategy.immediately
},
failureMatcher = {
case "fail" => new RuntimeException("Stream failed")
},
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropHead
)
}
// File-based sources
def fileSource(filePath: String): Source[ByteString, Future[IOResult]] = {
FileIO.fromPath(Paths.get(filePath))
}
def csvFileSource(filePath: String): Source[Map[String, String], Future[IOResult]] = {
FileIO.fromPath(Paths.get(filePath))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192))
.map(_.utf8String)
.drop(1) // Skip header
.map(parseCsvLine)
}
private def parseCsvLine(line: String): Map[String, String] = {
val headers = Array("id", "name", "email", "age")
val values = line.split(",").map(_.trim)
headers.zip(values).toMap
}
// HTTP-based source
def httpSource(): Source[String, NotUsed] = {
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
Source.repeat(HttpRequest(uri = "https://api.example.com/data"))
.throttle(1, 5.seconds) // Rate limit API calls
.mapAsync(1) { request =>
Http().singleRequest(request).flatMap { response =>
response.entity.toStrict(5.seconds).map(_.data.utf8String)
}
}
}
// Database source with pagination
def databaseSource(): Source[DatabaseRecord, NotUsed] = {
Source.unfoldAsync(0) { offset =>
fetchDatabasePage(offset, pageSize = 100).map { records =>
if (records.nonEmpty) {
Some((offset + records.length, records))
} else {
None
}
}
}.mapConcat(identity)
}
private def fetchDatabasePage(offset: Int, pageSize: Int): Future[List[DatabaseRecord]] = {
// Simulate database query
Future {
if (offset < 1000) {
(offset until Math.min(offset + pageSize, 1000)).map { i =>
DatabaseRecord(s"id-$i", s"name-$i", s"email-$i@example.com")
}.toList
} else {
List.empty
}
}
}
case class DatabaseRecord(id: String, name: String, email: String)
}
Flow Composition and Transformation
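Before the full pipeline below, it helps to see the composition model in isolation: a Flow is a reusable blueprint that can be defined once, combined with via, and attached to any source or sink. A minimal sketch (the parse/validate stage names are illustrative and not part of the pipeline that follows):
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
object FlowCompositionSketch {
  implicit val system: ActorSystem = ActorSystem("flow-composition-sketch")
  // Two small, independently testable stages...
  val parse: Flow[String, Int, NotUsed] = Flow[String].map(_.trim.toInt)
  val validate: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ >= 0)
  // ...composed into a single reusable blueprint with `via`
  val parseAndValidate: Flow[String, Int, NotUsed] = parse.via(validate)
  // The same blueprint can be attached to any source and sink
  def run(): Unit = {
    Source(List("1", "-2", "3"))
      .via(parseAndValidate)
      .runWith(Sink.foreach(println)) // prints 1 and 3
  }
}
The larger pipeline below applies exactly this idea, composing parsing, enrichment, validation, and transformation stages into one flow.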
// Complex flow compositions
object FlowComposition {
implicit val system: ActorSystem = ActorSystem("flow-composition")
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
// Data transformation pipeline
case class RawEvent(id: String, timestamp: Long, data: String, eventType: String)
case class ProcessedEvent(id: String, timestamp: Instant, payload: Map[String, Any], category: String, enriched: Boolean)
case class AggregatedMetrics(category: String, count: Long, averageProcessingTime: Long, errorRate: Double)
// Multi-stage processing flow
def dataProcessingPipeline(): Flow[RawEvent, ProcessedEvent, NotUsed] = {
Flow[RawEvent]
  .mapConcat(raw => parseEvent(raw).toList) // drop events that fail to parse
.via(enrichmentFlow)
.via(validationFlow)
.via(transformationFlow)
}
private def parseEvent(raw: RawEvent): Option[ProcessedEvent] = {
try {
val payload = parseJsonString(raw.data)
Some(ProcessedEvent(
id = raw.id,
timestamp = Instant.ofEpochMilli(raw.timestamp),
payload = payload,
category = raw.eventType,
enriched = false
))
} catch {
case _: Exception => None
}
}
private def parseJsonString(json: String): Map[String, Any] = {
// Simplified JSON parsing
Map("raw" -> json, "parsed" -> true)
}
// Enrichment flow with external lookups
private def enrichmentFlow: Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
Flow[ProcessedEvent]
.mapAsync(parallelism = 4) { event =>
enrichWithExternalData(event)
}
}
private def enrichWithExternalData(event: ProcessedEvent): Future[ProcessedEvent] = {
// Simulate external API call
Future {
Thread.sleep(Random.nextInt(50))
event.copy(
payload = event.payload + ("enriched_data" -> s"enriched-${event.id}"),
enriched = true
)
}
}
// Validation flow: invalid events become error markers instead of failing the stream
// (recover would emit a single fallback element and then complete the whole stream)
private def validationFlow: Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
  Flow[ProcessedEvent]
    .map { event =>
      if (isValidEvent(event)) event
      else ProcessedEvent("error", Instant.now(), Map("error" -> true), "error", false)
    }
}
private def isValidEvent(event: ProcessedEvent): Boolean = {
event.id.nonEmpty && event.payload.nonEmpty && event.category.nonEmpty
}
// Transformation flow with business logic
private def transformationFlow: Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
Flow[ProcessedEvent]
.map(applyBusinessRules)
.map(addMetadata)
}
private def applyBusinessRules(event: ProcessedEvent): ProcessedEvent = {
val transformedPayload = event.payload.map {
case (key, value: String) if key.contains("amount") =>
(key, value.toDoubleOption.getOrElse(0.0))
case (key, value) => (key, value)
}
event.copy(payload = transformedPayload)
}
private def addMetadata(event: ProcessedEvent): ProcessedEvent = {
val metadata = Map(
"processing_timestamp" -> Instant.now().toEpochMilli,
"version" -> "1.0",
"processor" -> "akka-streams"
)
event.copy(payload = event.payload ++ metadata)
}
// Branching and merging flows: the linear DSL has no partition operator,
// so fan-out/fan-in is built with Partition and Merge via the Graph DSL
def branchingFlow(): Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partition = builder.add(Partition[ProcessedEvent](3, event => event.category match {
      case "user" => 0
      case "order" => 1
      case _ => 2
    }))
    val merge = builder.add(Merge[ProcessedEvent](3))
    partition.out(0) ~> Flow[ProcessedEvent].map(processUserEvent) ~> merge.in(0)
    partition.out(1) ~> Flow[ProcessedEvent].map(processOrderEvent) ~> merge.in(1)
    partition.out(2) ~> Flow[ProcessedEvent].map(processOtherEvent) ~> merge.in(2)
    FlowShape(partition.in, merge.out)
  })
}
private def processUserEvent(event: ProcessedEvent): ProcessedEvent = {
if (event.category == "user") {
event.copy(payload = event.payload + ("user_processed" -> true))
} else event
}
private def processOrderEvent(event: ProcessedEvent): ProcessedEvent = {
if (event.category == "order") {
event.copy(payload = event.payload + ("order_processed" -> true))
} else event
}
private def processOtherEvent(event: ProcessedEvent): ProcessedEvent = {
if (event.category != "user" && event.category != "order") {
event.copy(payload = event.payload + ("other_processed" -> true))
} else event
}
// Aggregation flows with windowing
def aggregationFlow(): Flow[ProcessedEvent, AggregatedMetrics, NotUsed] = {
Flow[ProcessedEvent]
.groupedWithin(100, 10.seconds)
.map(aggregateEvents)
}
private def aggregateEvents(events: Seq[ProcessedEvent]): AggregatedMetrics = {
val category = events.headOption.map(_.category).getOrElse("unknown")
val count = events.length
val errorCount = events.count(_.category == "error")
val errorRate = if (count > 0) errorCount.toDouble / count else 0.0
// Simulate processing time calculation
val avgProcessingTime = Random.nextInt(100) + 50
AggregatedMetrics(category, count, avgProcessingTime, errorRate)
}
// Conditional flows
def conditionalFlow(): Flow[ProcessedEvent, ProcessedEvent, NotUsed] = {
Flow[ProcessedEvent]
.filter(_.enriched)
.map { event =>
if (event.payload.contains("priority")) {
processPriorityEvent(event)
} else {
processNormalEvent(event)
}
}
}
private def processPriorityEvent(event: ProcessedEvent): ProcessedEvent = {
event.copy(payload = event.payload + ("priority_processed" -> true))
}
private def processNormalEvent(event: ProcessedEvent): ProcessedEvent = {
event.copy(payload = event.payload + ("normal_processed" -> true))
}
}
// Advanced flow patterns
object AdvancedFlowPatterns {
implicit val system: ActorSystem = ActorSystem("advanced-flows")
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
// Retry flow with exponential backoff
def retryFlow[T](maxRetries: Int = 3): Flow[T, T, NotUsed] = {
Flow[T].mapAsync(1) { element =>
retryWithBackoff(maxRetries, 1.second) {
processWithPossibleFailure(element)
}
}
}
private def retryWithBackoff[T](
maxRetries: Int,
initialDelay: FiniteDuration
)(operation: => Future[T]): Future[T] = {
def attempt(retriesLeft: Int, delay: FiniteDuration): Future[T] = {
operation.recoverWith {
case _: Exception if retriesLeft > 0 =>
akka.pattern.after(delay, system.scheduler) {
attempt(retriesLeft - 1, delay * 2)
}
}
}
attempt(maxRetries, initialDelay)
}
private def processWithPossibleFailure[T](element: T): Future[T] = {
Future {
if (Random.nextDouble() < 0.3) { // 30% failure rate
throw new RuntimeException("Random failure")
}
element
}
}
// Circuit breaker flow
def circuitBreakerFlow[T](): Flow[T, T, NotUsed] = {
import akka.pattern.CircuitBreaker
val circuitBreaker = CircuitBreaker(
scheduler = system.scheduler,
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute
)
Flow[T].mapAsync(1) { element =>
circuitBreaker.withCircuitBreaker {
processWithCircuitBreaker(element)
}
}
}
private def processWithCircuitBreaker[T](element: T): Future[T] = {
Future {
if (Random.nextDouble() < 0.2) { // 20% failure rate
throw new RuntimeException("Service unavailable")
}
Thread.sleep(Random.nextInt(100))
element
}
}
// Batching flow
def batchingFlow[T](batchSize: Int, maxDuration: FiniteDuration): Flow[T, Seq[T], NotUsed] = {
Flow[T]
.groupedWithin(batchSize, maxDuration)
.filter(_.nonEmpty)
}
// Deduplication flow
def deduplicationFlow[T](keyExtractor: T => String): Flow[T, T, NotUsed] = {
Flow[T]
.statefulMapConcat { () =>
val seen = scala.collection.mutable.Set[String]()
{ element: T =>
val key = keyExtractor(element)
if (seen.contains(key)) {
List.empty // Duplicate, filter out
} else {
seen.add(key)
List(element)
}
}
}
}
// Rate limiting flow
def rateLimitingFlow[T](
elementsPerSecond: Int,
burstSize: Int = 10
): Flow[T, T, NotUsed] = {
Flow[T]
.throttle(
elements = elementsPerSecond,
per = 1.second,
maximumBurst = burstSize,
mode = ThrottleMode.Shaping
)
}
// Load balancing flow
def loadBalancingFlow[T](
workerCount: Int
)(processor: T => Future[T]): Flow[T, T, NotUsed] = {
Flow[T]
.mapAsyncUnordered(workerCount)(processor)
}
// Timeout flow
def timeoutFlow[T](timeout: FiniteDuration): Flow[T, T, NotUsed] = {
  Flow[T]
    .idleTimeout(timeout)
    .mapError { // translate the idle-timeout failure into a domain-specific error
      case _: TimeoutException =>
        new RuntimeException("Flow timeout exceeded")
    }
}
// Monitoring flow
def monitoringFlow[T](name: String): Flow[T, T, NotUsed] = {
Flow[T]
.map { element =>
system.log.debug(s"Processing element in flow $name: $element")
element
}
.wireTap(element =>
system.log.info(s"Processed element in flow $name")
)
}
// Sampling flow
def samplingFlow[T](sampleRate: Double): Flow[T, T, NotUsed] = {
Flow[T]
.filter(_ => Random.nextDouble() < sampleRate)
}
// Checkpoint flow for error recovery
def checkpointFlow[T](checkpointInterval: Int): Flow[T, T, NotUsed] = {
Flow[T]
.zipWithIndex
.map { case (element, index) =>
if (index % checkpointInterval == 0) {
saveCheckpoint(index, element)
}
element
}
}
private def saveCheckpoint[T](index: Long, element: T): Unit = {
// Simulate checkpoint saving
system.log.info(s"Checkpoint saved at index $index")
}
}
Advanced Sinks and Materialization
Custom Sinks and Complex Outputs
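Before looking at custom sinks, it is worth pinning down what "materialization" means: every source, flow, and sink can yield a value when the stream is started (a Future, a queue handle, a kill switch), and the Keep combinators decide which side's value the runnable graph exposes. A short sketch, following the same implicit-ActorSystem pattern as the listings in this lesson (object and value names here are illustrative):
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._
object MaterializationSketch {
  implicit val system: ActorSystem = ActorSystem("materialization-sketch")
  // Keep.right: the runnable graph exposes the sink's materialized value (the folded sum)
  val summed: RunnableGraph[Future[Int]] =
    Source(1 to 10).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)
  val sum: Future[Int] = summed.run() // completes with 55
  // Keep.both: expose both the tick source's Cancellable and the sink's Future[Done]
  val (ticker, done) =
    Source.tick(0.seconds, 1.second, "tick")
      .toMat(Sink.foreach(println))(Keep.both)
      .run()
  ticker.cancel() // cancelling the source completes the stream, and `done` with it
}
The sinks below rely on the same mechanism, typically keeping the right-hand Future[Done] so callers can observe stream completion.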
// Advanced sink patterns
object AdvancedSinks {
implicit val system: ActorSystem = ActorSystem("advanced-sinks")
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
// Database sink with batching
def databaseSink[T](
batchSize: Int = 100,
flushInterval: FiniteDuration = 5.seconds
)(
writeFunction: Seq[T] => Future[Done]
): Sink[T, Future[Done]] = {
Flow[T]
.groupedWithin(batchSize, flushInterval)
.mapAsync(1)(writeFunction)
.toMat(Sink.ignore)(Keep.right)
}
// File sink with size-based rotation. A simple, self-contained sketch that appends each
// chunk with blocking NIO writes; the rotation state lives in the sink instance returned
// by this method. For production workloads a dedicated rotating sink (for example
// Alpakka's LogRotatorSink) is usually a better fit.
def rotatingFileSink(
  baseFileName: String,
  maxFileSize: Long = 10 * 1024 * 1024, // 10MB per file
  maxFiles: Int = 10
): Sink[ByteString, Future[Done]] = {
  import java.nio.file.{Files, StandardOpenOption}
  var currentFile = 0
  var currentSize = 0L
  Sink.foreach[ByteString] { data =>
    currentSize += data.length
    if (currentSize > maxFileSize) {
      currentFile = (currentFile + 1) % maxFiles
      currentSize = data.length
    }
    Files.write(
      Paths.get(s"$baseFileName.$currentFile"),
      data.toArray,
      StandardOpenOption.CREATE, StandardOpenOption.APPEND
    )
  }
}
// Kafka sink
def kafkaSink[T](
topic: String,
bootstrapServers: String
)(
serializer: T => String
): Sink[T, Future[Done]] = {
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
Flow[T]
.map { element =>
new ProducerRecord[String, String](topic, serializer(element))
}
.toMat(Producer.plainSink(producerSettings))(Keep.right) // keep the producer's Future[Done]
}
// Elasticsearch sink
def elasticsearchSink[T](
indexName: String,
typeName: String
)(
documentId: T => String,
serializer: T => String
): Sink[T, Future[Done]] = {
Flow[T]
.map { element =>
ElasticsearchDocument(
id = documentId(element),
source = serializer(element)
)
}
.groupedWithin(100, 5.seconds)
.mapAsync(1)(bulkIndexToElasticsearch)
.toMat(Sink.ignore)(Keep.right)
}
case class ElasticsearchDocument(id: String, source: String)
private def bulkIndexToElasticsearch(documents: Seq[ElasticsearchDocument]): Future[Done] = {
// Simulate Elasticsearch bulk indexing
Future {
println(s"Bulk indexing ${documents.length} documents to Elasticsearch")
Thread.sleep(100)
Done
}
}
// HTTP sink (imports placed before the def so HttpRequest is visible in the signature)
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
def httpSink[T](
  endpoint: String,
  parallelism: Int = 4
)(
  requestBuilder: T => HttpRequest
): Sink[T, Future[Done]] = {
  Flow[T]
.map(requestBuilder)
.mapAsync(parallelism) { request =>
Http().singleRequest(request).map { response =>
if (response.status.isSuccess()) {
response.discardEntityBytes()
Done
} else {
throw new RuntimeException(s"HTTP request failed with status: ${response.status}")
}
}
}
.toMat(Sink.ignore)(Keep.right)
}
// WebSocket sink
def webSocketSink[T](
url: String
)(
serializer: T => String
): Sink[T, Future[Done]] = {
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
val wsFlow = Http().webSocketClientFlow(WebSocketRequest(url))
Flow[T]
  .map(element => TextMessage(serializer(element)))
  .via(wsFlow)
  .toMat(Sink.ignore)(Keep.right) // materialize the Future[Done] completion
}
// Multi-output sink (fan-out)
def multiOutputSink[T](
sinks: Sink[T, _]*
): Sink[T, NotUsed] = {
Sink.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[T](sinks.length))
sinks.zipWithIndex.foreach { case (sink, index) =>
broadcast.out(index) ~> sink
}
SinkShape(broadcast.in)
})
}
// Conditional sink
def conditionalSink[T](
predicate: T => Boolean,
trueSink: Sink[T, _],
falseSink: Sink[T, _]
): Sink[T, NotUsed] = {
Sink.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val partition = builder.add(Partition[T](2, element => if (predicate(element)) 0 else 1))
partition.out(0) ~> trueSink
partition.out(1) ~> falseSink
SinkShape(partition.in)
})
}
// Metrics collection sink
def metricsSink[T](
metricName: String
)(
valueExtractor: T => Double
): Sink[T, Future[Done]] = {
Flow[T]
.map(valueExtractor)
.scan((0.0, 0L, 0.0, Double.MaxValue, Double.MinValue)) {
case ((sum, count, _, min, max), value) =>
(sum + value, count + 1, (sum + value) / (count + 1), Math.min(min, value), Math.max(max, value))
}
.throttle(1, 10.seconds)
.map { case (sum, count, avg, min, max) =>
println(s"Metrics for $metricName - Count: $count, Sum: $sum, Avg: $avg, Min: $min, Max: $max")
}
.toMat(Sink.ignore)(Keep.right)
}
// Logging sink with different levels
def loggingSink[T](
logLevel: String = "INFO"
)(
formatter: T => String
): Sink[T, Future[Done]] = {
Sink.foreach[T] { element =>
val message = formatter(element)
logLevel.toUpperCase match {
case "DEBUG" => system.log.debug(message)
case "INFO" => system.log.info(message)
case "WARN" => system.log.warning(message)
case "ERROR" => system.log.error(message)
case _ => println(message)
}
}
}
}
// Graph DSL and complex topologies
object GraphDSLPatterns {
implicit val system: ActorSystem = ActorSystem("graph-dsl")
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
// Fan-out and fan-in pattern
def fanOutFanInGraph[T](): RunnableGraph[NotUsed] = {
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val source = Source(1 to 100)
val broadcast = builder.add(Broadcast[Int](3))
val merge = builder.add(Merge[String](3))
val sink = Sink.foreach[String](println)
val flow1 = Flow[Int].map(i => s"Flow1: ${i * 2}")
val flow2 = Flow[Int].map(i => s"Flow2: ${i * 3}")
val flow3 = Flow[Int].map(i => s"Flow3: ${i * 4}")
source ~> broadcast
broadcast.out(0) ~> flow1 ~> merge.in(0)
broadcast.out(1) ~> flow2 ~> merge.in(1)
broadcast.out(2) ~> flow3 ~> merge.in(2)
merge ~> sink
ClosedShape
})
}
// Priority merge pattern
def priorityMergeGraph(): RunnableGraph[NotUsed] = {
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val highPrioritySource = Source.tick(2.seconds, 2.seconds, "HIGH")
val normalPrioritySource = Source.tick(1.second, 1.second, "NORMAL")
val lowPrioritySource = Source.tick(500.millis, 500.millis, "LOW")
val priorityMerge = builder.add(MergePreferred[String](2))
val sink = Sink.foreach[String](message => println(s"Processing: $message"))
highPrioritySource ~> priorityMerge.preferred
normalPrioritySource ~> priorityMerge.in(0)
lowPrioritySource ~> priorityMerge.in(1)
priorityMerge ~> sink
ClosedShape
})
}
// Load balancer pattern
def loadBalancerGraph[T](workerCount: Int): Graph[FlowShape[T, T], NotUsed] = {
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val balance = builder.add(Balance[T](workerCount))
val merge = builder.add(Merge[T](workerCount))
val workerFlow = Flow[T].map { element =>
Thread.sleep(Random.nextInt(1000)) // Simulate work
element
}
for (i <- 0 until workerCount) {
balance.out(i) ~> workerFlow.async ~> merge.in(i)
}
FlowShape(balance.in, merge.out)
}
}
// Cyclic graph with feedback
def cyclicGraphWithFeedback(): RunnableGraph[NotUsed] = {
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val source = Source(1 to 10)
val merge = builder.add(Merge[Int](2))
val broadcast = builder.add(Broadcast[Int](2))
val sink = Sink.foreach[Int](println)
val processingFlow = Flow[Int].map(_ + 1)
val feedbackFlow = Flow[Int]
  .filter(_ < 50) // Prevent infinite feedback
  .map(_ * 2)
  .buffer(10, OverflowStrategy.dropHead) // a buffer in the cycle avoids backpressure deadlock
source ~> merge ~> processingFlow ~> broadcast
broadcast.out(0) ~> sink
broadcast.out(1) ~> feedbackFlow ~> merge
ClosedShape
})
}
// Hub pattern for multiple consumers
def hubPattern(): (Sink[String, NotUsed], Source[String, NotUsed]) = {
val (sink, source) = MergeHub.source[String](perProducerBufferSize = 16)
.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
.run()
(sink, source)
}
// Error handling flow with a dead letter branch: failed elements are diverted
// to an error sink while successful elements continue downstream
def errorHandlingGraph[T](): Flow[T, T, NotUsed] = {
  val processingFlow: Flow[T, Either[Throwable, T], NotUsed] = Flow[T].map { element =>
    if (Random.nextDouble() < 0.1) { // 10% error rate
      Left(new RuntimeException(s"Processing failed for $element"))
    } else {
      Right(element)
    }
  }
  val deadLetterSink: Sink[Either[Throwable, T], NotUsed] = Flow[Either[Throwable, T]]
    .collect { case Left(error) => error }
    .to(Sink.foreach(error => println(s"Error: ${error.getMessage}")))
  processingFlow
    .divertTo(deadLetterSink, _.isLeft)      // send failures to the dead letter sink
    .collect { case Right(value) => value }  // only successes flow on
}
}
Error Recovery and Resilience Patterns
Supervision and Recovery Strategies
object ResiliencePatterns {
implicit val system: ActorSystem = ActorSystem("resilience")
implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
// Restart strategy for transient failures
def restartOnFailureSource[T](
sourceFactory: () => Source[T, _],
minBackoff: FiniteDuration = 3.seconds,
maxBackoff: FiniteDuration = 30.seconds,
randomFactor: Double = 0.2
): Source[T, NotUsed] = {
RestartSource.withBackoff(
RestartSettings(minBackoff, maxBackoff, randomFactor)
)(sourceFactory)
}
// Graceful degradation pattern
def gracefulDegradationFlow[T, R](
primaryProcessor: T => Future[R],
fallbackProcessor: T => Future[R],
timeout: FiniteDuration = 5.seconds
): Flow[T, R, NotUsed] = {
Flow[T].mapAsync(1) { element =>
val primaryFuture = primaryProcessor(element)
val timeoutFuture = akka.pattern.after(timeout, system.scheduler) {
Future.failed(new TimeoutException("Primary processor timeout"))
}
Future.firstCompletedOf(Seq(primaryFuture, timeoutFuture))
.recoverWith {
case _: Exception => fallbackProcessor(element)
}
}
}
// Bulkhead pattern for isolation: Partition/Merge via the Graph DSL gives the two
// pools independent parallelism so one cannot starve the other
def bulkheadPattern[T](
  criticalProcessor: T => Future[T],
  normalProcessor: T => Future[T],
  classifier: T => Boolean,
  criticalParallelism: Int = 2,
  normalParallelism: Int = 8
): Flow[T, T, NotUsed] = {
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partition = builder.add(Partition[T](2, element => if (classifier(element)) 0 else 1))
    val merge = builder.add(Merge[T](2))
    partition.out(0) ~> Flow[T].mapAsync(criticalParallelism)(criticalProcessor).async ~> merge.in(0)
    partition.out(1) ~> Flow[T].mapAsync(normalParallelism)(normalProcessor).async ~> merge.in(1)
    FlowShape(partition.in, merge.out)
  })
}
// Health check and monitoring: a background tick stream refreshes a health flag so the
// data path is not throttled to the check interval (zip would pair one element per check);
// elements fail the stream as soon as the system reports unhealthy
def healthCheckFlow[T](
  healthChecker: () => Future[Boolean],
  checkInterval: FiniteDuration = 30.seconds
): Flow[T, T, NotUsed] = {
  val healthy = new java.util.concurrent.atomic.AtomicBoolean(true)
  Source.tick(0.seconds, checkInterval, ())
    .mapAsync(1)(_ => healthChecker())
    .runForeach(healthy.set)
  Flow[T].map { element =>
    if (!healthy.get()) {
      system.log.warning("Health check failed")
      throw new RuntimeException("System unhealthy")
    }
    element
  }
}
// Dead letter queue pattern
def deadLetterQueueSink[T](
primarySink: Sink[T, Future[Done]],
deadLetterSink: Sink[T, Future[Done]],
maxRetries: Int = 3
): Sink[T, Future[Done]] = {
Flow[T]
.mapAsync(1) { element =>
retryElement(element, primarySink, maxRetries)
.recoverWith {
case _ =>
system.log.warning(s"Element failed after $maxRetries retries, sending to dead letter queue")
Source.single(element).runWith(deadLetterSink)
}
}
.toMat(Sink.ignore)(Keep.right)
}
private def retryElement[T](
element: T,
sink: Sink[T, Future[Done]],
retriesLeft: Int
): Future[Done] = {
Source.single(element).runWith(sink).recoverWith {
case _ if retriesLeft > 0 =>
akka.pattern.after(1.second, system.scheduler) {
retryElement(element, sink, retriesLeft - 1)
}
}
}
// Kill switch for graceful shutdown
def killSwitchExample(): Unit = {
val (killSwitch, done) = Source.tick(0.seconds, 1.second, "tick")
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(println))(Keep.both) // keep both the kill switch and the completion future
.run()
// Shutdown after 10 seconds
system.scheduler.scheduleOnce(10.seconds) {
killSwitch.shutdown()
}
done.onComplete {
case Success(_) => println("Stream completed gracefully")
case Failure(exception) => println(s"Stream failed: $exception")
}
}
// Shared kill switch for multiple streams
def sharedKillSwitchExample(): Unit = {
val sharedKillSwitch = KillSwitches.shared("my-streams")
val stream1 = Source.tick(0.seconds, 1.second, "stream1")
.via(sharedKillSwitch.flow)
.to(Sink.foreach(println))
.run()
val stream2 = Source.tick(0.seconds, 2.seconds, "stream2")
.via(sharedKillSwitch.flow)
.to(Sink.foreach(println))
.run()
// Shutdown all streams after 15 seconds
system.scheduler.scheduleOnce(15.seconds) {
sharedKillSwitch.shutdown()
}
}
}
Conclusion
Reactive Streams and Akka Streams provide powerful abstractions for building resilient, high-throughput data processing systems. Key concepts include:
Core Principles:
- Backpressure handling for flow control
- Non-blocking asynchronous processing
- Composable stream operations
- Built-in error handling and supervision
Stream Components:
- Sources for data generation and ingestion
- Flows for transformation and processing
- Sinks for data consumption and output
- Graph DSL for complex topologies
Advanced Patterns:
- Rate limiting and throttling
- Circuit breakers and bulkheads
- Retry mechanisms with exponential backoff
- Dead letter queues for error handling
Resilience Features:
- Supervision strategies for fault tolerance
- Graceful degradation patterns
- Health monitoring and alerting
- Kill switches for controlled shutdown
Performance Optimization:
- Parallelism and load balancing
- Batching and buffering strategies
- Resource isolation and management
- Monitoring and metrics collection
Integration Capabilities:
- File system and network I/O
- Database and message queue connectivity
- HTTP and WebSocket endpoints
- External service integration
Best Practices:
- Proper error handling and recovery
- Resource management and cleanup
- Testing strategies for stream applications (see the sketch after this list)
- Monitoring and observability
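On the testing point above: akka-stream-testkit provides probe sinks and sources that let a test drive demand explicitly and assert on emitted elements. A minimal sketch, assuming the akka-stream-testkit dependency is on the test classpath (object and method names here are illustrative):
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
object StreamTestingSketch {
  implicit val system: ActorSystem = ActorSystem("stream-test")
  def verifyDoublingFlow(): Unit = {
    // The probe materialized by TestSink lets the test control demand and
    // assert on exactly what the stream under test emits
    Source(1 to 3)
      .map(_ * 2)
      .runWith(TestSink.probe[Int])
      .request(3)          // signal demand explicitly, as a downstream consumer would
      .expectNext(2, 4, 6) // assert on the transformed elements
      .expectComplete()    // and on normal completion
  }
}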
Akka Streams enables developers to build sophisticated data processing pipelines that can handle varying loads, recover from failures, and maintain high throughput while preserving data integrity and system stability.