Concurrency with Fibers

The Concurrency Challenge

Imagine you're building a weather dashboard that needs to fetch data from 10 different APIs. The traditional approach:

val api1 = fetchWeatherFromAPI1()  // Takes 2 seconds
val api2 = fetchWeatherFromAPI2()  // Takes 2 seconds
val api3 = fetchWeatherFromAPI3()  // Takes 2 seconds
// ... 7 more APIs
// Total time: 20 seconds!

Or you could use threads and futures, dealing with:

  • Thread pool exhaustion
  • Complex synchronization
  • Hard-to-debug race conditions
  • Resource leaks
  • Unpredictable performance

What if there was a better way?

What are Fibers?

Fibers are lightweight, virtual threads managed by ZIO's runtime. Think of them as extremely efficient green threads:

Traditional Threads:

  • Heavy: ~1MB stack per thread
  • Expensive to create and switch
  • Limited by OS (thousands at most)
  • Preemptively scheduled by the OS

ZIO Fibers:

  • Lightweight: ~200 bytes overhead
  • Cheap to create (millions possible)
  • Scheduled cooperatively by ZIO
  • Type-safe with automatic resource cleanup

// Create a million fibers? No problem!
val millionFibers = ZIO.foreachPar(1 to 1_000_000) { i =>
  ZIO.succeed(i * 2)
}

Try that with OS threads and watch your system crash.

Why Fibers Matter

Fibers solve the concurrency trilemma:

  1. Performance: Run many concurrent operations efficiently
  2. Safety: No data races, automatic cleanup, interruption safety
  3. Simplicity: Composable, easy to reason about

What would you choose: fast but unsafe, or slow but safe? With fibers, you don't have to choose: you get both speed and safety.

Creating Concurrent Effects

Forking: Background Execution

The fork operator starts an effect in the background:

import zio._

val longRunning = ZIO.sleep(5.seconds) *> Console.printLine("Done!")

val program = for {
  fiber <- longRunning.fork  // Starts in background, returns immediately
  _     <- Console.printLine("Main thread continues")
  _     <- fiber.join        // Wait for background task
} yield ()

The fiber.join waits for completion. Without it, the program would exit before "Done!" prints.

Fiber Operations

// Fork and get a Fiber handle
val fiber: UIO[Fiber[Throwable, String]] = 
  fetchData().fork

// Join: wait for result
val result: Task[String] = 
  fiber.flatMap(_.join)

// Interrupt: cancel the fiber
val interrupted: UIO[Exit[Throwable, String]] = 
  fiber.flatMap(_.interrupt)

// Await: get the Exit value
val exit: UIO[Exit[Throwable, String]] = 
  fiber.flatMap(_.await)

Daemon Fibers

Daemon fibers don't prevent program termination:

val daemon = for {
  _ <- ZIO.sleep(10.seconds).forever.forkDaemon
  _ <- Console.printLine("Main program exits immediately")
} yield ()
// The sleeping fiber is killed automatically

When would you use a daemon fiber? Background cleanup, monitoring, or logging tasks that shouldn't block shutdown.
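For example, a background heartbeat can be forked as a daemon so it never delays shutdown. Here's a minimal sketch, assuming a hypothetical reportHealth effect (not part of ZIO):

// reportHealth is a placeholder for whatever health-check effect you have
def withHeartbeat[R, E, A](reportHealth: UIO[Unit])(app: ZIO[R, E, A]): ZIO[R, E, A] =
  for {
    _      <- (reportHealth *> ZIO.sleep(30.seconds)).forever.forkDaemon  // never blocks shutdown
    result <- app
  } yield result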

Parallel Execution

zipPar: Parallel Composition

Run two effects in parallel and combine results:

val user: Task[User] = fetchUser(userId)
val posts: Task[List[Post]] = fetchPosts(userId)

// Sequential (slow)
val sequential = for {
  u <- user
  p <- posts
} yield (u, p)
// Takes: time(user) + time(posts)

// Parallel (fast!)
val parallel = user.zipPar(posts)
// Takes: max(time(user), time(posts))
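If you want the combined value rather than a tuple, zipWithPar applies a function to both results. A small sketch, assuming a hypothetical Profile case class:

case class Profile(user: User, posts: List[Post])

// Runs both effects in parallel and combines the results directly
val profile: Task[Profile] =
  user.zipWithPar(posts)(Profile(_, _))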

foreachPar: Parallel Iteration

Process collections in parallel:

val userIds = List(1, 2, 3, 4, 5)

// Sequential
val sequential = ZIO.foreach(userIds)(fetchUser)
// Processes one by one

// Parallel
val parallel = ZIO.foreachPar(userIds)(fetchUser)
// Processes all at once!

Controlling Parallelism

Too much parallelism can overwhelm resources:

// Process 1000 items, but only 10 at a time
val controlled = ZIO.foreachPar(items)(processItem)
  .withParallelism(10)

// Or batch manually and process one batch at a time
val batched = ZIO.foreach(items.grouped(10).toList) { batch =>
  ZIO.collectAllPar(batch.map(processItem))
}

Racing Effects

race: First Wins

Sometimes you want the fastest result:

val primaryDB: Task[User] = fetchFromPrimary(userId)
val cacheDB: Task[User] = fetchFromCache(userId)

// Use whichever responds first
val fastest: Task[User] = primaryDB.race(cacheDB)

The losing effect is automatically interrupted. No wasted resources!
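A quick way to see this in action is to attach onInterrupt logging to both branches. A small sketch using only sleeps:

val slow = ZIO.sleep(5.seconds).as("slow")
  .onInterrupt(Console.printLine("slow was interrupted").orDie)
val fast = ZIO.sleep(1.second).as("fast")
  .onInterrupt(Console.printLine("fast was interrupted").orDie)

// Yields "fast" and prints "slow was interrupted"
val winner: UIO[String] = fast.race(slow)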

Timeouts

Prevent operations from running too long:

val risky: Task[String] = fetchFromSlowAPI()

// Returns None if no result arrives within 5 seconds
val withTimeout: Task[Option[String]] = 
  risky.timeout(5.seconds)

// Or fail with a custom error on timeout
val withError: Task[String] = 
  risky.timeoutFail(TimeoutError)(5.seconds)
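If you want an actual fallback value rather than an error, timeoutTo substitutes a default and maps the success case. A sketch, assuming a placeholder default string:

// Falls back to "<no data>" if the API doesn't respond in time
val withDefault: Task[String] =
  risky.timeoutTo("<no data>")(identity)(5.seconds)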

Interruption: The Secret Sauce

Interruption is ZIO's killer feature. When a fiber is interrupted, it:

  1. Stops execution as soon as possible (at its next interruption point)
  2. Runs all finalizers (cleanup code)
  3. Propagates interruption to child fibers
  4. Never leaves resources leaked

Automatic Interruption

val program = for {
  fiber <- (ZIO.sleep(10.seconds) *> Console.printLine("Done!")).fork
  _     <- ZIO.sleep(2.seconds)
  _     <- fiber.interrupt  // Cancels the sleep
} yield ()
// "Done!" never prints

onInterrupt Hooks

Clean up when interrupted:

val safeResource = 
  acquireResource()
    .onInterrupt(releaseResource())

Uninterruptible Regions

Some operations must complete:

val critical = for {
  _ <- acquireResource()
  _ <- criticalOperation().uninterruptible
  _ <- releaseResource()
} yield ()

Why uninterruptible? Imagine a database transaction—you can't stop halfway through without corrupting data.
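Here's a minimal sketch of shielding just the critical step with ZIO.uninterruptibleMask, assuming a hypothetical commitTxn effect; everything wrapped in restore remains interruptible:

def saveAll(commitTxn: Task[Unit]): Task[Unit] =
  ZIO.uninterruptibleMask { restore =>
    for {
      _ <- commitTxn                                // cannot be interrupted mid-commit
      _ <- restore(Console.printLine("Committed"))  // interruptible again
    } yield ()
  }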

Practical Example: Concurrent Web Scraper

Let's build a real web scraper that fetches multiple URLs in parallel:

import zio._
import zio.http._

case class ScrapeResult(url: String, content: String, duration: Duration)

object WebScraper extends ZIOAppDefault {

  // Requires the zio-http Client in the environment (provided in `run` below)
  def fetchURL(url: String): ZIO[Client, Throwable, ScrapeResult] =
    (for {
      response <- Client.request(url)
      body     <- response.body.asString
    } yield body).timed.map { case (duration, body) =>
      ScrapeResult(url, body.take(100), duration)
    }

  def scrapeWithTimeout(url: String, timeout: Duration): ZIO[Client, Throwable, ScrapeResult] = 
    fetchURL(url)
      .timeout(timeout)
      .flatMap {
        case Some(result) => ZIO.succeed(result)
        case None         => ZIO.fail(new Exception(s"Timeout fetching $url"))
      }

  val urls = List(
    "https://example.com",
    "https://example.org",
    "https://example.net"
  )

  val program = for {
    _       <- Console.printLine("Starting parallel scraping...")
    results <- ZIO.foreachPar(urls)(url => scrapeWithTimeout(url, 10.seconds))
    _       <- Console.printLine(s"Scraped ${results.length} URLs")
    _       <- ZIO.foreach(results) { result =>
                 Console.printLine(s"${result.url}: ${result.duration}")
               }
  } yield ()

  def run = program.provide(Client.default)
}

Notice:

  • Fetches all URLs in parallel
  • Each has a timeout
  • Automatic interruption if one fails
  • Clean, composable code

Advanced Fiber Patterns

Racing with fallback

def fetchWithFallback[A](
  primary: Task[A], 
  secondary: Task[A]
): Task[A] = 
  primary.race(secondary)

Parallel validation

def validateAll[A](
  items: List[A], 
  validate: A => Task[Unit]
): Task[Unit] = 
  ZIO.foreachPar(items)(validate).unit

Timeout with cancellation

def withTimeout[A](
  effect: Task[A], 
  duration: Duration
): Task[Option[A]] = 
  effect.timeout(duration)

Fiber Lifecycle and Exit Values

Understanding how fibers complete:

// Simplified sketch of ZIO's Exit type (the real cases live in the Exit companion)
sealed trait Exit[+E, +A]
case class Success[+A](value: A)        extends Exit[Nothing, A]
case class Failure[+E](cause: Cause[E]) extends Exit[E, Nothing]

The Cause type tracks:

  • Failures (expected errors)
  • Defects (unexpected exceptions)
  • Interruptions
  • Combinations of the above

val fiber: Fiber[Throwable, String] = ???

fiber.await.flatMap {
  case Exit.Success(value) => 
    Console.printLine(s"Succeeded: $value")
  case Exit.Failure(cause) => 
    Console.printLine(s"Failed: ${cause.prettyPrint}")
}
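You can also inspect the Cause directly. A small sketch using Cause's failures, defects, and isInterrupted accessors:

def describe[E](cause: Cause[E]): String =
  if (cause.isInterrupted) "interrupted"
  else s"failures=${cause.failures}, defects=${cause.defects}"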

Performance Considerations

When to Use Parallelism

Good candidates:

  • I/O operations (network, disk, database)
  • Independent computations
  • Multiple API calls

Poor candidates:

  • CPU-bound tasks competing for cores
  • Sequential dependencies
  • Trivial operations (overhead > benefit)

Fiber Overhead

Fibers are cheap but not free:

// Efficient
ZIO.foreachPar(urls)(fetch)

// Wasteful
ZIO.foreachPar(1 to 1000)(n => ZIO.succeed(n + 1))

Use parallelism when the work justifies the overhead.

Common Pitfalls and Solutions

Pitfall 1: Forgetting to join

// BAD: Fiber leaks
val leak = someEffect.fork
// Effect runs but result is lost

// GOOD: Join or interrupt
val proper = for {
  fiber  <- someEffect.fork
  result <- fiber.join
} yield result

Pitfall 2: Too much parallelism

// BAD: Overwhelms system
ZIO.foreachPar(1 to 100000)(fetchUser)

// GOOD: Control parallelism
ZIO.foreachPar(1 to 100000)(fetchUser).withParallelism(50)

Pitfall 3: Blocking operations

// BAD: Blocks fiber thread
ZIO.attempt(Thread.sleep(1000))

// GOOD: Use ZIO.sleep
ZIO.sleep(1.second)
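For genuinely blocking calls you can't avoid (JDBC, legacy file APIs), ZIO.attemptBlocking shifts the work onto ZIO's dedicated blocking thread pool instead of tying up a core worker thread. A sketch, assuming a local data.txt file:

// Runs the blocking read on the blocking pool and closes the source afterwards
val readFile: Task[String] =
  ZIO.attemptBlocking {
    val source = scala.io.Source.fromFile("data.txt")
    try source.mkString finally source.close()
  }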

Testing Concurrent Code

ZIO Test makes testing concurrent code straightforward:

import zio._
import zio.test._

object ConcurrencySpec extends ZIOSpecDefault {
  def spec = suite("Concurrent operations")(
    test("parallel fetch completes faster than sequential") {
      for {
        start    <- Clock.instant
        _        <- ZIO.foreachPar(List(1, 2, 3))(_ => ZIO.sleep(1.second))
        end      <- Clock.instant
        duration  = Duration.fromInterval(start, end)
      } yield assertTrue(duration.toMillis < 2000)
    } @@ TestAspect.withLiveClock // use the real clock; the TestClock would suspend the sleeps
  )
}

Real-World Example: Parallel Data Pipeline

case class DataRecord(id: Int, data: String)

object DataPipeline extends ZIOAppDefault {

  def extract(id: Int): Task[String] = 
    ZIO.attempt(s"Data from database for $id")
      .delay(100.millis)

  def transform(raw: String): Task[String] = 
    ZIO.attempt(raw.toUpperCase)
      .delay(50.millis)

  def load(data: String): Task[Unit] = 
    Console.printLine(s"Loaded: $data")
      .delay(75.millis)

  def processRecord(id: Int): Task[Unit] = for {
    raw         <- extract(id)
    transformed <- transform(raw)
    _           <- load(transformed)
  } yield ()

  val program = for {
    _     <- Console.printLine("Starting ETL pipeline...")
    start <- Clock.instant
    _     <- ZIO.foreachPar(1 to 100)(processRecord)
    end   <- Clock.instant
    duration = Duration.fromInterval(start, end)
    _     <- Console.printLine(s"Processed 100 records in ${duration.toMillis}ms")
  } yield ()

  def run = program
}

Sequential would take ~22.5 seconds. Parallel? Under a second!

Key Takeaways

  • Fibers are lightweight, virtual threads managed by ZIO
  • fork starts background execution, returns a Fiber handle
  • zipPar, foreachPar enable parallel composition
  • race returns the first successful result
  • Interruption is automatic and safe—no resource leaks
  • timeout prevents operations from running too long
  • Control parallelism with withParallelism
  • Fibers are cheap but not free—use them for I/O and independent work

Common Questions

Q: How many fibers can I create?

Millions! Limited only by memory. Each fiber has minimal overhead.

Q: Do fibers use multiple CPU cores?

Yes. ZIO's runtime uses a thread pool that leverages all available cores. Fibers are scheduled across threads automatically.

Q: What's the difference between fork and async?

fork is for ZIO effects. For wrapping callbacks, use ZIO.async.
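A sketch of what wrapping a callback looks like, assuming a hypothetical callback-based legacyFetch function (not part of ZIO):

// legacyFetch is a placeholder for any callback-style API
def legacyFetch(url: String)(onSuccess: String => Unit, onError: Throwable => Unit): Unit = ???

def fetchAsZIO(url: String): Task[String] =
  ZIO.async[Any, Throwable, String] { callback =>
    legacyFetch(url)(
      result => callback(ZIO.succeed(result)),
      error  => callback(ZIO.fail(error))
    )
  }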

Q: Can fibers deadlock?

It's much harder than with threads, but possible if you create circular dependencies. ZIO's type system and structured concurrency help prevent this.

What's Next?

You've mastered concurrent programming with fibers! In Lesson 5: Resource Management and Scopes, you'll learn how to:

  • Guarantee resource cleanup with acquireRelease
  • Use scopes for complex resource lifecycles
  • Build connection pools
  • Handle nested resources safely
  • Ensure cleanup even during interruption

Ready to build rock-solid resource management? Let's continue!

Practice Exercise

Build a parallel file processor that:

  1. Reads all .txt files from a directory
  2. Processes each file in parallel (count words)
  3. Has a 5-second timeout per file
  4. Handles failures gracefully
  5. Reports results with duration

Bonus: Add a progress indicator using Ref!