Streaming Data with ZIO Streams

Why Streams?

Imagine you need to process a 10GB log file. Loading it entirely into memory would crash your application. Or consider processing real-time stock prices—there's no "end" to the data. How do you handle this?

// This crashes with OutOfMemoryError
val allLines = Source.fromFile("huge-log.txt").getLines().toList
val filtered = allLines.filter(_.contains("ERROR"))

Traditional collections load everything into memory. Streams process data piece by piece, handling infinite or large datasets efficiently.

What are ZIO Streams?

ZStream is ZIO's solution for processing data incrementally. It's like a ZIO effect, but instead of producing one value, it produces a stream of values over time.

ZStream[R, E, A]

Just like ZIO[R, E, A]:

  • R - Environment requirements
  • E - Error type
  • A - Element type produced by the stream

Key benefits:

  • Constant memory usage regardless of stream size
  • Backpressure handling built-in
  • Resource safety with automatic cleanup
  • Composable like all ZIO effects
  • Type-safe error handling

Creating Streams

From Collections

import zio._
import zio.stream._

// Finite stream from values
val numbers: ZStream[Any, Nothing, Int] = 
  ZStream(1, 2, 3, 4, 5)

// From a List
val fromList: ZStream[Any, Nothing, String] = 
  ZStream.fromIterable(List("a", "b", "c"))

// From a Range
val range: ZStream[Any, Nothing, Int] = 
  ZStream.fromIterable(1 to 1000000)

Infinite Streams

// Repeat a value forever
val infinite: ZStream[Any, Nothing, Int] = 
  ZStream.repeat(42)

// Generate incrementing numbers
val counter: ZStream[Any, Nothing, Long] = 
  ZStream.iterate(0L)(_ + 1)

// Tick every second
val ticker: ZStream[Any, Nothing, Long] = 
  ZStream.tick(1.second)

Can you process infinite data with a for-loop? This is where streams shine.

From Effects

// Single effect to stream
val singleEffect: ZStream[Any, Throwable, String] = 
  ZStream.fromZIO(Console.readLine)

// Repeat an effect
val readings: ZStream[Any, Throwable, String] = 
  ZStream.repeatZIO(Console.readLine)

// From a file (resource-safe)
val fileLines: ZStream[Any, Throwable, String] = 
  ZStream.fromFileName("data.txt")

From Chunks

ZIO Streams use Chunks for efficient batch processing:

val chunked: ZStream[Any, Nothing, Int] = 
  ZStream.fromChunk(Chunk(1, 2, 3, 4, 5))

// Custom chunk sizes
val batched: ZStream[Any, Nothing, Chunk[Int]] = 
  ZStream.fromIterable(1 to 100).grouped(10)

Basic Stream Operations

Mapping and Filtering

val numbers = ZStream.fromIterable(1 to 10)

// Transform each element
val doubled: ZStream[Any, Nothing, Int] = 
  numbers.map(_ * 2)

// Filter elements
val evens: ZStream[Any, Nothing, Int] = 
  numbers.filter(_ % 2 == 0)

// Map and filter combined
val evenDoubled: ZStream[Any, Nothing, Int] = 
  numbers.filter(_ % 2 == 0).map(_ * 2)

Taking and Dropping

val stream = ZStream.fromIterable(1 to 100)

// Take first 10 elements
val first10: ZStream[Any, Nothing, Int] = 
  stream.take(10)

// Drop first 10, take rest
val skip10: ZStream[Any, Nothing, Int] = 
  stream.drop(10)

// Take while condition holds
val lessThan50: ZStream[Any, Nothing, Int] = 
  stream.takeWhile(_ < 50)

// Drop while condition holds
val from50: ZStream[Any, Nothing, Int] = 
  stream.dropWhile(_ < 50)

Collecting

val stream = ZStream.fromIterable(1 to 20)

// Collect with partial function
val evens: ZStream[Any, Nothing, Int] = 
  stream.collect { case n if n % 2 == 0 => n }

// Collect and transform
val evenStrings: ZStream[Any, Nothing, String] = 
  stream.collect { 
    case n if n % 2 == 0 => s"Even: $n" 
  }

Running Streams

Streams are descriptions like effects. You need to run them:

import zio._
import zio.stream._

object StreamExample extends ZIOAppDefault {

