Streaming Data with ZIO Streams
Why Streams?
Imagine you need to process a 10 GB log file. Loading it entirely into memory would exhaust the JVM heap and crash your application. Or consider processing real-time stock prices: there's no "end" to the data. How do you handle this?
import scala.io.Source
// Likely fails with OutOfMemoryError: toList materializes every line on the heap
val allLines = Source.fromFile("huge-log.txt").getLines().toList
val filtered = allLines.filter(_.contains("ERROR"))
Traditional collections load everything into memory. Streams process data piece by piece, handling infinite or large datasets efficiently.
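For contrast, here is a minimal streaming sketch of the same error count. It assumes ZIO 2.x with the zio-streams module on the classpath. The helper name countErrors is introduced only for illustration. The file is read in chunks and decoded into lines, so only the chunk currently being processed needs to be in memory.

```scala
import zio._
import zio.stream._

// Count the lines of a file that contain "ERROR" without loading the whole file.
// countErrors is a hypothetical helper name used only for this example.
def countErrors(path: String): ZIO[Any, Throwable, Long] =
  ZStream
    .fromFileName(path)                                 // emits the file's bytes, chunk by chunk
    .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines) // bytes -> UTF-8 text -> lines
    .filter(_.contains("ERROR"))                        // keep only error lines
    .runCount                                           // consume the stream, counting elements
```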
What are ZIO Streams?
ZStream is ZIO's solution for processing data incrementally. It's like a ZIO effect, but instead of producing one value, it produces a stream of values over time.
ZStream[R, E, A]
Just like ZIO[R, E, A]:
- R - Environment requirements
- E - Error type
- A - Element type produced by the stream
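To make the three type parameters concrete, here is a small sketch that assumes ZIO 2.x. The Config case class and the names upToLimit and withLimit are hypothetical. They exist only to give the stream a non-trivial environment and error type.

```scala
import zio._
import zio.stream._

// Hypothetical configuration service, used only to make R non-trivial.
final case class Config(limit: Int)

// R = Config (needed to run), E = String (possible failure), A = Int (elements)
val upToLimit: ZStream[Config, String, Int] =
  ZStream.service[Config].flatMap { cfg =>
    if (cfg.limit < 0) ZStream.fail("limit must be non-negative")
    else ZStream.fromIterable(1 to cfg.limit)
  }

// Providing a layer removes the Config requirement, leaving R = Any.
def withLimit(limit: Int): ZStream[Any, String, Int] =
  upToLimit.provideLayer(ZLayer.succeed(Config(limit)))
```

Running withLimit(3) emits 1, 2, 3. Running withLimit(-1) fails with the String error instead of emitting anything.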
Key benefits:
- Memory usage bounded by chunk and buffer sizes, not by stream length
- Backpressure handling built-in
- Resource safety with automatic cleanup
- Composable like all ZIO effects
- Type-safe error handling
Creating Streams
From Collections
import zio._
import zio.stream._
// Finite stream from values
val numbers: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4, 5)
// From a List
val fromList: ZStream[Any, Nothing, String] =
  ZStream.fromIterable(List("a", "b", "c"))
// From a Range
val range: ZStream[Any, Nothing, Int] =
  ZStream.fromIterable(1 to 1000000)
Infinite Streams
// Repeat a value forever
val infinite: ZStream[Any, Nothing, Int] =
  ZStream.repeat(42)
// Generate incrementing numbers
val counter: ZStream[Any, Nothing, Long] =
  ZStream.iterate(0L)(_ + 1)
// Tick every second: emits a Unit value per tick
val ticker: ZStream[Any, Nothing, Unit] =
  ZStream.tick(1.second)
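A for-loop over an infinite collection never finishes. An infinite stream is only a description, though. You decide how much of it to consume, for example with take, before you run it. This is where streams shine.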
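Here is a short sketch of bounding the infinite streams above, assuming ZIO 2.x. The tick interval is shortened to 10 milliseconds only so the example finishes quickly. The value names are illustrative.

```scala
import zio._
import zio.stream._

// take turns an infinite stream into a finite one that can be run safely.
val powersOfTwo: ZIO[Any, Nothing, Chunk[Int]] =
  ZStream.iterate(1)(_ * 2).take(5).runCollect

val fortyTwos: ZIO[Any, Nothing, Chunk[Int]] =
  ZStream.repeat(42).take(3).runCollect

// Count three ticks; uses the live clock, so this takes a few tens of milliseconds.
val threeTicks: ZIO[Any, Nothing, Long] =
  ZStream.tick(10.millis).take(3).runCount
```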
From Effects
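// Single effect to stream (Console.readLine fails with IOException, a Throwable)
val singleEffect: ZStream[Any, Throwable, String] =
  ZStream.fromZIO(Console.readLine)
// Repeat an effect
val readings: ZStream[Any, Throwable, String] =
  ZStream.repeatZIO(Console.readLine)
// From a file (resource-safe): fromFileName emits bytes,
// so decode them as UTF-8 and split them into lines
val fileLines: ZStream[Any, Throwable, String] =
  ZStream.fromFileName("data.txt")
    .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines)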
From Chunks
Internally, ZIO Streams move elements in Chunks (immutable sequences, usually backed by arrays) for efficient batch processing:
val chunked: ZStream[Any, Nothing, Int] =
  ZStream.fromChunk(Chunk(1, 2, 3, 4, 5))
// Group elements into batches of 10: each element is now a Chunk[Int]
val batched: ZStream[Any, Nothing, Chunk[Int]] =
  ZStream.fromIterable(1 to 100).grouped(10)
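The two operations look similar but work at different levels. In this sketch (ZIO 2.x assumed), grouped turns elements into Chunk values that you process yourself. rechunk only changes the internal batching and leaves the elements as Ints. The chunks operator exposes that internal batching so the sizes can be compared.

```scala
import zio._
import zio.stream._

val source = ZStream.fromIterable(1 to 10)

// grouped: each element becomes a visible Chunk[Int]; report each chunk's size
val groupSizes: ZIO[Any, Nothing, Chunk[Int]] =
  source.grouped(4).map(_.size).runCollect

// rechunk: elements stay Ints; chunks exposes the internal batches for inspection
val internalSizes: ZIO[Any, Nothing, Chunk[Int]] =
  source.rechunk(4).chunks.map(_.size).runCollect
```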
Basic Stream Operations
Mapping and Filtering
val numbers = ZStream.fromIterable(1 to 10)
// Transform each element
val doubled: ZStream[Any, Nothing, Int] =
  numbers.map(_ * 2)
// Filter elements
val evens: ZStream[Any, Nothing, Int] =
  numbers.filter(_ % 2 == 0)
// Map and filter combined
val evenDoubled: ZStream[Any, Nothing, Int] =
  numbers.filter(_ % 2 == 0).map(_ * 2)
Taking and Dropping
val stream = ZStream.fromIterable(1 to 100)
// Take first 10 elements
val first10: ZStream[Any, Nothing, Int] =
  stream.take(10)
// Drop first 10, take rest
val skip10: ZStream[Any, Nothing, Int] =
  stream.drop(10)
// Take while condition holds
val lessThan50: ZStream[Any, Nothing, Int] =
  stream.takeWhile(_ < 50)
// Drop while condition holds
val from50: ZStream[Any, Nothing, Int] =
  stream.dropWhile(_ < 50)
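takeWhile and dropWhile only look at a prefix of the stream, which matters once streams can be infinite. Here is a small sketch, assuming ZIO 2.x. The value names are illustrative.

```scala
import zio._
import zio.stream._

val naturals = ZStream.iterate(1)(_ + 1) // infinite: 1, 2, 3, ...

// takeWhile ends the stream at the first element that fails the predicate,
// so this terminates even though naturals is infinite.
// (naturals.filter(_ < 5).runCollect, by contrast, would never complete.)
val belowFive: ZIO[Any, Nothing, Chunk[Int]] =
  naturals.takeWhile(_ < 5).runCollect

// dropWhile drops only the leading run of matches; later small values survive.
val afterPrefix: ZIO[Any, Nothing, Chunk[Int]] =
  ZStream(1, 2, 7, 3).dropWhile(_ < 5).runCollect
```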
Collecting
val stream = ZStream.fromIterable(1 to 20)
// Collect with partial function
val evens: ZStream[Any, Nothing, Int] =
  stream.collect { case n if n % 2 == 0 => n }
// Collect and transform
val evenStrings: ZStream[Any, Nothing, String] =
  stream.collect {
    case n if n % 2 == 0 => s"Even: $n"
  }
Running Streams
Like ZIO effects, streams are lazy descriptions: nothing is read or computed until you run them, usually with one of the run methods:
import zio._
import zio.stream._
object StreamExample extends ZIOAppDefault {
  val program = for {
    // Run and collect every element into a Chunk
    doubled <- ZStream.fromIterable(1 to 5)
                 .map(_ * 2)
                 .runCollect
    _ <- Console.printLine(s"Result: $doubled")
    // Run and fold: runSum adds up every element
    sum <- ZStream.fromIterable(1 to 100)
             .runSum
    _ <- Console.printLine(s"Sum: $sum")
    // Run with a sink
    _ <- ZStream.fromIterable(1 to 5)
           .map(_.toString)
           .run(ZSink.foreach((line: String) => Console.printLine(line)))
  } yield ()
  def run = program
}
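The other run methods follow the same pattern. As a sketch (ZIO 2.x assumed), runFold threads an accumulator through every element. Here the accumulator is a (sum, count) pair, so the mean comes out of a single pass.

```scala
import zio._
import zio.stream._

// Accumulate (sum, count) in one traversal of the stream.
val sumAndCount: ZIO[Any, Nothing, (Long, Long)] =
  ZStream.fromIterable(1 to 100).runFold((0L, 0L)) { case ((sum, count), n) =>
    (sum + n, count + 1)
  }

// Derive the mean from the folded pair.
val mean: ZIO[Any, Nothing, Double] =
  sumAndCount.map { case (sum, count) => sum.toDouble / count }
```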
Stream Composition
Concatenation
val stream1 = ZStream(1, 2, 3)
val stream2 = ZStream(4, 5, 6)
// Combine sequentially
val combined: ZStream[Any, Nothing, Int] =
  stream1 ++ stream2 // Outputs: 1, 2, 3, 4, 5, 6
Merging (Concurrent)
val stream1 = ZStream.fromIterable(1 to 5)
val stream2 = ZStream.fromIterable(6 to 10)
// Merge concurrently: each side keeps its own order, but the interleaving is not guaranteed
val merged: ZStream[Any, Nothing, Int] =
  stream1.merge(stream2)
// Merge while mapping each side to a common output type
val mergedWith: ZStream[Any, Nothing, String] =
  stream1.mergeWith(stream2)(l => s"left-$l", r => s"right-$r")
Zipping
val numbers = ZStream.fromIterable(1 to 5)
val letters = ZStream.fromIterable(List("a", "b", "c"))
// Zip - stops at shortest stream
val zipped: ZStream[Any, Nothing, (Int, String)] =
  numbers.zip(letters) // (1,"a"), (2,"b"), (3,"c")
// Zip with function
val combined: ZStream[Any, Nothing, String] =
  numbers.zipWith(letters)((n, l) => s"$n-$l")
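Here are two related operators, sketched under the assumption of ZIO 2.x. zipWithIndex pairs each element with its position. zipAll keeps going past the shorter stream and fills in default values instead of stopping.

```scala
import zio._
import zio.stream._

// Pair each element with its 0-based position (a Long).
val indexed: ZIO[Any, Nothing, Chunk[(String, Long)]] =
  ZStream("a", "b", "c").zipWithIndex.runCollect

// Unlike zip, zipAll pads the shorter side with the given defaults (0 and "?").
val padded: ZIO[Any, Nothing, Chunk[(Int, String)]] =
  ZStream(1, 2, 3, 4, 5).zipAll(ZStream("a", "b", "c"))(0, "?").runCollect
```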
Interleaving
val stream1 = ZStream(1, 2, 3)
val stream2 = ZStream(10, 20, 30)
// Alternate elements
val interleaved: ZStream[Any, Nothing, Int] =
  stream1.interleave(stream2) // 1, 10, 2, 20, 3, 30
Sinks: Consuming Streams
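A ZSink consumes a stream and produces a result. Its type is ZSink[R, E, In, L, Z]: In is the element type it accepts, L is the type of any leftover elements it does not consume, and Z is the result: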
import zio._
import zio.stream._
val stream = ZStream.fromIterable(1 to 100)
// Collect every element into a Chunk
val toList: ZIO[Any, Nothing, Chunk[Int]] =
  stream.runCollect
// Sum all elements
val sum: ZIO[Any, Nothing, Int] =
  stream.runSum
// Count elements
val count: ZIO[Any, Nothing, Long] =
  stream.runCount
// Take first N
val first10: ZIO[Any, Nothing, Chunk[Int]] =
  stream.run(ZSink.take[Int](10))
// Custom sink: a left fold that adds up the elements
val customSink: ZSink[Any, Nothing, Int, Nothing, Int] =
  ZSink.foldLeft(0)((acc: Int, n: Int) => acc + n)
val result: ZIO[Any, Nothing, Int] =
  stream.run(customSink)
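Because sinks are values, they compose. In this sketch, which assumes ZIO 2.x, zipPar runs two sinks side by side over the same elements, so a sum and a count come out of one traversal.

```scala
import zio._
import zio.stream._

// Two sinks combined with zipPar consume the same elements in a single pass.
val sumAndCount = ZSink.sum[Int].zipPar(ZSink.count)

// Result is a (sum, count) tuple.
val stats: ZIO[Any, Nothing, (Int, Long)] =
  ZStream.fromIterable(1 to 100).run(sumAndCount)
```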
Common Sinks
// Foreach with effect (Console.printLine can fail with IOException)
val printAll: ZIO[Any, Throwable, Unit] =
  stream.run(ZSink.foreach((n: Int) => Console.printLine(n)))
// Drain - run the stream for its effects, discarding elements
val drain: ZIO[Any, Nothing, Unit] =
  stream.runDrain
// Head - first element, or None if the stream is empty
val first: ZIO[Any, Nothing, Option[Int]] =
  stream.runHead
// Last element, or None if the stream is empty
val last: ZIO[Any, Nothing, Option[Int]] =
  stream.runLast
Practical Example: Log File Processor
import zio._
import zio.stream._
case class LogEntry(timestamp: String, level: String, message: String)
object LogProcessor extends ZIOAppDefault {
  // Expects lines of the form "timestamp | level | message"
  def parseLine(line: String): Option[LogEntry] = {
    val parts = line.split("\\|")
    if (parts.length == 3)
      Some(LogEntry(parts(0).trim, parts(1).trim, parts(2).trim))
    else
      None
  }
  val processLogs = for {
    // Read the file as bytes and decode it into lines
    errorCount <- ZStream.fromFileName("app.log")
      .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines)
      // Parse each line (parsing is pure, so a plain map is enough)
      .map(parseLine)
      // Keep only successful parses
      .collectSome
      // Filter ERROR level only
      .filter(_.level == "ERROR")
      // Log each error
      .tap(entry => Console.printLine(s"ERROR: ${entry.message}"))
      // Count them
      .runCount
    _ <- Console.printLine(s"Total errors: $errorCount")
  } yield ()
  def run = processLogs
}
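Memory usage stays bounded. The file is read in fixed-size chunks and each line is handled as it arrives, so the whole file is never held in memory at once.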
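The same parsing logic can feed aggregations other than a count. This sketch (ZIO 2.x assumed) tallies entries per level with runFold. countByLevel is a hypothetical helper. It takes its input as a stream of lines, so the lines can come from a file or, in a quick test, from in-memory strings.

```scala
import zio._
import zio.stream._

final case class LogEntry(timestamp: String, level: String, message: String)

// Same rule as LogProcessor: exactly three "|"-separated fields.
def parseLine(line: String): Option[LogEntry] =
  line.split("\\|") match {
    case Array(ts, level, msg) => Some(LogEntry(ts.trim, level.trim, msg.trim))
    case _                     => None
  }

// Tally entries per level in one pass. The map grows with the number
// of distinct levels, not with the number of lines.
def countByLevel(lines: ZStream[Any, Throwable, String]): ZIO[Any, Throwable, Map[String, Long]] =
  lines
    .map(parseLine)
    .collectSome
    .runFold(Map.empty[String, Long]) { (counts, entry) =>
      counts.updated(entry.level, counts.getOrElse(entry.level, 0L) + 1)
    }
```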
Grouping and Windowing
Grouping by Key
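case class User(id: Int, name: String, country: String)
val users = ZStream(
  User(1, "Alice", "USA"),
  User(2, "Bob", "UK"),
  User(3, "Charlie", "USA"),
  User(4, "Diana", "UK")
)
// Group by country: each group arrives as its own substream, and the function
// must return a stream, so the collected group is wrapped with fromZIO.
// The order in which groups are emitted is not guaranteed.
val grouped: ZStream[Any, Nothing, (String, Chunk[User])] =
  users.groupByKey(_.country) { case (country, userStream) =>
    ZStream.fromZIO(userStream.runCollect.map(members => (country, members)))
  }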
Sliding Windows
val numbers = ZStream.fromIterable(1 to 10)
// Sliding window of size 3
val windows: ZStream[Any, Nothing, Chunk[Int]] =
  numbers.sliding(3)
// Output: [1,2,3], [2,3,4], [3,4,5], ...
// Moving average
val movingAvg: ZStream[Any, Nothing, Double] =
  numbers.sliding(3).map { chunk =>
    chunk.sum.toDouble / chunk.size
  }
Grouped Chunks
val stream = ZStream.fromIterable(1 to 100)
// Process in batches of 10
val batched: ZStream[Any, Nothing, Chunk[Int]] =
  stream.grouped(10)
// runForeach runs the stream, applying the effect to each batch
val printBatches =
  batched.runForeach { batch =>
    Console.printLine(s"Processing batch: $batch")
  }
Backpressure and Buffering
Understanding Backpressure
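What happens when the producer is faster than the consumer? ZIO streams are pull-based: the producer only produces when the consumer asks for more, so a slow consumer automatically slows the producer down.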
// Fast producer
val producer = ZStream.fromIterable(1 to 1000000)
// Slow consumer (simulated)
val slowProcess = (n: Int) =>
  ZIO.sleep(10.millis) *> Console.printLine(n)
// Without buffering - the producer waits for the consumer
val program1 = producer.runForeach(slowProcess)
// With buffering - the producer can run up to 100 elements ahead
val program2 = producer
  .buffer(100) // Buffer up to 100 elements
  .runForeach(slowProcess)
Buffering Strategies
val stream = ZStream.fromIterable(1 to 1000)
// Backpressure: when the buffer is full, the producer waits
val backpressured: ZStream[Any, Nothing, Int] =
  stream.buffer(100)
// Drop newest: when the buffer is full, incoming elements are discarded
val dropNewest: ZStream[Any, Nothing, Int] =
  stream.bufferDropping(100)
// Slide (drop oldest): when the buffer is full, the oldest element is evicted
val sliding: ZStream[Any, Nothing, Int] =
  stream.bufferSliding(100)
Transforming with flatMap
// Each number produces a stream
val nested: ZStream[Any, Nothing, Int] =
  ZStream.fromIterable(1 to 3).flatMap { n =>
    ZStream.fromIterable(1 to n)
  }
// Output: 1, 1, 2, 1, 2, 3
// Practical: fetch user details
def fetchUserDetails(id: Int): ZStream[Any, Throwable, String] =
  ZStream.fromZIO(
    ZIO.attempt(s"Details for user $id")
  )
val userIds = ZStream.fromIterable(1 to 5)
val allDetails: ZStream[Any, Throwable, String] =
  userIds.flatMap(id => fetchUserDetails(id))
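When the inner streams are independent, flatMapPar can run several of them at once. This sketch assumes ZIO 2.x. With parallelism, elements from different inner streams may interleave, so only the multiset of outputs is predictable.

```scala
import zio._
import zio.stream._

// Run up to 3 inner streams concurrently; output order may vary between runs.
val parallelNested: ZIO[Any, Nothing, Chunk[Int]] =
  ZStream.fromIterable(1 to 3)
    .flatMapPar(3)(n => ZStream.fromIterable(1 to n))
    .runCollect
```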
Error Handling in Streams
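val stream = ZStream.fromIterable(1 to 10)
// A typed failure at element 5 (throwing inside map would be a defect,
// which catchAll does not catch)
val failing: ZStream[Any, String, Int] =
  stream.mapZIO(n => if (n == 5) ZIO.fail("Boom!") else ZIO.succeed(n))
// Recover by switching to a fallback stream: 1, 2, 3, 4, -1
val recovered: ZStream[Any, Nothing, Int] =
  failing.catchAll(_ => ZStream.succeed(-1))
// Expose the error as a value: Right(1) .. Right(4), then Left("Boom!")
val safe: ZStream[Any, Nothing, Either[String, Int]] =
  failing.either
// Retry on failure (up to 3 more attempts); the file is closed after each attempt
val readFileOnce: Task[String] =
  ZIO.acquireReleaseWith(ZIO.attempt(scala.io.Source.fromFile("data.txt")))(
    source => ZIO.succeed(source.close())
  )(source => ZIO.attempt(source.mkString))
val resilient: ZStream[Any, Throwable, String] =
  ZStream.fromZIO(readFileOnce.retry(Schedule.recurs(3)))
// Or else - fallback stream
val withFallback: ZStream[Any, Nothing, Int] =
  ZStream.fail(new Exception("Failed"))
    .orElse(ZStream(1, 2, 3))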
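Calling retry on a stream restarts the whole stream when it fails. This sketch (ZIO 2.x assumed) uses a hypothetical flaky stream that fails on its first two runs. A Ref counts how many times the stream was started.

```scala
import zio._
import zio.stream._

// Hypothetical stream: fails on runs 1 and 2, succeeds on run 3.
def flaky(attempts: Ref[Int]): ZStream[Any, String, Int] =
  ZStream.fromZIO(attempts.updateAndGet(_ + 1)).flatMap { n =>
    if (n < 3) ZStream.fail(s"attempt $n failed")
    else ZStream(1, 2, 3)
  }

// Retry up to 5 more times; report the emitted values and the number of runs.
val retried: ZIO[Any, String, (Chunk[Int], Int)] =
  for {
    attempts <- Ref.make(0)
    values   <- flaky(attempts).retry(Schedule.recurs(5)).runCollect
    tries    <- attempts.get
  } yield (values, tries)
```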
Real-World Example: Real-Time Metrics Aggregator
import zio._
import zio.stream._
case class Metric(name: String, value: Double, timestamp: Long)
object MetricsAggregator extends ZIOAppDefault {
  val names = Chunk("cpu", "memory", "disk")
  // Simulate incoming metrics, one every 100 milliseconds
  def generateMetrics: ZStream[Any, Nothing, Metric] =
    ZStream.repeatZIO(
      for {
        index <- Random.nextIntBounded(names.size)
        value <- Random.nextDouble.map(_ * 100)
        time  <- Clock.currentTime(java.util.concurrent.TimeUnit.MILLISECONDS)
      } yield Metric(names(index), value, time)
    ).schedule(Schedule.fixed(100.millis))
  // Rolling average over the last 10 values of each metric
  val processMetrics =
    generateMetrics
      .take(100) // Process 100 metrics, then stop
      .groupByKey(_.name) { case (metricName, metricStream) =>
        metricStream
          .map(_.value)
          .sliding(10) // Last 10 values
          .map { values =>
            val avg = values.sum / values.size
            (metricName, avg)
          }
      }
      .runForeach { case (name, avg) =>
        Console.printLine(f"$name: $avg%.2f")
      }
  def run = processMetrics
}
Remove .take(100) and the same pipeline runs indefinitely in bounded memory, because each metric keeps only its current window of 10 values.
Stream Composition Patterns
Pipeline Pattern
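def readFile(path: String): ZStream[Any, Throwable, String] =
  ZStream.fromFileName(path)
    .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines)
// Placeholder stages, left unimplemented here
def parseJSON(line: String): Task[User] = ???
def validateUser(user: User): IO[String, User] = ???
def saveToDatabase(user: User): Task[Unit] = ???
// Build pipeline
val pipeline =
  readFile("users.json")
    .mapZIO(parseJSON)
    // Wrap the String validation error in a Throwable so every stage shares one error type
    .mapZIO(user => validateUser(user).mapError(msg => new IllegalArgumentException(msg)))
    .mapZIO(saveToDatabase)
    .runDrain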
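Here is a runnable variant of the pipeline with hypothetical stand-ins for the three stages. parseCsv reads id,name,country lines instead of JSON, validateUser rejects empty names, and a Ref plays the role of the database. ZIO 2.x is assumed. The point is the shape of the pipeline and the explicit error-type alignment, not the stage logic.

```scala
import zio._
import zio.stream._

final case class User(id: Int, name: String, country: String)

// Hypothetical parser for "id, name, country" lines.
def parseCsv(line: String): Task[User] =
  ZIO.attempt {
    line.split(",").map(_.trim) match {
      case Array(id, name, country) => User(id.toInt, name, country)
      case _                        => throw new IllegalArgumentException(s"malformed line: $line")
    }
  }

// Hypothetical validation: names must be non-empty.
def validateUser(user: User): IO[String, User] =
  if (user.name.nonEmpty) ZIO.succeed(user) else ZIO.fail(s"user ${user.id} has no name")

// Hypothetical "database": append to an in-memory Ref.
def save(db: Ref[Vector[User]], user: User): UIO[Unit] =
  db.update(_ :+ user)

def runPipeline(lines: ZStream[Any, Throwable, String]): Task[Vector[User]] =
  for {
    db    <- Ref.make(Vector.empty[User])
    _     <- lines
               .mapZIO(parseCsv)
               .mapZIO(u => validateUser(u).mapError(msg => new IllegalArgumentException(msg)))
               .mapZIO(u => save(db, u))
               .runDrain
    saved <- db.get
  } yield saved
```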
Fan-Out Pattern
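val source = ZStream.fromIterable(1 to 100)
// Process same data multiple ways.
// Each pipeline runs source again from the start: fine for a pure source,
// but an effectful source (a file, a network call) is re-executed once per pipeline.
val pipeline1 = source.map(_ * 2).runSum
val pipeline2 = source.filter(_ % 2 == 0).runCount
val pipeline3 = source.map(_.toString).runCollect
// Run all in parallel; the result type is (Int, Long, Chunk[String])
val results = pipeline1.zipPar(pipeline2).zipPar(pipeline3)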
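If the source should be evaluated only once, broadcast can split a single run into several copies. In this sketch (ZIO 2.x assumed), the two copies are consumed in parallel. The maximumLag argument (16 here, an arbitrary choice) bounds how far the faster consumer may run ahead of the slower one.

```scala
import zio._
import zio.stream._

// One upstream run, two downstream consumers.
val singlePass: ZIO[Any, Nothing, (Int, Long)] =
  ZIO.scoped {
    ZStream.fromIterable(1 to 100).broadcast(2, 16).flatMap { copies =>
      copies(0).map(_ * 2).runSum                          // sum of doubled values
        .zipPar(copies(1).filter(_ % 2 == 0).runCount)     // count of even values
    }
  }
```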
Advanced: Custom Sinks
// Custom sink that collects to Set
def toSet[A]: ZSink[Any, Nothing, A, Nothing, Set[A]] =
  ZSink.foldLeft(Set.empty[A])((set: Set[A], elem: A) => set + elem)
val uniqueNumbers = ZStream(1, 2, 2, 3, 3, 4)
  .run(toSet[Int]) // Result: Set(1, 2, 3, 4)
// Sink that appends each line to a file.
// Simple but slow: it opens and closes the file once per element.
def writeToFile(path: String): ZSink[Any, Throwable, String, Nothing, Unit] =
  ZSink.foreach { (line: String) =>
    ZIO.attempt {
      val writer = new java.io.FileWriter(path, true)
      try writer.write(line + "\n")
      finally writer.close()
    }
  }
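Sinks can also stop before the stream ends. In this sketch (ZIO 2.x assumed), ZSink.fold keeps consuming only while its predicate holds on the running total. For a plain set, the built-in ZSink.collectAllToSet does the same job as the custom toSet.

```scala
import zio._
import zio.stream._

// Sum elements until the running total reaches at least 10, then stop.
// Unconsumed elements are reported as leftovers (the fourth type parameter).
val sumUntilTen: ZSink[Any, Nothing, Int, Int, Int] =
  ZSink.fold(0)(_ < 10)((acc: Int, n: Int) => acc + n)

val partialSum: ZIO[Any, Nothing, Int] =
  ZStream.fromIterable(1 to 100).run(sumUntilTen) // 1 + 2 + 3 + 4

// Built-in Set collector.
val distinct: ZIO[Any, Nothing, Set[Int]] =
  ZStream(1, 2, 2, 3, 3, 4).run(ZSink.collectAllToSet[Int])
```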
Performance Tips
Chunking for Efficiency
// Process in chunks for better performance
val efficient = ZStream.fromIterable(1 to 1000000)
  .rechunk(1000) // Re-batch into chunks of 1000 elements
  .mapChunks { chunk =>
    // Called once per chunk rather than once per element
    chunk.map(_ * 2)
  }
Parallel Processing
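// Hypothetical HTTP call, left unimplemented here
def fetchData(url: String): Task[String] = ???
val urls = ZStream.fromIterable(List(
  "http://api1.com",
  "http://api2.com",
  "http://api3.com"
))
// Fetch in parallel with a limit; results keep the input order
val results = urls
  .mapZIOPar(4) { url => // Max 4 concurrent
    fetchData(url)
  }
  .runCollect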
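mapZIOPar keeps results in input order even when later elements finish first, while mapZIOParUnordered emits them as they complete. This is a timing-based sketch that assumes ZIO 2.x. The hypothetical slowIdentity takes longer for larger inputs.

```scala
import zio._
import zio.stream._

// Hypothetical work: element n takes n * 10 milliseconds.
def slowIdentity(n: Int): UIO[Int] = ZIO.sleep((n * 10).millis).as(n)

// Input order is preserved, even though 1 finishes before 3.
val ordered: ZIO[Any, Nothing, Chunk[Int]] =
  ZStream(3, 1, 2).mapZIOPar(3)(slowIdentity).runCollect

// Results are emitted in completion order.
val unordered: ZIO[Any, Nothing, Chunk[Int]] =
  ZStream(3, 1, 2).mapZIOParUnordered(3)(slowIdentity).runCollect
```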
Resource Management
// Streams with resources clean up automatically:
// the file is closed when the stream ends, fails, or is interrupted
val safeStream: ZStream[Any, Throwable, String] =
  ZStream
    .acquireReleaseWith(ZIO.attempt(scala.io.Source.fromFile("data.txt")))(
      source => ZIO.succeed(source.close())
    )
    .flatMap(source => ZStream.fromIterator(source.getLines()))
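To see when the release action runs, this sketch (ZIO 2.x assumed) records lifecycle events in a Ref. The list resource and the event labels stand in for a real file or connection.

```scala
import zio._
import zio.stream._

// Log acquire, each use, and release so their order can be inspected.
def lifecycle(log: Ref[Vector[String]]): ZStream[Any, Nothing, Int] =
  ZStream
    .acquireReleaseWith(log.update(_ :+ "acquire").as(List(1, 2, 3)))(_ =>
      log.update(_ :+ "release")
    )
    .flatMap(resource => ZStream.fromIterable(resource))
    .tap(n => log.update(_ :+ s"use $n"))

val events: UIO[Vector[String]] =
  for {
    log <- Ref.make(Vector.empty[String])
    _   <- lifecycle(log).runDrain
    all <- log.get
  } yield all
```

The release entry comes last: the resource stays open until the stream that uses it has finished.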
Key Takeaways
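- ZStream processes data incrementally, so memory is bounded by chunk and buffer sizes rather than by stream length
- Backpressure is built in: streams are pull-based, and buffer strategies decide what happens when a consumer falls behind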
- Composition is powerful - combine streams like LEGO blocks
- Type safety extends to streaming operations
- Resource management is automatic and safe
- Sinks provide flexible ways to consume streams
- Error handling works just like ZIO effects
- Chunking (rechunk, mapChunks) and bounded parallelism (mapZIOPar) are the main performance levers
Common Patterns Summary
// Read → Transform → Write (transform is your own String => String function)
ZStream.fromFileName("input.txt")
  .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines)
  .map(line => transform(line) + "\n")
  .via(ZPipeline.utf8Encode)
  .run(ZSink.fromFileName("output.txt"))
// Aggregate in fixed-size batches
stream.grouped(100).map(chunk => chunk.sum)
// Parallel processing with limit
stream.mapZIOPar(8)(expensiveOperation)
// Merge multiple sources
stream1.merge(stream2).merge(stream3)
// Error recovery: retry first, then fall back
stream.retry(schedule).catchAll(handleError)
What's Next?
You now know how to process large and infinite datasets efficiently with ZIO Streams. In Lesson 7: Testing ZIO Applications, you'll learn how to:
- Write comprehensive tests with ZIO Test
- Test streams and concurrent code
- Create mock services and test layers
- Use property-based testing
- Build a complete test suite
Ready to ensure your code works correctly? Let's learn to test!