Reactive Streams and Akka Streams: Processing Data Flows

Stream processing is essential for modern applications dealing with large volumes of data, real-time analytics, and event-driven architectures. Akka Streams provides a powerful implementation of the Reactive Streams specification, offering composable, back-pressured stream processing with built-in resilience and scalability.

Understanding Reactive Streams

Core Principles

  • Non-blocking: Streams process data without blocking threads
  • Backpressure: Automatic flow control prevents overwhelming consumers
  • Composability: Stream operations can be combined and reused
  • Resilience: Built-in error handling and recovery mechanisms
  • Resource Management: Automatic cleanup and lifecycle management

Reactive Streams API

The Reactive Streams specification defines four key interfaces (sketched in code below):

  • Publisher: Produces data elements
  • Subscriber: Consumes data elements
  • Subscription: Represents the connection between Publisher and Subscriber
  • Processor: Acts as both Publisher and Subscriber
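
For reference, here is a sketch of the four interfaces in Scala syntax; the actual org.reactivestreams definitions are Java interfaces with exactly these methods:

// Sketch of the org.reactivestreams contracts
trait Publisher[T] {
  def subscribe(subscriber: Subscriber[_ >: T]): Unit
}

trait Subscriber[T] {
  def onSubscribe(subscription: Subscription): Unit
  def onNext(element: T): Unit
  def onError(throwable: Throwable): Unit
  def onComplete(): Unit
}

trait Subscription {
  def request(n: Long): Unit // the demand signal that drives backpressure
  def cancel(): Unit
}

trait Processor[T, R] extends Subscriber[T] with Publisher[R]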

Setting Up Akka Streams

Dependencies

// build.sbt
val AkkaVersion = "2.7.0"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
  "com.typesafe.akka" %% "akka-http" % "10.4.0",
  "com.typesafe.akka" %% "akka-stream-alpakka-csv" % "4.0.0",
  "com.typesafe.akka" %% "akka-stream-alpakka-file" % "4.0.0",
  "com.typesafe.akka" %% "akka-stream-alpakka-s3" % "4.0.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "4.0.0"
)

Basic Stream Setup

import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Source, Sink, Flow}
import akka.{Done, NotUsed}
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure}

// Since Akka 2.6 the Materializer is provided by the ActorSystem itself;
// the deprecated ActorMaterializer is no longer needed.
implicit val system: ActorSystem = ActorSystem("StreamSystem")
implicit val ec: ExecutionContext = system.dispatcher
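
A minimal sketch of how the pieces fit together: runWith materializes the stream and returns the Sink's materialized value (here a Future[Int]).

// Wire Source -> Flow -> Sink and run it
val sumFuture: Future[Int] =
  Source(1 to 100)
    .via(Flow[Int].map(_ * 2))
    .runWith(Sink.fold(0)(_ + _))

sumFuture.foreach(sum => println(s"Sum of doubled values: $sum"))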

Basic Stream Components

Sources: Data Producers

import akka.stream.scaladsl.Source
import scala.concurrent.duration._

// Simple source from a collection
val numberSource: Source[Int, NotUsed] = Source(1 to 10)

// Infinite source with tick
val tickSource: Source[String, Cancellable] = 
  Source.tick(0.seconds, 1.second, "tick")

// Source from Future
val futureSource: Source[String, NotUsed] = 
  Source.future(Future.successful("Hello from Future"))

// Source from single element
val singleSource: Source[String, NotUsed] = 
  Source.single("Single Element")

// Source from iterator
val iteratorSource: Source[Int, NotUsed] = 
  Source.fromIterator(() => Iterator.from(1))

// Unfold source - generates elements based on state
val fibonacciSource: Source[BigInt, NotUsed] = Source.unfold((BigInt(0), BigInt(1))) {
  case (a, b) => Some(((b, a + b), a))
}

// Example: Run a simple source
numberSource
  .runForeach(println)
  .onComplete {
    case Success(_) => println("Source completed successfully")
    case Failure(ex) => println(s"Source failed: ${ex.getMessage}")
  }

Flows: Data Transformers

import akka.stream.scaladsl.Flow

// Basic transformation flow
val doubleFlow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

// Filter flow
val evenFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0)

// Map with async operation
val asyncFlow: Flow[String, String, NotUsed] = Flow[String]
  .mapAsync(parallelism = 4) { str =>
    Future {
      Thread.sleep(100) // Simulate a slow call (blocking here only for illustration)
      str.toUpperCase
    }
  }

// Stateful flow with scan
val runningTotalFlow: Flow[Int, Int, NotUsed] = Flow[Int].scan(0)(_ + _)

// Complex transformation with collect
val parseIntFlow: Flow[String, Int, NotUsed] = Flow[String]
  .collect {
    case s if s.forall(_.isDigit) => s.toInt
  }

// Example: Chaining flows
val processingFlow = Flow[String]
  .via(parseIntFlow)
  .via(evenFlow)
  .via(doubleFlow)
  .via(runningTotalFlow)

Source(List("1", "2", "3", "4", "5", "invalid", "6"))
  .via(processingFlow)
  .runForeach(println)
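// Prints 0, 4, 12, 24: "invalid" is dropped by parseIntFlow, 2/4/6 pass the
// even filter and are doubled, and scan emits its initial 0 before the totals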

Sinks: Data Consumers

import akka.stream.scaladsl.Sink

// Print each element
val printSink: Sink[Any, Future[Done]] = Sink.foreach(println)

// Collect all elements into a sequence
val collectSink: Sink[Int, Future[Seq[Int]]] = Sink.seq

// Fold/reduce operations
val sumSink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

// Take only the first element
val headSink: Sink[Int, Future[Int]] = Sink.head

// Ignore all elements
val ignoreSink: Sink[Any, Future[Done]] = Sink.ignore

// Custom sink with function
val customSink: Sink[String, Future[Done]] = Sink.foreach { str =>
  // Custom processing logic
  if (str.nonEmpty) {
    println(s"Processing: $str")
  }
}

// Conditional sink
def conditionalSink[T](condition: T => Boolean): Sink[T, Future[Done]] = 
  Sink.foreach { element =>
    if (condition(element)) {
      println(s"Condition met for: $element")
    }
  }

// Example: Multiple sinks with broadcast
import akka.stream.scaladsl.{Broadcast, GraphDSL}
import akka.stream.SinkShape

val broadcastSink = Sink.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val broadcast = builder.add(Broadcast[Int](2))

  broadcast.out(0) ~> Sink.foreach(x => println(s"Sink 1: $x"))
  broadcast.out(1) ~> Sink.foreach(x => println(s"Sink 2: ${x * x}"))

  SinkShape(broadcast.in)
})

Source(1 to 5).runWith(broadcastSink)
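
For the common case of a single extra sink, alsoTo attaches a side sink without the Graph DSL; a small sketch:

// alsoTo: send every element to a side sink as well as downstream
Source(1 to 5)
  .alsoTo(Sink.foreach(x => println(s"Sink 1: $x")))
  .runWith(Sink.foreach(x => println(s"Sink 2: ${x * x}")))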

Advanced Stream Operations

Grouping and Batching

import akka.stream.scaladsl.{Source, Flow}
import scala.concurrent.duration._

// Group elements into batches
val batchFlow: Flow[Int, Seq[Int], NotUsed] = Flow[Int]
  .grouped(3)

// Time-based batching
val timeBasedBatch: Flow[String, Seq[String], NotUsed] = Flow[String]
  .groupedWithin(10, 1.second)

// Weighted batching - aggregate by total character count while downstream is busy
val weightedBatch: Flow[String, Seq[String], NotUsed] = Flow[String]
  .batchWeighted(max = 100, costFn = s => s.length.toLong, seed = s => Vector(s))(_ :+ _)

// Example: Processing batches
Source.tick(0.seconds, 100.millis, "data")
  .take(20)
  .via(timeBasedBatch)
  .runForeach { batch =>
    println(s"Processed batch of ${batch.size} elements")
  }

Parallelization

// Parallel processing with mapAsyncUnordered
val parallelFlow: Flow[Int, String, NotUsed] = Flow[Int]
  .mapAsyncUnordered(parallelism = 4) { num =>
    Future {
      Thread.sleep(scala.util.Random.nextInt(1000))
      s"Processed: $num"
    }
  }

// Parallel processing with specific execution context
implicit val processingEC: ExecutionContext = 
  ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(8))

val cpuIntensiveFlow: Flow[Int, Int, NotUsed] = Flow[Int]
  .mapAsync(4) { num =>
    Future {
      // Simulate CPU-intensive work
      (1 to 1000000).sum + num
    }(processingEC)
  }

// Example: Compare ordered vs unordered processing
Source(1 to 10)
  .via(parallelFlow)
  .runForeach(println)
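
For comparison, mapAsync (without Unordered) still runs up to four futures concurrently but emits results in the original input order; a sketch:

// Ordered counterpart: output order matches input order despite parallelism
val orderedFlow: Flow[Int, String, NotUsed] = Flow[Int]
  .mapAsync(parallelism = 4) { num =>
    Future {
      Thread.sleep(scala.util.Random.nextInt(1000))
      s"Processed (ordered): $num"
    }
  }

Source(1 to 10)
  .via(orderedFlow)
  .runForeach(println)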

Error Handling and Resilience

import akka.stream.Supervision.{Resume, Stop, Restart}
import akka.stream.ActorAttributes

// Flow that might fail
val flakyFlow: Flow[Int, String, NotUsed] = Flow[Int]
  .map { num =>
    if (num % 3 == 0) throw new RuntimeException(s"Error processing $num")
    s"Processed: $num"
  }

// Error handling with supervision strategy
val resilientFlow = flakyFlow
  .withAttributes(ActorAttributes.supervisionStrategy {
    case _: RuntimeException => Resume // Skip failed elements
    case _ => Stop
  })

// Recover with default values
val recoverFlow: Flow[Int, String, NotUsed] = Flow[Int]
  .map { num =>
    if (num % 3 == 0) throw new RuntimeException(s"Error processing $num")
    s"Processed: $num"
  }
  .recover {
    case ex: RuntimeException => s"Error: ${ex.getMessage}"
  }

// Retry logic
def retryFlow[In, Out](
  flow: Flow[In, Out, _], 
  maxRetries: Int
): Flow[In, Out, NotUsed] = {
  Flow[In].mapAsync(1) { input =>
    def attempt(retriesLeft: Int): Future[Out] = {
      Source.single(input)
        .via(flow)
        .runWith(Sink.head)
        .recoverWith {
          case ex if retriesLeft > 0 =>
            Thread.sleep(100) // Simple blocking backoff (illustrative; prefer akka.pattern.after for a non-blocking delay)
            attempt(retriesLeft - 1)
          case ex => Future.failed(ex)
        }
    }
    attempt(maxRetries)
  }
}

// Example: Error handling in action
Source(1 to 10)
  .via(resilientFlow)
  .runForeach(println)
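
The retry wrapper can be combined with the flaky flow as well; a sketch (flakyFlow fails deterministically for multiples of 3, so retries are exhausted at the first such element and recover then emits a final error element before the stream completes):

Source(1 to 10)
  .via(retryFlow(flakyFlow, maxRetries = 3))
  .recover { case ex => s"Gave up: ${ex.getMessage}" }
  .runForeach(println)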

Backpressure Handling

// Buffer with overflow strategy
import akka.stream.OverflowStrategy

val bufferedFlow: Flow[Int, Int, NotUsed] = Flow[Int]
  .buffer(size = 100, OverflowStrategy.backpressure)

// Dropping elements when buffer is full
val droppingFlow: Flow[Int, Int, NotUsed] = Flow[Int]
  .buffer(size = 10, OverflowStrategy.dropHead)

// Conflate - combine elements under backpressure
val conflateFlow: Flow[Int, Int, NotUsed] = Flow[Int]
  .conflateWithSeed(identity)(_ + _)

// Expand - create multiple elements from one
val expandFlow: Flow[Int, Int, NotUsed] = Flow[Int]
  .expand(Iterator.continually(_))

// Example: Demonstrating backpressure
val slowSink = Sink.foreach[Int] { num =>
  Thread.sleep(1000) // Slow consumer
  println(s"Consumed: $num")
}

Source.tick(0.seconds, 100.millis, 1)
  .scan(0)(_ + _)
  .via(bufferedFlow)
  .runWith(slowSink)
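
To see conflate in action, pair a fast producer with the same slow sink: while the consumer sleeps, pending elements are summed rather than buffered or dropped, so no value is silently lost. A sketch:

// Fast producer, slow consumer: each printed value is the sum of
// everything that arrived while the sink was busy
Source.tick(0.seconds, 10.millis, 1)
  .via(conflateFlow)
  .runWith(slowSink)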

Graph DSL for Complex Stream Topologies

Fan-Out Patterns

import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Broadcast, Balance}
import akka.stream.{ClosedShape, UniformFanOutShape}

// Broadcast: Send each element to all outputs
val broadcastGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val source = Source(1 to 10)
  val broadcast = builder.add(Broadcast[Int](3))
  val sink1 = Sink.foreach[Int](x => println(s"Sink1: $x"))
  val sink2 = Sink.foreach[Int](x => println(s"Sink2: ${x * x}"))
  val sink3 = Sink.foreach[Int](x => println(s"Sink3: ${x * x * x}"))

  source ~> broadcast
  broadcast.out(0) ~> sink1
  broadcast.out(1) ~> sink2
  broadcast.out(2) ~> sink3

  ClosedShape
})

// Balance: Distribute elements across outputs
val balanceGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val source = Source(1 to 100)
  val balance = builder.add(Balance[Int](3))

  val slowSink = Flow[Int].map { x =>
    Thread.sleep(100)
    println(s"Slow: $x")
  }.to(Sink.ignore)

  val fastSink = Flow[Int].map { x =>
    println(s"Fast: $x")
  }.to(Sink.ignore)

  source ~> balance
  balance.out(0) ~> slowSink
  balance.out(1) ~> fastSink
  balance.out(2) ~> fastSink

  ClosedShape
})

broadcastGraph.run()

Fan-In Patterns

import akka.stream.scaladsl.{Merge, MergePreferred, Zip, ZipWith}

// Merge: Combine multiple sources
val mergeGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val source1 = Source.tick(0.seconds, 1.second, "A")
  val source2 = Source.tick(0.seconds, 2.seconds, "B")
  val source3 = Source.tick(0.seconds, 3.seconds, "C")

  val merge = builder.add(Merge[String](3))
  val sink = Sink.foreach(println)

  source1 ~> merge
  source2 ~> merge
  source3 ~> merge
  merge ~> sink

  ClosedShape
})

// Zip: Combine elements from sources pairwise
val zipGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val numbers = Source(1 to 10)
  val letters = Source(List("a", "b", "c", "d", "e"))

  val zip = builder.add(Zip[Int, String]())
  val sink = Sink.foreach[(Int, String)](println)

  numbers ~> zip.in0
  letters ~> zip.in1
  zip.out ~> sink

  ClosedShape
})

// ZipWith: Combine with custom function
val zipWithGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val prices = Source(List(10.0, 20.0, 15.0, 25.0))
  val quantities = Source(List(2, 1, 3, 1))

  val zipWith = builder.add(ZipWith[Double, Int, Double](_ * _))
  val sink = Sink.foreach[Double](total => println(f"Total: $$${total}%.2f"))

  prices ~> zipWith.in0
  quantities ~> zipWith.in1
  zipWith.out ~> sink

  ClosedShape
})

zipWithGraph.run()

Real-World Use Cases

File Processing Pipeline

import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString
import java.nio.file.Paths

// Process CSV files in a directory
val csvProcessingGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val directory = Paths.get("data")
  val csvFiles = Directory.ls(directory)
    .filter(_.toString.endsWith(".csv"))

  val processFile = Flow[java.nio.file.Path]
    .flatMapConcat { path =>
      FileIO.fromPath(path)
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
        .map(_.utf8String)
        .drop(1) // Skip header
        .map(line => line.split(",").toList)
        .filter(_.length >= 3) // Validate columns
    }

  val processCsvData = Flow[List[String]]
    .collect {
      case id :: name :: price :: _ if price.matches("""\d+\.\d+""") =>
        Product(id, name, price.toDouble)
    }

  val writeResults = Sink.foreach[Product] { product =>
    println(s"Processed: ${product.name} - $${product.price}")
  }

  csvFiles ~> processFile ~> processCsvData ~> writeResults

  ClosedShape
})

case class Product(id: String, name: String, price: Double)

csvProcessingGraph.run()
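
Since alpakka-csv is already among the dependencies, the manual Framing/split step could be swapped for Alpakka's CSV parser, which handles quoting and escaping that a plain split(",") gets wrong. A sketch assuming the standard CsvParsing.lineScanner stage:

import akka.stream.alpakka.csv.scaladsl.CsvParsing

val alpakkaCsvFlow = Flow[java.nio.file.Path]
  .flatMapConcat { path =>
    FileIO.fromPath(path)
      .via(CsvParsing.lineScanner()) // ByteString chunks -> one List[ByteString] per CSV row
      .map(_.map(_.utf8String))      // decode each column
      .drop(1)                       // skip header row
  }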

Real-Time Data Processing

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Keep, Source, Sink}

// WebSocket data stream processing
class RealTimeProcessor(implicit system: ActorSystem) {

  import system.dispatcher // ExecutionContext for the Future combinators below

  def processWebSocketStream(url: String): Future[Done] = {
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(url))

    val source = Source.tick(0.seconds, 1.second, TextMessage("ping"))

    val sink = Sink.foreach[Message] {
      case TextMessage.Strict(text) =>
        processMessage(text)
      case TextMessage.Streamed(textStream) =>
        textStream.runWith(Sink.foreach(chunk => processMessage(chunk)))
      case _ =>
        println("Received non-text message")
    }

    val (upgradeResponse, closed) = source
      .viaMat(webSocketFlow)(Keep.right)
      .toMat(sink)(Keep.both)
      .run()

    upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status.isSuccess()) {
        closed
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }
  }

  private def processMessage(text: String): Unit = {
    // Parse and process real-time data
    println(s"Processing: $text")
  }
}

// Time-windowed aggregation
val windowedAggregation = Flow[Double]
  .groupedWithin(100, 5.seconds)
  .map { values =>
    WindowedStats(
      count = values.size,
      sum = values.sum,
      avg = values.sum / values.size,
      min = values.min,
      max = values.max
    )
  }

case class WindowedStats(count: Int, sum: Double, avg: Double, min: Double, max: Double)
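
Feeding simulated measurements through the window yields one stats record per five-second window (or per 100 elements, whichever comes first); a sketch:

Source.tick(0.seconds, 50.millis, ())
  .map(_ => scala.util.Random.nextDouble() * 100)
  .via(windowedAggregation)
  .runForeach(stats => println(f"avg=${stats.avg}%.2f min=${stats.min}%.2f max=${stats.max}%.2f (n=${stats.count})"))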

// Event sourcing stream
val eventSourcingFlow = Flow[DomainEvent]
  .groupBy(maxSubstreams = 100, _.aggregateId)
  .scan(AggregateState.empty) { (state, event) =>
    state.applyEvent(event)
  }
  .mergeSubstreams

trait DomainEvent {
  def aggregateId: String
}

case class AggregateState(id: String, version: Long, data: Map[String, Any]) {
  def applyEvent(event: DomainEvent): AggregateState = {
    // Apply event to state
    copy(version = version + 1)
  }
}

object AggregateState {
  def empty: AggregateState = AggregateState("", 0, Map.empty)
}

Database Streaming

import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import slick.jdbc.PostgresProfile.api._

// Stream database results
class DatabaseStreaming(implicit slickSession: SlickSession) {

  case class User(id: Long, name: String, email: String)

  class Users(tag: Tag) extends Table[User](tag, "users") {
    def id = column[Long]("id", O.PrimaryKey)
    def name = column[String]("name")
    def email = column[String]("email")
    def * = (id, name, email) <> (User.tupled, User.unapply)
  }

  val users = TableQuery[Users]

  // Stream all users with processing
  def processAllUsers(): Future[Done] = {
    Slick
      .source(users.result)
      .via(Flow[User].map(user => user.copy(email = user.email.toLowerCase)))
      .grouped(100) // Batch updates
      .mapAsync(1) { userBatch =>
        // Batch update
        val updateActions = userBatch.map { user =>
          users.filter(_.id === user.id).update(user)
        }
        slickSession.db.run(DBIO.sequence(updateActions))
      }
      .runWith(Sink.ignore)
  }

  // Stream query results
  def streamUsersByDomain(domain: String): Source[User, NotUsed] = {
    Slick.source(users.filter(_.email like s"%@$domain").result)
  }

  // Sink for writing to database
  def userInsertSink: Sink[User, Future[Done]] = {
    Slick.sink(user => users += user)
  }
}
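
Example usage, assuming a SlickSession is already in implicit scope (a sketch):

val dbStreaming = new DatabaseStreaming()

dbStreaming.streamUsersByDomain("example.com")
  .runWith(Sink.fold(0)((count, _) => count + 1))
  .foreach(count => println(s"Users on example.com: $count"))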

Testing Streams

Stream Testing Utilities

import akka.stream.testkit.scaladsl.{TestSource, TestSink}
import akka.testkit.TestKit
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class StreamSpec extends TestKit(ActorSystem("StreamSpec"))
  with AnyWordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A simple stream" should {
    "process elements correctly" in {
      val source = TestSource.probe[Int]
      val sink = TestSink.probe[Int]

      val (pub, sub) = source
        .map(_ * 2)
        .toMat(sink)(Keep.both)
        .run()

      sub.request(3)
      pub.sendNext(1)
      pub.sendNext(2)
      pub.sendNext(3)

      sub.expectNext(2, 4, 6)

      pub.sendComplete()
      sub.expectComplete()
    }

    "handle backpressure correctly" in {
      val source = TestSource.probe[Int]
      val sink = TestSink.probe[Int]

      val (pub, sub) = source
        .buffer(2, OverflowStrategy.backpressure)
        .toMat(sink)(Keep.both)
        .run()

      // Don't request elements initially
      pub.sendNext(1)
      pub.sendNext(2)
      pub.sendNext(3) // This should buffer

      // Now request and verify
      sub.request(3)
      sub.expectNext(1, 2, 3)
    }

    "handle errors properly" in {
      val source = Source(1 to 5)
      val faultyFlow = Flow[Int].map { n =>
        if (n == 3) throw new RuntimeException("Test error")
        n
      }

      val result = source
        .via(faultyFlow)
        .recover {
          case _: RuntimeException => -1
        }
        .runWith(Sink.seq)

      whenReady(result) { seq =>
        seq should contain(-1)
      }
    }
  }

  "Custom flows" should {
    "be testable in isolation" in {
      val doubleFlow = Flow[Int].map(_ * 2)

      val result = Source(List(1, 2, 3))
        .via(doubleFlow)
        .runWith(Sink.seq)

      whenReady(result) { seq =>
        seq shouldEqual Seq(2, 4, 6)
      }
    }
  }
}

Performance Optimization

Stream Fusion and Optimization

// Fused operations for better performance
val fusedFlow = Flow[Int]
  .map(_ + 1)        // These operations
  .filter(_ % 2 == 0) // will be fused
  .map(_ * 2)        // into a single stage

// Async boundaries for parallelization
val asyncFlow = Flow[Int]
  .map(_ + 1)
  .async // Async boundary
  .map(heavyComputation)
  .async // Another async boundary
  .filter(_ > 0)

def heavyComputation(n: Int): Int = {
  Thread.sleep(10)
  n * n
}

// Batch processing for efficiency
val batchedProcessing = Flow[String]
  .grouped(100)
  .mapAsync(4) { batch =>
    Future {
      // Process batch efficiently
      batch.map(processString).filter(_.nonEmpty)
    }
  }
  .mapConcat(identity)

def processString(s: String): String = s.trim.toUpperCase

Memory Management

// Prevent memory leaks with proper resource management
val fileProcessingStream = FileIO.fromPath(Paths.get("large-file.txt"))
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
  .map(_.utf8String)
  .buffer(1000, OverflowStrategy.backpressure) // Limit memory usage
  .mapAsyncUnordered(4)(processLine)
  .runWith(Sink.ignore)

def processLine(line: String): Future[Unit] = {
  Future {
    // Process line
    println(line.take(50))
  }
}

// Monitoring stream performance
val monitoredFlow = Flow[Int]
  .map { elem =>
    val start = System.nanoTime()
    val result = elem * 2
    val duration = (System.nanoTime() - start) / 1000000.0
    if (duration > 1.0) println(s"Slow processing: ${duration}ms")
    result
  }
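
Akka Streams also ships a built-in log operator, which can replace hand-rolled timing prints for basic per-element observability; a sketch:

import akka.event.Logging
import akka.stream.Attributes

// Logs each element, stream completion, and failures at the chosen levels
val loggedFlow = Flow[Int]
  .map(_ * 2)
  .log("doubler")
  .withAttributes(Attributes.logLevels(
    onElement = Logging.DebugLevel,
    onFinish = Logging.InfoLevel,
    onFailure = Logging.ErrorLevel
  ))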

Conclusion

Akka Streams provides a powerful foundation for building reactive, resilient stream processing applications. Key benefits include:

Reactive Principles:

  • Non-blocking, asynchronous processing
  • Automatic backpressure handling
  • Built-in error recovery and supervision

Composability:

  • Reusable stream components
  • Graph DSL for complex topologies
  • Type-safe stream composition

Performance:

  • Stream fusion for optimization
  • Configurable parallelism
  • Efficient resource utilization

Real-World Applications:

  • Large-scale data processing
  • Real-time analytics
  • Event-driven architectures
  • Integration pipelines

Best Practices:

  • Design for backpressure from the start
  • Use appropriate buffer sizes
  • Implement proper error handling
  • Monitor stream performance
  • Test streams thoroughly

Akka Streams excels in scenarios requiring high-throughput, low-latency data processing with strong guarantees around resource management and system resilience. Its integration with the broader Akka ecosystem makes it an excellent choice for building reactive, distributed applications.