  val program = for {
    // Run and collect to List
    list <- ZStream.fromIterable(1 to 5)
              .map(_ * 2)
              .runCollect
    _ <- Console.printLine(s"Result: $list")

    // Run and fold
    sum <- ZStream.fromIterable(1 to 100)
             .runSum
    _ <- Console.printLine(s"Sum: $sum")

    // Run with a sink
    _ <- ZStream.fromIterable(1 to 5)
           .map(_.toString)
           .run(ZSink.foreach(line => Console.printLine(line)))
  } yield ()

  def run = program
}

Stream Composition

Concatenation

val stream1 = ZStream(1, 2, 3)
val stream2 = ZStream(4, 5, 6)

// Combine sequentially
val combined: ZStream[Any, Nothing, Int] = 
  stream1 ++ stream2  // Outputs: 1, 2, 3, 4, 5, 6

Merging (Concurrent)

val stream1 = ZStream.fromIterable(1 to 5)
val stream2 = ZStream.fromIterable(6 to 10)

// Merge concurrently - order not guaranteed
val merged: ZStream[Any, Nothing, Int] = 
  stream1.merge(stream2)

// Merge with custom strategy
val mergedWith: ZStream[Any, Nothing, Int] = 
  stream1.mergeWith(stream2)((a, b) => a min b)

Zipping

val numbers = ZStream.fromIterable(1 to 5)
val letters = ZStream.fromIterable(List("a", "b", "c"))

// Zip - stops at shortest stream
val zipped: ZStream[Any, Nothing, (Int, String)] = 
  numbers.zip(letters)  // (1,"a"), (2,"b"), (3,"c")

// Zip with function
val combined: ZStream[Any, Nothing, String] = 
  numbers.zipWith(letters)((n, l) => s"$n-$l")

Interleaving

val stream1 = ZStream(1, 2, 3)
val stream2 = ZStream(10, 20, 30)

// Alternate elements
val interleaved: ZStream[Any, Nothing, Int] = 
  stream1.interleave(stream2)  // 1, 10, 2, 20, 3, 30

Sinks: Consuming Streams

A ZSink consumes a stream and produces a result:

import zio.stream._

val stream = ZStream.fromIterable(1 to 100)

// Collect to a collection
val toList: ZIO[Any, Nothing, Chunk[Int]] = 
  stream.runCollect

// Sum all elements
val sum: ZIO[Any, Nothing, Int] = 
  stream.runSum

// Count elements
val count: ZIO[Any, Nothing, Long] = 
  stream.runCount

// Take first N
val first10: ZIO[Any, Nothing, Chunk[Int]] = 
  stream.run(ZSink.take(10))

// Custom sink
val customSink: ZSink[Any, Nothing, Int, Nothing, Int] = 
  ZSink.foldLeft(0)((sum, n: Int) => sum + n)

val result: ZIO[Any, Nothing, Int] = 
  stream.run(customSink)

Common Sinks

// Foreach with effect
val printAll: ZIO[Any, Throwable, Unit] = 
  stream.run(ZSink.foreach(n => Console.printLine(n)))

// Drain - ignore all elements
val drain: ZIO[Any, Nothing, Unit] = 
  stream.runDrain

// Head - take first element
val first: ZIO[Any, Option[Nothing], Int] = 
  stream.runHead

// Last element
val last: ZIO[Any, Option[Nothing], Int] = 
  stream.runLast

Practical Example: Log File Processor

import zio._
import zio.stream._

case class LogEntry(timestamp: String, level: String, message: String)

object LogProcessor extends ZIOAppDefault {

  def parseLine(line: String): Option[LogEntry] = {
    val parts = line.split("\\|")
    if (parts.length == 3)
      Some(LogEntry(parts(0).trim, parts(1).trim, parts(2).trim))
    else
      None
  }

  val processLogs = for {
    // Read file as stream
    errorCount <- ZStream.fromFileName("app.log")
      // Parse each line
      .mapZIO(line => ZIO.succeed(parseLine(line)))
      // Keep only successful parses
      .collectSome
      // Filter ERROR level only
      .filter(_.level == "ERROR")
      // Log each error
      .tap(entry => Console.printLine(s"ERROR: ${entry.message}"))
      // Count them
      .runCount

    _ <- Console.printLine(s"Total errors: $errorCount")
  } yield ()

  def run = processLogs
}

Memory usage? Constant! It processes one line at a time.

Grouping and Windowing

