Concurrency with Fibers
The Concurrency Challenge
Imagine you're building a weather dashboard that needs to fetch data from 10 different APIs. The traditional approach:
val api1 = fetchWeatherFromAPI1() // Takes 2 seconds
val api2 = fetchWeatherFromAPI2() // Takes 2 seconds
val api3 = fetchWeatherFromAPI3() // Takes 2 seconds
// ... 7 more APIs
// Total time: 20 seconds!
Or you could use threads and futures, dealing with:
- Thread pool exhaustion
- Complex synchronization
- Hard-to-debug race conditions
- Resource leaks
- Unpredictable performance
What if there was a better way?
What are Fibers?
Fibers are lightweight, virtual threads managed by ZIO's runtime. Think of them as extremely efficient green threads:
Traditional Threads:
- Heavy: typically ~1MB of stack reserved per thread
- Expensive to create and switch
- Limited by the OS (typically thousands per process)
- Preemptively scheduled by the OS
ZIO Fibers:
- Lightweight: ~200 bytes overhead
- Cheap to create (millions possible)
- Scheduled cooperatively by ZIO
- Type-safe with automatic resource cleanup
// Create a million fibers? No problem!
val millionFibers = ZIO.foreachPar(1 to 1_000_000) { i =>
ZIO.succeed(i * 2)
}
Try that with OS threads and watch your system crash.
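To see why cheap fibers matter, here is a small sketch (the object name ManyFibers is illustrative) that puts 100,000 fibers to sleep at the same time. A sleeping fiber does not hold an OS thread, so the whole batch should finish in roughly one second of wall-clock time.

```scala
import zio._

object ManyFibers {
  // Fork 100,000 fibers that each sleep for one second, all concurrently.
  // A sleeping fiber is a suspended value on the heap; it holds no OS thread.
  val program: UIO[(Duration, Int)] =
    ZIO
      .foreachPar(1 to 100_000)(i => ZIO.sleep(1.second).as(i))
      .map(_.size) // count the results
      .timed       // pair the count with the elapsed wall-clock time
}
```
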
Why Fibers Matter
Fibers solve the concurrency trilemma:
- Performance: Run many concurrent operations efficiently
- Safety: No data races, automatic cleanup, interruption safety
- Simplicity: Composable, easy to reason about
What would you choose: fast but unsafe, or slow but safe? With fibers, you get both.
Creating Concurrent Effects
Forking: Background Execution
The fork operator starts an effect in the background:
import zio._
val longRunning = ZIO.sleep(5.seconds) *> Console.printLine("Done!")
val program = for {
fiber <- longRunning.fork // Starts in background, returns immediately
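_ <- Console.printLine("Main fiber continues")
_ <- fiber.join // Wait for background task
} yield ()
fiber.join suspends the main fiber until the background fiber completes. Without it, the main fiber would finish first, and because a fiber started with fork is interrupted when its parent finishes, "Done!" would never print.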
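Here is a sketch of the same fork/join flow that records messages in a Ref instead of printing them, so the order can be checked. The object name and the 100 ms delay are illustrative.

```scala
import zio._

object ForkJoinDemo {
  val program: UIO[List[String]] =
    for {
      log   <- Ref.make(List.empty[String])
      // The forked fiber sleeps first, so the main fiber's message is logged before it
      fiber <- (ZIO.sleep(100.millis) *> log.update(_ :+ "background done")).fork
      _     <- log.update(_ :+ "main continues")
      _     <- fiber.join // suspends the main fiber until the background fiber finishes
      out   <- log.get
    } yield out
}
```
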
Fiber Operations
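// Assumes fetchData(): Task[String]
// Fork and get a Fiber handle. Every run of this effect forks a new fiber.
val forked: UIO[Fiber[Throwable, String]] =
  fetchData().fork
// Join: wait for the result; a failure in the fiber is re-raised here
val result: Task[String] =
  forked.flatMap(_.join)
// Interrupt: cancel the fiber and wait until it has stopped
val interrupted: UIO[Exit[Throwable, String]] =
  forked.flatMap(_.interrupt)
// Await: wait for completion and get the Exit value without failing
val exit: UIO[Exit[Throwable, String]] =
  forked.flatMap(_.await)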
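A runnable sketch of join, await, and interrupt side by side, using trivial effects in place of fetchData (the object and value names are illustrative):

```scala
import zio._

object FiberOpsDemo {
  val program: UIO[(String, Boolean, Boolean)] =
    for {
      quick   <- ZIO.succeed("done").fork
      result  <- quick.join        // the fiber's success value
      failing <- ZIO.fail("boom").fork
      failed  <- failing.await     // never fails; the error is inside the Exit
      endless <- ZIO.never.fork
      stopped <- endless.interrupt // returns once the fiber has stopped
    } yield (result, failed.isFailure, stopped.isInterrupted)
}
```
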
Daemon Fibers
Daemon fibers are attached to the global scope instead of to the fiber that forked them, so they keep running after their parent finishes:
val daemon = for {
_ <- ZIO.sleep(10.seconds).forever.forkDaemon
_ <- Console.printLine("Main program exits immediately")
} yield ()
// The daemon outlives this effect; it stops only when interrupted or when the application shuts down
When would you use a daemon fiber? For background cleanup, monitoring, or logging tasks that must outlive the code that started them.
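The difference from a regular fork can be checked directly. In this sketch (names and delays are illustrative), a parent fiber starts one regular child and one daemon child, then finishes at once. The regular child is interrupted along with its parent; the daemon keeps going.

```scala
import zio._

object DaemonDemo {
  // Returns (regular child finished?, daemon child finished?)
  val program: UIO[(Boolean, Boolean)] =
    for {
      regularDone <- Ref.make(false)
      daemonDone  <- Ref.make(false)
      parent = for {
                 _ <- (ZIO.sleep(100.millis) *> regularDone.set(true)).fork       // tied to parent
                 _ <- (ZIO.sleep(100.millis) *> daemonDone.set(true)).forkDaemon // global scope
               } yield ()
      _           <- parent.fork.flatMap(_.join) // the parent returns right after forking
      _           <- ZIO.sleep(500.millis)        // plenty of time for both children
      regular     <- regularDone.get
      daemon      <- daemonDone.get
    } yield (regular, daemon)
}
```
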
Parallel Execution
zipPar: Parallel Composition
Run two effects in parallel and combine results:
val user: Task[User] = fetchUser(userId)
val posts: Task[List[Post]] = fetchPosts(userId)
// Sequential (slow)
val sequential = for {
u <- user
p <- posts
} yield (u, p)
// Takes: time(user) + time(posts)
// Parallel (fast!)
val parallel = user.zipPar(posts)
// Takes: max(time(user), time(posts))
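The timing claims above can be checked with simulated calls. In this sketch, fetchUser and fetchPosts are hypothetical stand-ins that each sleep for one second; .timed pairs each result with its elapsed time.

```scala
import zio._

object ZipParDemo {
  // Hypothetical stand-ins for fetchUser and fetchPosts: each sleeps for 1 second.
  val fetchUser: UIO[String]        = ZIO.sleep(1.second).as("alice")
  val fetchPosts: UIO[List[String]] = ZIO.sleep(1.second).as(List("post-1", "post-2"))

  // One after the other: about 2 seconds
  val sequential: UIO[(Duration, (String, List[String]))] = fetchUser.zip(fetchPosts).timed
  // Both at once: about 1 second
  val parallel: UIO[(Duration, (String, List[String]))] = fetchUser.zipPar(fetchPosts).timed
}
```
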
foreachPar: Parallel Iteration
Process collections in parallel:
val userIds = List(1, 2, 3, 4, 5)
// Sequential
val sequential = ZIO.foreach(userIds)(fetchUser)
// Processes one by one
// Parallel
val parallel = ZIO.foreachPar(userIds)(fetchUser)
// Starts one fiber per element (unbounded by default); results keep the input order
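Even though the fibers finish in whatever order their work completes, foreachPar returns results in input order. A sketch with illustrative delays, where larger numbers sleep longer (so completions typically arrive as 1, 2, 3):

```scala
import zio._

object ForeachParOrderDemo {
  // Returns (results, completion order)
  val program: UIO[(List[Int], List[Int])] =
    for {
      finished <- Ref.make(List.empty[Int])
      results  <- ZIO.foreachPar(List(3, 1, 2)) { n =>
                    ZIO.sleep((n * 100).millis) *> finished.update(_ :+ n).as(n)
                  }
      order    <- finished.get
    } yield (results, order)
}
```
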
Controlling Parallelism
Too much parallelism can overwhelm resources:
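// Process 1000 items, but at most 10 at a time
val controlled = ZIO.foreachPar(items)(processItem)
  .withParallelism(10)
// Or batch manually: batches run one after another, items within a batch in parallel.
// Each batch waits for its slowest item, so this is usually slower than withParallelism.
val batched = ZIO
  .foreach(items.grouped(10).toList)(batch => ZIO.collectAllPar(batch.map(processItem)))
  .map(_.flatten)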
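One way to convince yourself that withParallelism really caps concurrency is to count how many tasks are in flight. In this sketch, trackedTask is a hypothetical unit of work that increments a counter while it runs and records the highest value seen.

```scala
import zio._

object ParallelismDemo {
  // Tracks how many copies are running right now and remembers the peak.
  def trackedTask(current: Ref[Int], peak: Ref[Int]): UIO[Unit] =
    for {
      running <- current.updateAndGet(_ + 1)
      _       <- peak.update(p => math.max(p, running))
      _       <- ZIO.sleep(10.millis) // simulated I/O
      _       <- current.update(_ - 1)
    } yield ()

  // 100 tasks, at most 10 in flight at any moment; returns the observed peak
  val program: UIO[Int] =
    for {
      current <- Ref.make(0)
      peak    <- Ref.make(0)
      _       <- ZIO.foreachParDiscard(1 to 100)(_ => trackedTask(current, peak)).withParallelism(10)
      result  <- peak.get
    } yield result
}
```
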
Racing Effects
race: First Wins
Sometimes you want the fastest result:
val primaryDB: Task[User] = fetchFromPrimary(userId)
val cacheDB: Task[User] = fetchFromCache(userId)
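// Use whichever succeeds first; a failure does not win the race
val fastest: Task[User] = primaryDB.race(cacheDB)
The losing effect is automatically interrupted, so no work is wasted on it. If one side fails, race waits for the other; if you want whichever finishes first, success or failure, use raceFirst instead.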
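A sketch with simulated data sources (the names and latencies are illustrative) showing both behaviors: the faster success wins, and a fast failure does not.

```scala
import zio._

object RaceDemo {
  // Hypothetical data sources with simulated latencies.
  val primary: Task[String]     = ZIO.sleep(500.millis).as("from primary")
  val cache: Task[String]       = ZIO.sleep(50.millis).as("from cache")
  val brokenCache: Task[String] = ZIO.fail(new RuntimeException("cache unavailable"))

  // The cache answers first, so its value wins and primary is interrupted
  val fastest: Task[String] = primary.race(cache)

  // A failure does not win: race waits for primary's success instead
  val survivesFailure: Task[String] = primary.race(brokenCache)
}
```
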
Timeouts
Prevent operations from running too long:
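val risky: Task[String] = fetchFromSlowAPI()
// Succeed with None if it takes longer than 5 seconds (risky is interrupted)
val withTimeout: Task[Option[String]] =
  risky.timeout(5.seconds)
// Or fail with a custom error instead of returning None
val withTimeoutError: Task[String] =
  risky.timeoutFail(new java.util.concurrent.TimeoutException("slow API timed out"))(5.seconds)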
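If you want a real fallback value rather than an error, one option is to combine timeout with someOrElse. A sketch in which slow is a hypothetical call that takes two seconds:

```scala
import zio._

object TimeoutDemo {
  // Hypothetical slow call: takes 2 seconds to produce a value.
  val slow: Task[String] = ZIO.sleep(2.seconds).as("fresh result")

  // After 100 ms the slow effect is interrupted and the result is None
  val asOption: Task[Option[String]] = slow.timeout(100.millis)

  // Same timeout, but substitute a default value for None
  val withDefault: Task[String] = slow.timeout(100.millis).someOrElse("cached fallback")
}
```
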
Interruption: The Secret Sauce
Interruption is ZIO's killer feature. When a fiber is interrupted, it:
- Stops at its next interruption point (uninterruptible regions finish first)
- Runs all finalizers (cleanup code)
- Interrupts its child fibers
- Releases every resource guarded by a finalizer (ensuring, acquireRelease), so nothing leaks
Automatic Interruption
val program = for {
fiber <- (ZIO.sleep(10.seconds) *> Console.printLine("Done!")).fork
_ <- ZIO.sleep(2.seconds)
_ <- fiber.interrupt // Cancels the sleep
} yield ()
// "Done!" never prints
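A testable variant of the example above, using Refs as flags instead of printing (names are illustrative). A Promise signals that the fiber has started, so the interruption cannot happen before the finalizer is installed.

```scala
import zio._

object InterruptDemo {
  // Returns (did the work finish?, did the finalizer run?)
  val program: UIO[(Boolean, Boolean)] =
    for {
      printed   <- Ref.make(false)
      finalized <- Ref.make(false)
      started   <- Promise.make[Nothing, Unit]
      fiber     <- (started.succeed(()) *> ZIO.sleep(10.seconds) *> printed.set(true))
                     .ensuring(finalized.set(true)) // cleanup that runs on any exit
                     .fork
      _         <- started.await
      _         <- fiber.interrupt // returns after the fiber and its finalizers finish
      p         <- printed.get
      f         <- finalized.get
    } yield (p, f)
}
```
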
onInterrupt Hooks
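Run cleanup only when an effect is interrupted (useResource here is a hypothetical helper):
// Assumes acquireResource(): Task[Resource], useResource(r: Resource): Task[Unit],
// and releaseResource(r: Resource): UIO[Unit]
val safeResource =
  acquireResource().flatMap { resource =>
    useResource(resource)
      .onInterrupt(releaseResource(resource)) // runs only if useResource is interrupted
  }
onInterrupt does not run on success or failure. For cleanup on every outcome, use ensuring or acquireRelease.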
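A small sketch (names are illustrative) confirming that an onInterrupt hook is skipped when the effect completes normally and runs when it is interrupted:

```scala
import zio._

object OnInterruptDemo {
  val program: UIO[List[String]] =
    for {
      log     <- Ref.make(List.empty[String])
      // Completes normally, so this hook never runs
      _       <- ZIO.unit.onInterrupt(log.update(_ :+ "hook on completed effect"))
      started <- Promise.make[Nothing, Unit]
      fiber   <- (started.succeed(()) *> ZIO.never)
                   .onInterrupt(log.update(_ :+ "hook on interrupted effect"))
                   .fork
      _       <- started.await // the hook is installed once the fiber has started
      _       <- fiber.interrupt
      entries <- log.get
    } yield entries
}
```
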
Uninterruptible Regions
Some operations must complete:
val critical = for {
_ <- acquireResource()
_ <- criticalOperation().uninterruptible
_ <- releaseResource()
} yield ()
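Why uninterruptible? Imagine a database transaction: you can't stop halfway through without risking corrupted data. If the fiber is interrupted inside such a region, the interruption takes effect only after the region completes.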
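The claim that interruption waits for an uninterruptible region can be checked with a simulated critical section (names and the 200 ms delay are illustrative):

```scala
import zio._

object UninterruptibleDemo {
  val program: UIO[Boolean] =
    for {
      committed <- Ref.make(false)
      started   <- Promise.make[Nothing, Unit]
      // Simulated critical section: signal that it began, work for 200 ms, then commit
      fiber     <- (started.succeed(()) *> ZIO.sleep(200.millis) *> committed.set(true))
                     .uninterruptible
                     .fork
      _         <- started.await   // make sure the fiber is inside the region
      _         <- fiber.interrupt // takes effect only after the region completes
      result    <- committed.get
    } yield result
}
```
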
Practical Example: Concurrent Web Scraper
Let's build a real web scraper that fetches multiple URLs in parallel:
import zio._
import zio.http._

case class ScrapeResult(url: String, content: String, duration: Duration)

object WebScraper extends ZIOAppDefault {
  // Needs a zio-http Client in the environment. The exact request API differs
  // between zio-http versions; adjust Client.request to the version you use.
  def fetchURL(url: String): ZIO[Client, Throwable, ScrapeResult] =
    (for {
      response <- Client.request(url)
      body     <- response.body.asString
    } yield body).timed.map { case (duration, body) =>
      // .timed measures the request while it runs, not when the effect is built
      ScrapeResult(url, body.take(100), duration)
    }

  def scrapeWithTimeout(url: String, timeout: Duration): ZIO[Client, Throwable, ScrapeResult] =
    fetchURL(url)
      .timeout(timeout)
      .flatMap {
        case Some(result) => ZIO.succeed(result)
        case None         => ZIO.fail(new Exception(s"Timeout fetching $url"))
      }

  val urls = List(
    "https://example.com",
    "https://example.org",
    "https://example.net"
  )

  val program = for {
    _       <- Console.printLine("Starting parallel scraping...")
    results <- ZIO.foreachPar(urls)(url => scrapeWithTimeout(url, 10.seconds))
    _       <- Console.printLine(s"Scraped ${results.length} URLs")
    _       <- ZIO.foreach(results) { result =>
                 Console.printLine(s"${result.url}: ${result.duration}")
               }
  } yield ()

  // Supply the HTTP client layer the program depends on
  def run = program.provide(Client.default)
}
Notice:
- Fetches all URLs in parallel
- Each has a timeout
- If any fetch fails or times out, foreachPar interrupts the remaining fetches and the program fails with that error
- Clean, composable code
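The fail-fast behavior can be observed without any network access. In this sketch, fakeFetch is a hypothetical stand-in for fetchURL: one "URL" fails after 50 ms while the others would need a full second, and none of the slow ones ever records a completion.

```scala
import zio._

object FailFastDemo {
  // Hypothetical simulated fetch: sleeps, then succeeds (recording the URL) or fails.
  def fakeFetch(url: String, delay: Duration, fails: Boolean, done: Ref[List[String]]): Task[String] =
    ZIO.sleep(delay) *> (
      if (fails) ZIO.fail(new RuntimeException(s"failed: $url"))
      else done.update(_ :+ url).as(s"<html>$url</html>")
    )

  // Returns (did the whole batch fail?, which fetches completed)
  val program: UIO[(Boolean, List[String])] =
    for {
      done <- Ref.make(List.empty[String])
      exit <- ZIO.foreachPar(List(
                ("https://example.com", 1.second, false),
                ("https://example.org", 50.millis, true),
                ("https://example.net", 1.second, false)
              )) { case (url, delay, fails) => fakeFetch(url, delay, fails, done) }.exit
      completed <- done.get
    } yield (exit.isFailure, completed)
}
```
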
Advanced Fiber Patterns
Racing with fallback
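def fetchWithFallback[A](
  primary: Task[A],
  secondary: Task[A]
): Task[A] =
  // Runs both at once and returns the first success; if one fails, the other's result is used.
  // For a strictly sequential fallback (try primary, then secondary), use primary.orElse(secondary).
  primary.race(secondary)
Parallel validation
def validateAll[A](
  items: List[A],
  validate: A => Task[Unit]
): Task[Unit] =
  // Fails fast: the first failure interrupts the remaining validations.
  // To collect every error instead, use ZIO.validatePar.
  ZIO.foreachPar(items)(validate).unit
Timeout with cancellation
def withTimeout[A](
  effect: Task[A],
  duration: Duration
): Task[Option[A]] =
  // Interrupts effect and returns None if it has not finished within duration
  effect.timeout(duration)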
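To contrast fail-fast validation with error accumulation, here is a sketch using a hypothetical validator that accepts only positive numbers. ZIO.validatePar runs every check and fails with a non-empty list of all errors, while foreachPar stops at the first error it observes.

```scala
import zio._

object ValidationDemo {
  // Hypothetical validator: accepts positive numbers only.
  def validate(n: Int): IO[String, Int] =
    if (n > 0) ZIO.succeed(n) else ZIO.fail(s"$n is not positive")

  val inputs = List(1, -2, 3, -4)

  // Fails with the first error observed and interrupts the rest
  val failFast: IO[String, List[Int]] = ZIO.foreachPar(inputs)(validate)

  // Runs every validation and collects all errors in a non-empty list
  val collectAll: IO[::[String], List[Int]] = ZIO.validatePar(inputs)(validate)
}
```
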
Fiber Lifecycle and Exit Values
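Every fiber finishes with an Exit value. Simplified, the type looks like this (in ZIO itself the cases live in the companion object, as Exit.Success and Exit.Failure):
sealed trait Exit[+E, +A]
case class Success[+A](value: A) extends Exit[Nothing, A]
case class Failure[+E](cause: Cause[E]) extends Exit[E, Nothing]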
The Cause type tracks:
- Failures (expected errors)
- Defects (unexpected exceptions)
- Interruptions
- Combinations of the above
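val effect: Task[String] = ???
val report = for {
  fiber <- effect.fork
  exit  <- fiber.await // never fails; the outcome is inside the Exit
  _     <- exit match {
             case Exit.Success(value) =>
               Console.printLine(s"Succeeded: $value")
             case Exit.Failure(cause) =>
               Console.printLine(s"Failed: ${cause.prettyPrint}")
           }
} yield ()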
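The Cause inside a failed Exit tells you which kind of failure happened. A sketch (names are illustrative) that produces one Exit of each kind and classifies it:

```scala
import zio._

object ExitDemo {
  // Classify an Exit value using the Cause it carries.
  def describe(exit: Exit[String, Int]): String =
    exit match {
      case Exit.Success(value)                        => s"success: $value"
      case Exit.Failure(cause) if cause.isInterrupted => "interrupted"
      case Exit.Failure(cause) if cause.isDie         => "defect"
      case Exit.Failure(_)                            => "failure"
    }

  val program: UIO[List[String]] =
    for {
      ok          <- ZIO.succeed(42).fork.flatMap(_.await)
      failed      <- ZIO.fail("not found").fork.flatMap(_.await)
      died        <- ZIO.die(new IllegalStateException("bug")).fork.flatMap(_.await)
      interrupted <- ZIO.never.fork.flatMap(_.interrupt)
    } yield List[Exit[String, Int]](ok, failed, died, interrupted).map(describe)
}
```
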
Performance Considerations
When to Use Parallelism
Good candidates:
- I/O operations (network, disk, database)
- Independent computations
- Multiple API calls
Poor candidates:
- CPU-bound tasks that already keep every core busy (extra fibers only compete for the same cores)
- Sequential dependencies
- Trivial operations (overhead > benefit)
Fiber Overhead
Fibers are cheap but not free:
// Efficient
ZIO.foreachPar(urls)(fetch)
// Wasteful: forking each fiber costs more than the work it does
ZIO.foreachPar(1 to 1000)(n => ZIO.succeed(n + 1))
Use parallelism when the work justifies the overhead.
Common Pitfalls and Solutions
Pitfall 1: Forgetting to join
// BAD: result and errors are lost
val leak = someEffect.fork
// Nobody observes the outcome, and the fiber is interrupted when its parent finishes
// GOOD: Join or interrupt
val proper = for {
fiber <- someEffect.fork
result <- fiber.join
} yield result
Pitfall 2: Too much parallelism
// BAD: Overwhelms system
ZIO.foreachPar(1 to 100000)(fetchUser)
// GOOD: Control parallelism
ZIO.foreachPar(1 to 100000)(fetchUser).withParallelism(50)
Pitfall 3: Blocking operations
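// BAD: Thread.sleep blocks one of the runtime's worker threads
ZIO.attempt(Thread.sleep(1000))
// GOOD: ZIO.sleep suspends the fiber without holding a thread
ZIO.sleep(1.second)
// If a blocking call is unavoidable, run it on the blocking thread pool
ZIO.attemptBlocking(Thread.sleep(1000))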
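A sketch of wrapping a blocking legacy call, where legacyRead is a hypothetical function standing in for something like an old JDBC driver. Each call runs on ZIO's blocking pool, and the calls can still be composed in parallel like any other effect.

```scala
import zio._

object BlockingDemo {
  // Hypothetical legacy call that blocks its thread.
  def legacyRead(id: Int): String = {
    Thread.sleep(100)
    s"row-$id"
  }

  // attemptBlocking runs each call on ZIO's separate blocking thread pool,
  // so the worker threads that run ordinary fibers stay free.
  val reads: Task[List[String]] =
    ZIO.foreachPar(List(1, 2, 3))(id => ZIO.attemptBlocking(legacyRead(id)))
}
```
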
Testing Concurrent Code
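ZIO Test makes testing concurrent code straightforward. By default it replaces the clock with a virtual TestClock, so a test that measures real sleeps needs TestAspect.withLiveClock:
import zio._
import zio.test._

object ConcurrencySpec extends ZIOSpecDefault {
  def spec = suite("Concurrent operations")(
    test("parallel sleeps complete faster than sequential") {
      for {
        // .timed returns (elapsed, result); only the elapsed time matters here
        duration <- ZIO.foreachParDiscard(List(1, 2, 3))(_ => ZIO.sleep(1.second)).timed.map(_._1)
      } yield assertTrue(duration.toMillis < 2000)
    } @@ TestAspect.withLiveClock
  )
}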
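ZIO Test can also skip real waiting entirely by advancing virtual time with TestClock.adjust. A sketch (spec and test names are illustrative) where three parallel one-second sleeps finish after one virtual second, while the sequential version is still running:

```scala
import zio._
import zio.test._

object VirtualTimeSpec extends ZIOSpecDefault {
  def spec = suite("Concurrent operations with virtual time")(
    test("parallel sleeps finish after 1 second of virtual time") {
      for {
        fiber <- ZIO.foreachParDiscard(List(1, 2, 3))(_ => ZIO.sleep(1.second)).fork
        _     <- TestClock.adjust(1.second) // advance the test clock; no real waiting
        exit  <- fiber.await
      } yield assertTrue(exit.isSuccess)
    },
    test("sequential sleeps are still running after 1 second of virtual time") {
      for {
        fiber <- ZIO.foreachDiscard(List(1, 2, 3))(_ => ZIO.sleep(1.second)).fork
        _     <- TestClock.adjust(1.second)
        poll  <- fiber.poll // None while the fiber has not finished
      } yield assertTrue(poll.isEmpty)
    }
  )
}
```
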
Real-World Example: Parallel Data Pipeline
import zio._

case class DataRecord(id: Int, data: String)

object DataPipeline extends ZIOAppDefault {
  def extract(id: Int): Task[String] =
    ZIO.attempt(s"Data from database for $id")
      .delay(100.millis) // simulated database latency
  def transform(raw: String): Task[String] =
    ZIO.attempt(raw.toUpperCase)
      .delay(50.millis)
  def load(data: String): Task[Unit] =
    Console.printLine(s"Loaded: $data")
      .delay(75.millis)
  def processRecord(id: Int): Task[Unit] = for {
    raw         <- extract(id)
    transformed <- transform(raw)
    _           <- load(transformed)
  } yield ()
  val program = for {
    _       <- Console.printLine("Starting ETL pipeline...")
    elapsed <- ZIO.foreachParDiscard(1 to 100)(processRecord).timed.map(_._1)
    _       <- Console.printLine(s"Processed 100 records in ${elapsed.toMillis}ms")
  } yield ()
  def run = program
}
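Each record takes about 225 ms (100 + 50 + 75), so processing 100 records sequentially would take about 22.5 seconds. In parallel, all 100 records run at once, so the whole pipeline takes roughly 225 ms plus overhead: well under a second!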
Key Takeaways
- Fibers are lightweight, virtual threads managed by ZIO
- fork starts background execution, returns a Fiber handle
- zipPar, foreachPar enable parallel composition
- race returns the first successful result
- Interruption is automatic and safe: finalizers always run, so resources don't leak
- timeout prevents operations from running too long
- Control parallelism with withParallelism
- Fibers are cheap but not free: use them for I/O and independent work
Common Questions
Q: How many fibers can I create?
Millions! Limited only by memory. Each fiber has minimal overhead.
Q: Do fibers use multiple CPU cores?
Yes. ZIO's runtime uses a thread pool that leverages all available cores. Fibers are scheduled across threads automatically.
Q: What's the difference between fork and async?
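fork runs an existing ZIO effect on a new fiber. ZIO.async does something different: it turns a callback-based API into a ZIO effect, which you can then run directly or fork like any other effect.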
Q: Can fibers deadlock?
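It's much harder than with threads, but possible if you create circular dependencies, for example two fibers that each wait for a Promise only the other one completes. Structured concurrency ties fiber lifetimes to their parents, which makes such waits easier to spot and clean up, but neither it nor the type system rules deadlocks out.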
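A minimal sketch of such a circular wait (names are illustrative): each fiber waits on a Promise that only the other fiber completes, so neither ever finishes. The timeout turns the hang into a None result instead of blocking forever.

```scala
import zio._

object DeadlockDemo {
  // Two fibers, each waiting for a Promise that only the other one completes.
  val program: UIO[Option[Unit]] =
    for {
      a      <- Promise.make[Nothing, Unit]
      b      <- Promise.make[Nothing, Unit]
      first  <- (b.await *> a.succeed(())).fork // needs b before it completes a
      second <- (a.await *> b.succeed(())).fork // needs a before it completes b
      // Neither join can ever finish; the timeout turns the hang into None
      result <- first.join.zipPar(second.join).unit.timeout(1.second)
    } yield result
}
```
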
What's Next?
You've mastered concurrent programming with fibers! In Lesson 5: Resource Management and Scopes, you'll learn how to:
- Guarantee resource cleanup with acquireRelease
- Use scopes for complex resource lifecycles
- Build connection pools
- Handle nested resources safely
- Ensure cleanup even during interruption
Ready to build rock-solid resource management? Let's continue!
Additional Resources
- ZIO Concurrency Documentation
- Fiber Internals Blog Post
- ZIO Runtime and Fibers
- Structured Concurrency
Practice Exercise
Build a parallel file processor that:
- Reads all .txt files from a directory
- Processes each file in parallel (count words)
- Has a 5-second timeout per file
- Handles failures gracefully
- Reports results with duration
Bonus: Add a progress indicator using Ref!