Reactive Streams and Akka Streams: Processing Data Flows
Stream processing is essential for modern applications dealing with large volumes of data, real-time analytics, and event-driven architectures. Akka Streams provides a powerful implementation of the Reactive Streams specification, offering composable, back-pressured stream processing with built-in resilience and scalability.
Understanding Reactive Streams
Core Principles
- Non-blocking: Streams process data without blocking threads
- Backpressure: Automatic flow control prevents overwhelming consumers
- Composability: Stream operations can be combined and reused
- Resilience: Built-in error handling and recovery mechanisms
- Resource Management: Automatic cleanup and lifecycle management
Reactive Streams API
The Reactive Streams specification defines four key interfaces:
- Publisher: Produces data elements
- Subscriber: Consumes data elements
- Subscription: Represents the connection between Publisher and Subscriber
- Processor: Acts as both Publisher and Subscriber
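Akka Streams implements these interfaces under the hood, so you can interoperate with any other Reactive Streams library. As a minimal sketch (the system name and numbers are just placeholders), Source.fromPublisher and Sink.asPublisher bridge between the two worlds:
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher

implicit val system: ActorSystem = ActorSystem("InteropExample")

// Expose an Akka Streams Source as a Reactive Streams Publisher
val publisher: Publisher[Int] =
  Source(1 to 10).runWith(Sink.asPublisher(fanout = false))

// Consume any Reactive Streams Publisher as an Akka Streams Source
Source.fromPublisher(publisher)
  .map(_ * 2)
  .runForeach(println)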
Setting Up Akka Streams
Dependencies
// build.sbt
val AkkaVersion = "2.7.0"
val AlpakkaVersion = "4.0.0"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
  "com.typesafe.akka" %% "akka-http" % "10.4.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-csv" % AlpakkaVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-file" % AlpakkaVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-s3" % AlpakkaVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-slick" % AlpakkaVersion
)
Basic Stream Setup
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink, Flow}
import akka.{Done, NotUsed}
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure}
// Since Akka 2.6 the ActorSystem provides the stream materializer implicitly,
// so the deprecated ActorMaterializer is no longer needed.
implicit val system: ActorSystem = ActorSystem("StreamSystem")
implicit val ec: ExecutionContext = system.dispatcher
Basic Stream Components
Sources: Data Producers
import akka.actor.Cancellable
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
// Simple source from a collection
val numberSource: Source[Int, NotUsed] = Source(1 to 10)
// Infinite source with tick
val tickSource: Source[String, Cancellable] =
Source.tick(0.seconds, 1.second, "tick")
// Source from Future
val futureSource: Source[String, NotUsed] =
Source.future(Future.successful("Hello from Future"))
// Source from single element
val singleSource: Source[String, NotUsed] =
Source.single("Single Element")
// Source from iterator
val iteratorSource: Source[Int, NotUsed] =
Source.fromIterator(() => Iterator.from(1))
// Unfold source - generates elements based on state
val fibonacciSource: Source[BigInt, NotUsed] = Source.unfold((BigInt(0), BigInt(1))) {
case (a, b) => Some(((b, a + b), a))
}
// Example: Run a simple source
numberSource
.runForeach(println)
.onComplete {
case Success(_) => println("Source completed successfully")
case Failure(ex) => println(s"Source failed: ${ex.getMessage}")
}
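One small but important detail: running streams keep the ActorSystem's threads alive, so terminate the system once your streams are done. A minimal sketch building on the source above:
// Shut down the actor system after the stream completes,
// otherwise its threads keep the JVM running.
numberSource
  .runWith(Sink.seq)
  .onComplete { _ =>
    system.terminate()
  }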
Flows: Data Transformers
import akka.stream.scaladsl.Flow
// Basic transformation flow
val doubleFlow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
// Filter flow
val evenFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0)
// Map with async operation
val asyncFlow: Flow[String, String, NotUsed] = Flow[String]
.mapAsync(parallelism = 4) { str =>
Future {
Thread.sleep(100) // Simulate async operation
str.toUpperCase
}
}
// Stateful flow with scan
val runningTotalFlow: Flow[Int, Int, NotUsed] = Flow[Int].scan(0)(_ + _)
// Complex transformation with collect
val parseIntFlow: Flow[String, Int, NotUsed] = Flow[String]
  .collect {
    case s if s.nonEmpty && s.forall(_.isDigit) => s.toInt
  }
// Example: Chaining flows
val processingFlow = Flow[String]
.via(parseIntFlow)
.via(evenFlow)
.via(doubleFlow)
.via(runningTotalFlow)
Source(List("1", "2", "3", "4", "5", "invalid", "6"))
.via(processingFlow)
.runForeach(println)
Sinks: Data Consumers
import akka.stream.scaladsl.Sink
// Print each element
val printSink: Sink[Any, Future[Done]] = Sink.foreach(println)
// Collect all elements into a sequence
val collectSink: Sink[Int, Future[Seq[Int]]] = Sink.seq
// Fold/reduce operations
val sumSink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)
// Materialize only the first element
val headSink: Sink[Int, Future[Int]] = Sink.head
// Ignore all elements
val ignoreSink: Sink[Any, Future[Done]] = Sink.ignore
// Custom sink with function
val customSink: Sink[String, Future[Done]] = Sink.foreach { str =>
// Custom processing logic
if (str.nonEmpty) {
println(s"Processing: $str")
}
}
// Conditional sink
def conditionalSink[T](condition: T => Boolean): Sink[T, Future[Done]] =
Sink.foreach { element =>
if (condition(element)) {
println(s"Condition met for: $element")
}
}
// Example: Multiple sinks with broadcast
import akka.stream.scaladsl.{Broadcast, GraphDSL}
import akka.stream.SinkShape
val broadcastSink = Sink.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
broadcast.out(0) ~> Sink.foreach(x => println(s"Sink 1: $x"))
broadcast.out(1) ~> Sink.foreach(x => println(s"Sink 2: ${x * x}"))
SinkShape(broadcast.in)
})
Source(1 to 5).runWith(broadcastSink)
Advanced Stream Operations
Grouping and Batching
import akka.stream.scaladsl.{Source, Flow}
import scala.concurrent.duration._
// Group elements into batches
val batchFlow: Flow[Int, Seq[Int], NotUsed] = Flow[Int]
.grouped(3)
// Time-based batching
val timeBasedBatch: Flow[String, Seq[String], NotUsed] = Flow[String]
.groupedWithin(10, 1.second)
// Weight-based batching: each element's cost is its length, batches hold up to 100 characters
val weightedBatch: Flow[String, Seq[String], NotUsed] = Flow[String]
  .batchWeighted(max = 100, costFn = s => s.length.toLong, seed = s => Vector(s))(_ :+ _)
// Example: Processing batches
Source.tick(0.seconds, 100.millis, "data")
.take(20)
.via(timeBasedBatch)
.runForeach { batch =>
println(s"Processed batch of ${batch.size} elements")
}
Parallelization
// Parallel processing with mapAsyncUnordered
val parallelFlow: Flow[Int, String, NotUsed] = Flow[Int]
.mapAsyncUnordered(parallelism = 4) { num =>
Future {
Thread.sleep(scala.util.Random.nextInt(1000))
s"Processed: $num"
}
}
// Parallel processing with specific execution context
implicit val processingEC: ExecutionContext =
ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(8))
val cpuIntensiveFlow: Flow[Int, Int, NotUsed] = Flow[Int]
.mapAsync(4) { num =>
Future {
// Simulate CPU-intensive work
(1 to 1000000).sum + num
}(processingEC)
}
// Example: unordered parallel processing (output order may differ from input order)
Source(1 to 10)
.via(parallelFlow)
.runForeach(println)
Error Handling and Resilience
import akka.stream.Supervision.{Resume, Stop, Restart}
import akka.stream.ActorAttributes
// Flow that might fail
val flakyFlow: Flow[Int, String, NotUsed] = Flow[Int]
.map { num =>
if (num % 3 == 0) throw new RuntimeException(s"Error processing $num")
s"Processed: $num"
}
// Error handling with supervision strategy
val resilientFlow = flakyFlow
.withAttributes(ActorAttributes.supervisionStrategy {
case _: RuntimeException => Resume // Skip failed elements
case _ => Stop
})
// Recover with default values
val recoverFlow: Flow[Int, String, NotUsed] = Flow[Int]
.map { num =>
if (num % 3 == 0) throw new RuntimeException(s"Error processing $num")
s"Processed: $num"
}
.recover {
case ex: RuntimeException => s"Error: ${ex.getMessage}"
}
// Retry logic
def retryFlow[In, Out](
flow: Flow[In, Out, _],
maxRetries: Int
): Flow[In, Out, NotUsed] = {
Flow[In].mapAsync(1) { input =>
def attempt(retriesLeft: Int): Future[Out] = {
Source.single(input)
.via(flow)
.runWith(Sink.head)
.recoverWith {
case ex if retriesLeft > 0 =>
Thread.sleep(100) // Simple blocking backoff, for illustration only
attempt(retriesLeft - 1)
case ex => Future.failed(ex)
}
}
attempt(maxRetries)
}
}
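Akka Streams also ships a built-in RetryFlow.withBackoff operator (since Akka 2.6) that retries per element with exponential backoff and without blocking a thread. A sketch, assuming the wrapped flow emits exactly one Try per input element:
import akka.stream.scaladsl.RetryFlow
import scala.util.Try

// The wrapped flow must emit one output per input; here each output is a Try
val tryFlow: Flow[Int, Try[String], NotUsed] =
  Flow[Int].map { num =>
    Try {
      if (num % 3 == 0) throw new RuntimeException(s"Error processing $num")
      s"Processed: $num"
    }
  }

// Retry failed elements with exponential backoff, up to 3 attempts
val retryingFlow: Flow[Int, Try[String], NotUsed] =
  RetryFlow.withBackoff(
    minBackoff = 100.millis,
    maxBackoff = 2.seconds,
    randomFactor = 0.2,
    maxRetries = 3,
    flow = tryFlow
  ) {
    case (original, Failure(_)) => Some(original) // failed: retry with the same input
    case _                      => None           // succeeded: emit downstream
  }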
// Example: Error handling in action
Source(1 to 10)
.via(resilientFlow)
.runForeach(println)
Backpressure Handling
// Buffer with overflow strategy
import akka.stream.OverflowStrategy
val bufferedFlow: Flow[Int, Int, NotUsed] = Flow[Int]
.buffer(size = 100, OverflowStrategy.backpressure)
// Dropping elements when buffer is full
val droppingFlow: Flow[Int, Int, NotUsed] = Flow[Int]
.buffer(size = 10, OverflowStrategy.dropHead)
// Conflate - combine elements under backpressure
val conflateFlow: Flow[Int, Int, NotUsed] = Flow[Int]
.conflateWithSeed(identity)(_ + _)
// Expand - repeat the latest element while the consumer is faster than the producer
val expandFlow: Flow[Int, Int, NotUsed] = Flow[Int]
  .expand(i => Iterator.continually(i))
// Example: Demonstrating backpressure
val slowSink = Sink.foreach[Int] { num =>
Thread.sleep(1000) // Slow consumer
println(s"Consumed: $num")
}
Source.tick(0.seconds, 100.millis, 1)
.scan(0)(_ + _)
.via(bufferedFlow)
.runWith(slowSink)
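Besides buffering, you can shape the rate explicitly with throttle, which back-pressures the upstream to at most the configured rate instead of letting elements pile up. A short sketch (the source range is just for illustration):
// Explicit rate limiting: at most 5 elements per second
Source(1 to 100)
  .throttle(elements = 5, per = 1.second)
  .runForeach(n => println(s"Emitted: $n"))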
Graph DSL for Complex Stream Topologies
Fan-Out Patterns
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Broadcast, Balance}
import akka.stream.{ClosedShape, UniformFanOutShape}
// Broadcast: Send each element to all outputs
val broadcastGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val source = Source(1 to 10)
val broadcast = builder.add(Broadcast[Int](3))
val sink1 = Sink.foreach[Int](x => println(s"Sink1: $x"))
val sink2 = Sink.foreach[Int](x => println(s"Sink2: ${x * x}"))
val sink3 = Sink.foreach[Int](x => println(s"Sink3: ${x * x * x}"))
source ~> broadcast
broadcast.out(0) ~> sink1
broadcast.out(1) ~> sink2
broadcast.out(2) ~> sink3
ClosedShape
})
// Balance: Distribute elements across outputs
val balanceGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val source = Source(1 to 100)
val balance = builder.add(Balance[Int](3))
val slowSink = Flow[Int].map { x =>
Thread.sleep(100)
println(s"Slow: $x")
}.to(Sink.ignore)
val fastSink = Flow[Int].map { x =>
println(s"Fast: $x")
}.to(Sink.ignore)
source ~> balance
balance.out(0) ~> slowSink
balance.out(1) ~> fastSink
balance.out(2) ~> fastSink
ClosedShape
})
broadcastGraph.run()
Fan-In Patterns
import akka.stream.scaladsl.{Merge, MergePreferred, Zip, ZipWith}
// Merge: Combine multiple sources
val mergeGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val source1 = Source.tick(0.seconds, 1.second, "A")
val source2 = Source.tick(0.seconds, 2.seconds, "B")
val source3 = Source.tick(0.seconds, 3.seconds, "C")
val merge = builder.add(Merge[String](3))
val sink = Sink.foreach(println)
source1 ~> merge
source2 ~> merge
source3 ~> merge
merge ~> sink
ClosedShape
})
// Zip: Combine elements from sources pairwise
val zipGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val numbers = Source(1 to 10)
val letters = Source(List("a", "b", "c", "d", "e"))
val zip = builder.add(Zip[Int, String]())
val sink = Sink.foreach[(Int, String)](println)
numbers ~> zip.in0
letters ~> zip.in1
zip.out ~> sink
ClosedShape
})
// ZipWith: Combine with custom function
val zipWithGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val prices = Source(List(10.0, 20.0, 15.0, 25.0))
val quantities = Source(List(2, 1, 3, 1))
val zipWith = builder.add(ZipWith[Double, Int, Double](_ * _))
val sink = Sink.foreach[Double](total => println(f"Total: $$${total}%.2f"))
prices ~> zipWith.in0
quantities ~> zipWith.in1
zipWith.out ~> sink
ClosedShape
})
zipWithGraph.run()
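MergePreferred (imported above) is a variant of Merge with one prioritized input: whenever both inputs have elements available, the preferred port is drained first. A sketch with hypothetical priority and normal sources:
// MergePreferred: the preferred input wins when both inputs have elements
val mergePreferredGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val prioritySource = Source(List("urgent-1", "urgent-2"))
  val normalSource = Source(1 to 100).map(n => s"normal-$n")
  val merge = builder.add(MergePreferred[String](1))
  prioritySource ~> merge.preferred
  normalSource ~> merge.in(0)
  merge ~> Sink.foreach[String](println)
  ClosedShape
})

mergePreferredGraph.run()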
Real-World Use Cases
File Processing Pipeline
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString
import java.nio.file.Paths
// Process CSV files in a directory
case class Product(id: String, name: String, price: Double)
val csvProcessingGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val directory = Paths.get("data")
val csvFiles = Directory.ls(directory)
.filter(_.toString.endsWith(".csv"))
val processFile = Flow[java.nio.file.Path]
.flatMapConcat { path =>
FileIO.fromPath(path)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
.drop(1) // Skip header
.map(line => line.split(",").toList)
.filter(_.length >= 3) // Validate columns
}
val processCsvData = Flow[List[String]]
.collect {
case id :: name :: price :: _ if price.matches("""\d+\.\d+""") =>
Product(id, name, price.toDouble)
}
val writeResults = Sink.foreach[Product] { product =>
println(s"Processed: ${product.name} - $${product.price}")
}
csvFiles ~> processFile ~> processCsvData ~> writeResults
ClosedShape
})
csvProcessingGraph.run()
Real-Time Data Processing
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Keep, Source, Sink}
// WebSocket data stream processing
class RealTimeProcessor(implicit system: ActorSystem) {
import system.dispatcher // ExecutionContext for the Future combinators below
def processWebSocketStream(url: String): Future[Done] = {
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(url))
val source = Source.tick(0.seconds, 1.second, TextMessage("ping"))
val sink = Sink.foreach[Message] {
case TextMessage.Strict(text) =>
processMessage(text)
case TextMessage.Streamed(textStream) =>
textStream.runWith(Sink.foreach(chunk => processMessage(chunk)))
case _ =>
println("Received non-text message")
}
val (upgradeResponse, closed) = source
.viaMat(webSocketFlow)(Keep.right)
.toMat(sink)(Keep.both)
.run()
upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status.isSuccess()) {
closed
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
}
private def processMessage(text: String): Unit = {
// Parse and process real-time data
println(s"Processing: $text")
}
}
// Time-windowed aggregation
val windowedAggregation = Flow[Double]
.groupedWithin(100, 5.seconds)
.map { values =>
WindowedStats(
count = values.size,
sum = values.sum,
avg = values.sum / values.size,
min = values.min,
max = values.max
)
}
case class WindowedStats(count: Int, sum: Double, avg: Double, min: Double, max: Double)
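A quick way to exercise this flow is to feed it synthetic measurements; the random values below are placeholders for real sensor or metric data:
// Feed synthetic measurements through the windowed aggregation
Source.tick(0.seconds, 50.millis, ())
  .map(_ => scala.util.Random.nextDouble() * 100)
  .take(300)
  .via(windowedAggregation)
  .runForeach(stats => println(f"count=${stats.count} avg=${stats.avg}%.2f min=${stats.min}%.2f max=${stats.max}%.2f"))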
// Event sourcing stream
val eventSourcingFlow = Flow[DomainEvent]
.groupBy(maxSubstreams = 100, _.aggregateId)
.scan(AggregateState.empty) { (state, event) =>
state.applyEvent(event)
}
.mergeSubstreams
trait DomainEvent {
def aggregateId: String
}
case class AggregateState(id: String, version: Long, data: Map[String, Any]) {
def applyEvent(event: DomainEvent): AggregateState = {
// Apply event to state
copy(version = version + 1)
}
}
object AggregateState {
def empty: AggregateState = AggregateState("", 0, Map.empty)
}
Database Streaming
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import slick.jdbc.PostgresProfile.api._
// Stream database results
class DatabaseStreaming(implicit slickSession: SlickSession) {
case class User(id: Long, name: String, email: String)
class Users(tag: Tag) extends Table[User](tag, "users") {
def id = column[Long]("id", O.PrimaryKey)
def name = column[String]("name")
def email = column[String]("email")
def * = (id, name, email) <> (User.tupled, User.unapply)
}
val users = TableQuery[Users]
// Stream all users with processing
def processAllUsers(): Future[Done] = {
Slick
.source(users.result)
.via(Flow[User].map(user => user.copy(email = user.email.toLowerCase)))
.grouped(100) // Batch updates
.mapAsync(1) { userBatch =>
// Batch update
val updateActions = userBatch.map { user =>
users.filter(_.id === user.id).update(user)
}
slickSession.db.run(DBIO.sequence(updateActions))
}
.runWith(Sink.ignore)
}
// Stream query results
def streamUsersByDomain(domain: String): Source[User, NotUsed] = {
Slick.source(users.filter(_.email like s"%@$domain").result)
}
// Sink for writing to database
def userInsertSink: Sink[User, Future[Done]] = {
Slick.sink(user => users += user)
}
}
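The class above expects an implicit SlickSession. One common way to obtain it is from a configuration block; the "slick-postgres" path below is just an example name for a Slick config section in application.conf:
// Create a SlickSession from configuration and tie its lifecycle to the actor system
implicit val session: SlickSession = SlickSession.forConfig("slick-postgres")
system.registerOnTermination(session.close())

val databaseStreaming = new DatabaseStreaming()
databaseStreaming.processAllUsers()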
Testing Streams
Stream Testing Utilities
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.testkit.TestKit
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
class StreamSpec extends TestKit(ActorSystem("StreamSpec"))
  with AnyWordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures {
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
"A simple stream" should {
"process elements correctly" in {
val source = TestSource.probe[Int]
val sink = TestSink.probe[Int]
val (pub, sub) = source
.map(_ * 2)
.toMat(sink)(Keep.both)
.run()
sub.request(3)
pub.sendNext(1)
pub.sendNext(2)
pub.sendNext(3)
sub.expectNext(2, 4, 6)
pub.sendComplete()
sub.expectComplete()
}
"handle backpressure correctly" in {
val source = TestSource.probe[Int]
val sink = TestSink.probe[Int]
val (pub, sub) = source
.buffer(2, OverflowStrategy.backpressure)
.toMat(sink)(Keep.both)
.run()
// Don't request elements initially
pub.sendNext(1)
pub.sendNext(2)
pub.sendNext(3) // This should buffer
// Now request and verify
sub.request(3)
sub.expectNext(1, 2, 3)
}
"handle errors properly" in {
val source = Source(1 to 5)
val faultyFlow = Flow[Int].map { n =>
if (n == 3) throw new RuntimeException("Test error")
n
}
val result = source
.via(faultyFlow)
.recover {
case _: RuntimeException => -1
}
.runWith(Sink.seq)
whenReady(result) { seq =>
seq should contain(-1)
}
}
}
"Custom flows" should {
"be testable in isolation" in {
val doubleFlow = Flow[Int].map(_ * 2)
val result = Source(List(1, 2, 3))
.via(doubleFlow)
.runWith(Sink.seq)
whenReady(result) { seq =>
seq shouldEqual Seq(2, 4, 6)
}
}
}
}
Performance Optimization
Stream Fusion and Optimization
// Fused operations for better performance
val fusedFlow = Flow[Int]
.map(_ + 1) // These operations
.filter(_ % 2 == 0) // will be fused
.map(_ * 2) // into a single stage
// Async boundaries for parallelization
val asyncFlow = Flow[Int]
.map(_ + 1)
.async // Async boundary
.map(heavyComputation)
.async // Another async boundary
.filter(_ > 0)
def heavyComputation(n: Int): Int = {
Thread.sleep(10)
n * n
}
// Batch processing for efficiency
val batchedProcessing = Flow[String]
.grouped(100)
.mapAsync(4) { batch =>
Future {
// Process batch efficiently
batch.map(processString).filter(_.nonEmpty)
}
}
.mapConcat(identity)
def processString(s: String): String = s.trim.toUpperCase
Memory Management
// Prevent memory leaks with proper resource management
val fileProcessingStream = FileIO.fromPath(Paths.get("large-file.txt"))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
.buffer(1000, OverflowStrategy.backpressure) // Limit memory usage
.mapAsyncUnordered(4)(processLine)
.runWith(Sink.ignore)
def processLine(line: String): Future[Unit] = {
Future {
// Process line
println(line.take(50))
}
}
// Monitoring stream performance
val monitoredFlow = Flow[Int]
.map { elem =>
val start = System.nanoTime()
val result = elem * 2
val duration = (System.nanoTime() - start) / 1000000.0
if (duration > 1.0) println(s"Slow processing: ${duration}ms")
result
}
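Instead of hand-rolled timing code, you can also use the built-in log operator together with stream attributes to control at which level elements, completion, and failures are logged. A short sketch (the stage name "doubler" is arbitrary):
import akka.event.Logging
import akka.stream.Attributes

// Built-in logging of elements, completion, and failures
val loggedFlow = Flow[Int]
  .map(_ * 2)
  .log("doubler")
  .withAttributes(Attributes.logLevels(
    onElement = Logging.DebugLevel,
    onFinish = Logging.InfoLevel,
    onFailure = Logging.ErrorLevel
  ))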
Conclusion
Akka Streams provides a powerful foundation for building reactive, resilient stream processing applications. Key benefits include:
Reactive Principles:
- Non-blocking, asynchronous processing
- Automatic backpressure handling
- Built-in error recovery and supervision
Composability:
- Reusable stream components
- Graph DSL for complex topologies
- Type-safe stream composition
Performance:
- Stream fusion for optimization
- Configurable parallelism
- Efficient resource utilization
Real-World Applications:
- Large-scale data processing
- Real-time analytics
- Event-driven architectures
- Integration pipelines
Best Practices:
- Design for backpressure from the start
- Use appropriate buffer sizes
- Implement proper error handling
- Monitor stream performance
- Test streams thoroughly
Akka Streams excels in scenarios requiring high-throughput, low-latency data processing with strong guarantees around resource management and system resilience. Its integration with the broader Akka ecosystem makes it an excellent choice for building reactive, distributed applications.