Grouping by Key

case class User(id: Int, name: String, country: String)

val users = ZStream(
  User(1, "Alice", "USA"),
  User(2, "Bob", "UK"),
  User(3, "Charlie", "USA"),
  User(4, "Diana", "UK")
)

// Group by country
val grouped: ZStream[Any, Nothing, (String, Chunk[User])] = 
  users.groupByKey(_.country) { case (country, userStream) =>
    userStream.runCollect.map(users => (country, users))
  }

Sliding Windows

val numbers = ZStream.fromIterable(1 to 10)

// Sliding window of size 3
val windows: ZStream[Any, Nothing, Chunk[Int]] = 
  numbers.sliding(3)
// Output: [1,2,3], [2,3,4], [3,4,5], ...

// Moving average
val movingAvg: ZStream[Any, Nothing, Double] = 
  numbers.sliding(3).map { chunk =>
    chunk.sum.toDouble / chunk.size
  }

Grouped Chunks

val stream = ZStream.fromIterable(1 to 100)

// Process in batches of 10
val batched: ZStream[Any, Nothing, Chunk[Int]] = 
  stream.grouped(10)

batched.foreach { batch =>
  Console.printLine(s"Processing batch: $batch")
}

Backpressure and Buffering

Understanding Backpressure

What happens when producer is faster than consumer?

// Fast producer
val producer = ZStream.fromIterable(1 to 1000000)

// Slow consumer (simulated)
val slowProcess = (n: Int) => 
  ZIO.sleep(10.millis) *> Console.printLine(n)

// Without buffering - producer waits
val program1 = producer.foreach(slowProcess)

// With buffering - producer continues
val program2 = producer
  .buffer(100)  // Buffer up to 100 elements
  .foreach(slowProcess)

Buffering Strategies

val stream = ZStream.fromIterable(1 to 1000)

// Drop oldest when buffer full
val dropOldest: ZStream[Any, Nothing, Int] = 
  stream.buffer(100)

// Drop newest when buffer full  
val dropNewest: ZStream[Any, Nothing, Int] = 
  stream.buffer(100)

// Slide - keep last N elements
val sliding: ZStream[Any, Nothing, Int] = 
  stream.bufferSliding(100)

Transforming with flatMap

// Each number produces a stream
val nested: ZStream[Any, Nothing, Int] = 
  ZStream.fromIterable(1 to 3).flatMap { n =>
    ZStream.fromIterable(1 to n)
  }
// Output: 1, 1, 2, 1, 2, 3

// Practical: fetch user details
def fetchUserDetails(id: Int): ZStream[Any, Throwable, String] = 
  ZStream.fromZIO(
    ZIO.attempt(s"Details for user $id")
  )

val userIds = ZStream.fromIterable(1 to 5)
val allDetails: ZStream[Any, Throwable, String] = 
  userIds.flatMap(id => fetchUserDetails(id))

Error Handling in Streams

val stream = ZStream.fromIterable(1 to 10)

// Handle errors
val safe: ZStream[Any, Nothing, Either[String, Int]] = 
  stream
    .map(n => if (n == 5) throw new Exception("Boom!") else n)
    .catchAll(e => ZStream.succeed(-1))
    .either

// Retry on failure
val resilient: ZStream[Any, Throwable, String] = 
  ZStream.fromZIO(
    ZIO.attempt(scala.io.Source.fromFile("data.txt").mkString)
      .retry(Schedule.recurs(3))
  )

// Or else - fallback stream
val withFallback: ZStream[Any, Nothing, Int] = 
  ZStream.fail(new Exception("Failed"))
    .orElse(ZStream(1, 2, 3))

Real-World Example: Real-Time Metrics Aggregator

import zio._
import zio.stream._

case class Metric(name: String, value: Double, timestamp: Long)

object MetricsAggregator extends ZIOAppDefault {

  // Simulate incoming metrics
  def generateMetrics: ZStream[Any, Nothing, Metric] = 
    ZStream.repeatZIO(
      for {
        name  <- ZIO.succeed(scala.util.Random.shuffle(List("cpu", "memory", "disk")).head)
        value <- ZIO.succeed(scala.util.Random.nextDouble() * 100)
        time  <- Clock.currentTime(java.util.concurrent.TimeUnit.MILLISECONDS)
      } yield Metric(name, value, time)
    ).schedule(Schedule.fixed(100.millis))

  // Aggregate over 5-second windows
  val processMetrics = 
    generateMetrics
      .take(100)  // Process 100 metrics
      .groupByKey(_.name) { case (metricName, metricStream) =>
        metricStream
          .map(_.value)
          .sliding(50)  // Last 50 values
          .map { values =>
            val avg = values.sum / values.size
            (metricName, avg)
          }
      }
      .foreach { case (name, avg) =>
        Console.printLine(f"$name: $avg%.2f")
      }

  def run = processMetrics
}

This processes unlimited metrics with constant memory!

Stream Composition Patterns

Pipeline Pattern

def readFile(path: String): ZStream[Any, Throwable, String] = 
  ZStream.fromFileName(path)

def parseJSON(line: String): Task[User] = ???

def validateUser(user: User): IO[String, User] = ???

def saveToDatabase(user: User): Task[Unit] = ???

// Build pipeline
val pipeline = 
  readFile("users.json")
    .mapZIO(parseJSON)
    .mapZIO(validateUser)
    .mapZIO(saveToDatabase)
    .runDrain

Fan-Out Pattern

val source = ZStream.fromIterable(1 to 100)

// Process same data multiple ways
val pipeline1 = source.map(_ * 2).runSum
val pipeline2 = source.filter(_ % 2 == 0).runCount  
val pipeline3 = source.map(_.toString).runCollect

// Run all in parallel
val results = pipeline1.zipPar(pipeline2).zipPar(pipeline3)

Advanced: Custom Sinks

// Custom sink that collects to Set
def toSet[A]: ZSink[Any, Nothing, A, Nothing, Set[A]] = 
  ZSink.foldLeft(Set.empty[A])((set, elem) => set + elem)

val uniqueNumbers = ZStream(1, 2, 2, 3, 3, 4)
  .run(toSet)  // Result: Set(1, 2, 3, 4)

// Sink that writes to file
def writeToFile(path: String): ZSink[Any, Throwable, String, Nothing, Long] = 
  ZSink.foreach { line =>
    ZIO.attempt {
      val writer = new java.io.FileWriter(path, true)
      try writer.write(line + "\n")
      finally writer.close()
    }
  }

Performance Tips

Chunking for Efficiency

// Process in chunks for better performance
val efficient = ZStream.fromIterable(1 to 1000000)
  .rechunk(1000)  // Process 1000 at a time
  .mapChunks { chunk =>
    // Batch operations are faster
    chunk.map(_ * 2)
  }

Parallel Processing

val urls = ZStream.fromIterable(List(
  "http://api1.com",
  "http://api2.com",
  "http://api3.com"
))

// Fetch in parallel with limit
val results = urls
  .mapZIOPar(4) { url =>  // Max 4 concurrent
    fetchData(url)
  }
  .runCollect

Resource Management

// Streams with resources clean up automatically
val safeStream = ZStream.acquireReleaseWith(
  acquire = ZIO.attempt(scala.io.Source.fromFile("data.txt"))
)(
  release = source => ZIO.succeed(source.close())
)(
  use = source => ZStream.fromIterator(source.getLines())
)

Key Takeaways

  • ZStream processes data incrementally with constant memory
  • Backpressure is handled automatically
  • Composition is powerful - combine streams like LEGO blocks
  • Type safety extends to streaming operations
  • Resource management is automatic and safe
  • Sinks provide flexible ways to consume streams
  • Error handling works just like ZIO effects
  • Performance is excellent with proper chunking

Common Patterns Summary

// Read → Transform → Write
ZStream.fromFileName("input.txt")
  .map(transform)
  .run(ZSink.toFile("output.txt"))

// Aggregate over time windows
stream.grouped(100).map(chunk => chunk.sum)

// Parallel processing with limit
stream.mapZIOPar(8)(expensiveOperation)

// Merge multiple sources
stream1.merge(stream2).merge(stream3)

// Error recovery
stream.catchAll(handleError).retry(schedule)

What's Next?

You now know how to process large and infinite datasets efficiently with ZIO Streams. In Lesson 7: Testing ZIO Applications, you'll learn how to:

  • Write comprehensive tests with ZIO Test
  • Test streams and concurrent code
  • Create mock services and test layers
  • Use property-based testing
  • Build a complete test suite

Ready to ensure your code works correctly? Let's learn to test!

Additional Resources