Advanced ZIO Patterns and Optimization

By now you've mastered ZIO's fundamentals, error handling, dependencies, concurrency, resources, streams, and testing. It's time to level up with advanced patterns that solve complex real-world problems.

Why Advanced Patterns Matter

Simple applications work with basic effects. But production systems need:

  • Shared mutable state without race conditions
  • Coordination between concurrent operations
  • Rate limiting and backpressure
  • Transactions that compose
  • Performance at scale

ZIO provides powerful abstractions for all of these. Let's explore them.

Refs: Safe Mutable State

The Problem with Mutable State

Traditional mutable variables cause race conditions:

var counter = 0

// Race condition! Multiple fibers accessing the same variable
val program = ZIO.foreachPar(1 to 1000) { _ =>
  ZIO.succeed(counter += 1)
}
// Result is unpredictable, often less than 1000

How do you safely share state between concurrent operations?

Enter Ref

Ref[A] is ZIO's solution for safe, atomic, mutable state:

import zio._

val program = for {
  counter <- Ref.make(0)
  _       <- ZIO.foreachPar(1 to 1000) { _ =>
               counter.update(_ + 1)
             }
  result  <- counter.get
  _       <- Console.printLine(s"Counter: $result")
} yield ()
// Always prints "Counter: 1000" - guaranteed atomicity

Atomic Operations

Ref provides several atomic operations:

for {
  ref <- Ref.make(0)

  // Get current value
  value <- ref.get

  // Set new value
  _ <- ref.set(42)

  // Update atomically
  _ <- ref.update(_ + 1)

  // Update and return new value
  newValue <- ref.updateAndGet(_ * 2)

  // Update and return old value
  oldValue <- ref.getAndUpdate(_ - 10)

  // Modify with custom logic
  _ <- ref.modify { current =>
    val next = current + 1
    (s"Incremented to $next", next)  // (result, newState)
  }
} yield ()

Building a Counter Service

trait Counter {
  def increment: UIO[Unit]
  def decrement: UIO[Unit]
  def get: UIO[Int]
  def reset: UIO[Unit]
}

object Counter {
  def make: UIO[Counter] = 
    Ref.make(0).map { ref =>
      new Counter {
        def increment = ref.update(_ + 1)
        def decrement = ref.update(_ - 1)
        def get = ref.get
        def reset = ref.set(0)
      }
    }
}

// Usage
val program = for {
  counter <- Counter.make
  _       <- counter.increment.repeatN(99)  // Increment 100 times
  value   <- counter.get
  _       <- Console.printLine(s"Value: $value")
} yield ()

State Machines with Ref

sealed trait ConnectionState
case object Disconnected extends ConnectionState
case object Connecting extends ConnectionState
case object Connected extends ConnectionState
case object Error extends ConnectionState

trait ConnectionManager {
  def connect: Task[Unit]
  def disconnect: Task[Unit]
  def getState: UIO[ConnectionState]
}

object ConnectionManager {
  def make: UIO[ConnectionManager] =
    Ref.make[ConnectionState](Disconnected).map { stateRef =>
      new ConnectionManager {
        def connect = stateRef.modify {
          case Disconnected =>
            (actuallyConnect, Connecting)
          case other =>
            (ZIO.unit, other)  // Already connecting/connected
        }.flatten

        def disconnect = stateRef.modify {
          case Connected =>
            (actuallyDisconnect, Disconnected)
          case other =>
            (ZIO.unit, other)
        }.flatten

        def getState = stateRef.get
      }
    }

  private def actuallyConnect: Task[Unit] = ???
  private def actuallyDisconnect: Task[Unit] = ???
}

FiberRef: Fiber-Local State

FiberRef is like ThreadLocal for fibers—each fiber sees its own value:

for {
  fiberRef <- FiberRef.make(0)

  fiber1 <- (for {
    _     <- fiberRef.set(100)
    value <- fiberRef.get
    _     <- Console.printLine(s"Fiber 1: $value")
  } yield ()).fork

  fiber2 <- (for {
    _     <- fiberRef.set(200)
    value <- fiberRef.get
    _     <- Console.printLine(s"Fiber 2: $value")
  } yield ()).fork

  _ <- fiber1.join
  _ <- fiber2.join
} yield ()
// Prints:
// Fiber 1: 100
// Fiber 2: 200

Queues: Producer-Consumer Coordination

What is a Queue?

Queue[A] is a concurrent, asynchronous queue for coordinating between producers and consumers:

import zio._

val program = for {
  queue <- Queue.bounded[String](10)

  // Producer
  producer = ZIO.foreach(1 to 5) { i =>
    queue.offer(s"Item $i") *>
    Console.printLine(s"Produced: Item $i")
  }

  // Consumer
  consumer = ZIO.foreach(1 to 5) { _ =>
    queue.take.flatMap { item =>
      Console.printLine(s"Consumed: $item")
    }
  }

  // Run in parallel
  _ <- producer.zipPar(consumer)
} yield ()

Queue Operations

for {
  queue <- Queue.bounded[Int](3)

  // Offer (blocking if full)
  _ <- queue.offer(1)

  // OfferAll
  _ <- queue.offerAll(List(2, 3, 4))

  // Take (blocking if empty)
  item <- queue.take

  // TakeAll
  items <- queue.takeAll

  // TakeUpTo
  batch <- queue.takeUpTo(5)

  // Peek without removing
  first <- queue.peek

  // Size
  size <- queue.size

  // Shutdown
  _ <- queue.shutdown
} yield ()

Bounded vs Unbounded Queues

// Bounded: blocks when full (backpressure)
val bounded = Queue.bounded[Int](100)

// Unbounded: never blocks, can grow indefinitely
val unbounded = Queue.unbounded[Int]

// Dropping: drops oldest when full
val dropping = Queue.dropping[Int](100)

// Sliding: drops newest when full
val sliding = Queue.sliding[Int](100)

Which queue type would you use for a high-throughput logging system?

Building a Task Processor

case class Task(id: Int, data: String)

object TaskProcessor {
  def make(
    workers: Int,
    queueSize: Int
  ): ZIO[Any, Nothing, Queue[Task]] = for {
    queue <- Queue.bounded[Task](queueSize)

    worker = queue.take.flatMap { task =>
      processTask(task)
    }.forever

    _ <- ZIO.foreachPar(1 to workers)(_ => worker.fork)
  } yield queue

  private def processTask(task: Task): Task[Unit] = 
    Console.printLine(s"Processing task ${task.id}: ${task.data}") *>
    ZIO.sleep(100.millis)
}

// Usage
val program = for {
  queue <- TaskProcessor.make(workers = 4, queueSize = 10)

  // Submit tasks
  _ <- ZIO.foreach(1 to 20) { i =>
    queue.offer(Task(i, s"Task data $i"))
  }

  _ <- ZIO.sleep(5.seconds)
} yield ()

Hubs: Publish-Subscribe Pattern

What is a Hub?

Hub[A] is like a queue, but messages can be consumed by multiple subscribers:

import zio._

val program = for {
  hub <- Hub.bounded[String](10)

  // Create subscribers
  sub1 <- hub.subscribe
  sub2 <- hub.subscribe

  // Publisher
  publisher = ZIO.foreach(1 to 5) { i =>
    hub.publish(s"Message $i")
  }

  // Subscriber 1
  subscriber1 = sub1.take.flatMap { msg =>
    Console.printLine(s"Sub1 received: $msg")
  }.repeatN(4)

  // Subscriber 2
  subscriber2 = sub2.take.flatMap { msg =>
    Console.printLine(s"Sub2 received: $msg")
  }.repeatN(4)

  // Run all in parallel
  _ <- publisher.zipPar(subscriber1).zipPar(subscriber2)
} yield ()

Event Bus with Hub

sealed trait Event
case class UserLoggedIn(userId: String) extends Event
case class OrderPlaced(orderId: String, amount: Double) extends Event
case class EmailSent(to: String) extends Event

trait EventBus {
  def publish(event: Event): UIO[Unit]
  def subscribe: UIO[Dequeue[Event]]
}

object EventBus {
  def make(capacity: Int): UIO[EventBus] =
    Hub.bounded[Event](capacity).map { hub =>
      new EventBus {
        def publish(event: Event) = 
          hub.publish(event).unit

        def subscribe = 
          hub.subscribe
      }
    }
}

// Usage
val program = for {
  bus <- EventBus.make(100)

  // Analytics subscriber
  analytics <- bus.subscribe
  _ <- analytics.take.flatMap {
    case UserLoggedIn(userId) =>
      Console.printLine(s"Analytics: User $userId logged in")
    case OrderPlaced(orderId, amount) =>
      Console.printLine(s"Analytics: Order $orderId for $$${amount}")
    case _ => ZIO.unit
  }.forever.fork

  // Email subscriber
  emails <- bus.subscribe
  _ <- emails.take.flatMap {
    case OrderPlaced(orderId, _) =>
      Console.printLine(s"Email: Sending confirmation for $orderId")
    case _ => ZIO.unit
  }.forever.fork

  // Publish events
  _ <- bus.publish(UserLoggedIn("user123"))
  _ <- bus.publish(OrderPlaced("order456", 99.99))

  _ <- ZIO.sleep(1.second)
} yield ()

Semaphores: Resource Limiting

What is a Semaphore?

Semaphore controls access to a limited resource through permits:

import zio._

// Only 3 concurrent operations allowed
val program = for {
  semaphore <- Semaphore.make(permits = 3)

  task = semaphore.withPermit {
    Console.printLine("Working...") *>
    ZIO.sleep(1.second) *>
    Console.printLine("Done!")
  }

  // Start 10 tasks, but only 3 run concurrently
  _ <- ZIO.foreachPar(1 to 10)(_ => task)
} yield ()

Rate Limiting API Calls

trait ApiClient {
  def makeRequest(url: String): Task[String]
}

object ApiClient {
  def make(maxConcurrent: Int): UIO[ApiClient] =
    Semaphore.make(maxConcurrent).map { semaphore =>
      new ApiClient {
        def makeRequest(url: String) = 
          semaphore.withPermit {
            Console.printLine(s"Requesting: $url") *>
            ZIO.sleep(500.millis) *>
            ZIO.succeed(s"Response from $url")
          }
      }
    }
}

// Usage
val program = for {
  client <- ApiClient.make(maxConcurrent = 2)

  urls = List(
    "https://api.example.com/users",
    "https://api.example.com/orders",
    "https://api.example.com/products",
    "https://api.example.com/reviews",
    "https://api.example.com/analytics"
  )

  // Only 2 requests at a time
  results <- ZIO.foreachPar(urls)(client.makeRequest)
  _       <- Console.printLine(s"Got ${results.size} responses")
} yield ()

Permit Management

for {
  semaphore <- Semaphore.make(5)

  // Acquire permits
  _ <- semaphore.acquire
  _ <- semaphore.acquireN(2)

  // Release permits
  _ <- semaphore.release
  _ <- semaphore.releaseN(2)

  // Check available permits
  available <- semaphore.available

  // Use permits safely
  _ <- semaphore.withPermit {
    // Automatically acquired and released
    performOperation
  }

  // Multiple permits
  _ <- semaphore.withPermits(3) {
    performExpensiveOperation
  }
} yield ()

STM: Software Transactional Memory

The Problem with Refs and Locks

What if you need to update multiple shared variables atomically?

// Not atomic! Race condition between reads and writes
for {
  accountA <- Ref.make(1000)
  accountB <- Ref.make(500)

  balanceA <- accountA.get
  balanceB <- accountB.get

  // Another fiber could modify accounts here!

  _ <- accountA.set(balanceA - 100)
  _ <- accountB.set(balanceB + 100)
} yield ()

STM to the Rescue

STM provides composable transactions that are atomic, isolated, and can retry:

import zio._
import zio.stm._

case class Account(balance: TRef[Int])

def transfer(from: Account, to: Account, amount: Int): UIO[Unit] =
  STM.atomically {
    for {
      fromBalance <- from.balance.get
      _           <- STM.check(fromBalance >= amount)  // Retry if insufficient
      _           <- from.balance.update(_ - amount)
      _           <- to.balance.update(_ + amount)
    } yield ()
  }

// Usage
val program = for {
  accountA <- TRef.make(1000).commit
  accountB <- TRef.make(500).commit

  accA = Account(accountA)
  accB = Account(accountB)

  // Multiple concurrent transfers - all atomic!
  _ <- ZIO.foreachPar(1 to 10) { _ =>
    transfer(accA, accB, 50)
  }

  finalA <- accountA.get.commit
  finalB <- accountB.get.commit

  _ <- Console.printLine(s"Account A: $finalA")
  _ <- Console.printLine(s"Account B: $finalB")
} yield ()

STM Operations

import zio.stm._

for {
  ref <- TRef.make(0).commit

  // Get and set
  value <- ref.get.commit
  _     <- ref.set(42).commit

  // Update
  _ <- ref.update(_ + 1).commit

  // Modify with result
  result <- ref.modify { current =>
    (s"Old value: $current", current + 1)
  }.commit

  // Check condition (retry if false)
  _ <- STM.check(value > 10).commit

  // Compose transactions
  transaction = for {
    v1 <- ref.get
    _  <- ref.set(v1 * 2)
    v2 <- ref.get
  } yield v2

  finalValue <- transaction.commit
} yield ()

TQueue and TMap

import zio.stm._

// Transactional Queue
val queueExample = for {
  queue <- TQueue.bounded[Int](10).commit

  transaction = for {
    _ <- queue.offer(1)
    _ <- queue.offer(2)
    x <- queue.take
    y <- queue.take
  } yield x + y

  result <- transaction.commit
} yield result

// Transactional Map
val mapExample = for {
  map <- TMap.empty[String, Int].commit

  transaction = for {
    _     <- map.put("key1", 100)
    _     <- map.put("key2", 200)
    value <- map.get("key1")
    _     <- map.delete("key2")
  } yield value

  result <- transaction.commit
} yield result

Retry and OrElse

import zio.stm._

// Retry until condition is met
def waitForStock(inventory: TRef[Int], amount: Int): UIO[Unit] =
  STM.atomically {
    for {
      stock <- inventory.get
      _     <- STM.check(stock >= amount)  // Retry if not enough
      _     <- inventory.update(_ - amount)
    } yield ()
  }

// OrElse: try first, fallback to second
def transferOrLog(
  from: TRef[Int], 
  to: TRef[Int], 
  amount: Int,
  log: TQueue[String]
): UIO[Unit] = STM.atomically {
  val transfer = for {
    balance <- from.get
    _       <- STM.check(balance >= amount)
    _       <- from.update(_ - amount)
    _       <- to.update(_ + amount)
  } yield ()

  val logFailure = log.offer(s"Transfer failed: insufficient funds")

  transfer.orElse(logFailure)
}

Performance Optimization

Caching with cached

Avoid recomputing expensive operations:

import zio._

def expensiveComputation(id: Int): Task[String] =
  ZIO.sleep(2.seconds) *>
  ZIO.succeed(s"Result for $id")

val program = for {
  // Cache for 1 minute
  cached <- expensiveComputation(123).cached(1.minute)

  // First call: takes 2 seconds
  start1 <- Clock.currentTime(TimeUnit.MILLISECONDS)
  _      <- cached
  end1   <- Clock.currentTime(TimeUnit.MILLISECONDS)
  _      <- Console.printLine(s"First call: ${end1 - start1}ms")

  // Second call: instant (cached)
  start2 <- Clock.currentTime(TimeUnit.MILLISECONDS)
  _      <- cached
  end2   <- Clock.currentTime(TimeUnit.MILLISECONDS)
  _      <- Console.printLine(s"Second call: ${end2 - start2}ms")
} yield ()

Memoization

Cache results per input:

trait UserService {
  def getUser(id: Int): Task[User]
}

object UserService {
  def make: UIO[UserService] = for {
    cache <- Ref.make(Map.empty[Int, User])
  } yield new UserService {
    def getUser(id: Int) = 
      cache.get.flatMap { map =>
        map.get(id) match {
          case Some(user) => 
            ZIO.succeed(user)  // Cache hit
          case None =>
            fetchFromDatabase(id).flatMap { user =>
              cache.update(_ + (id -> user)).as(user)  // Cache miss
            }
        }
      }
  }

  private def fetchFromDatabase(id: Int): Task[User] = ???
}

Batching Operations

Combine multiple operations into one:

// Bad: N database queries
def getUsersSequential(ids: List[Int]): Task[List[User]] =
  ZIO.foreach(ids)(id => fetchUser(id))

// Good: 1 database query
def getUsersBatched(ids: List[Int]): Task[List[User]] =
  fetchUsers(ids)  // Single batch query

// Even better: automatic batching
trait UserService {
  def getUser(id: Int): Task[User]
  def getUsers(ids: List[Int]): Task[List[User]]
}

object UserService {
  def makeBatching(batchSize: Int, window: Duration): UIO[UserService] = 
    for {
      queue <- Queue.bounded[(Int, Promise[Nothing, User])](100)

      batcher = queue.takeBetween(1, batchSize)
        .timeout(window)
        .flatMap { requests =>
          val ids = requests.map(_._1)
          fetchUsers(ids).flatMap { users =>
            ZIO.foreach(requests.zip(users)) { case ((_, promise), user) =>
              promise.succeed(user)
            }
          }
        }.forever.fork

      _ <- batcher
    } yield new UserService {
      def getUser(id: Int) = for {
        promise <- Promise.make[Nothing, User]
        _       <- queue.offer((id, promise))
        user    <- promise.await
      } yield user

      def getUsers(ids: List[Int]) = 
        ZIO.foreach(ids)(getUser)
    }

  private def fetchUsers(ids: List[Int]): Task[List[User]] = ???
}

Effect Fusion

ZIO automatically fuses compatible effects for better performance:

// These get fused into a single operation
val fused = effect1
  .map(_ + 1)
  .map(_ * 2)
  .map(_.toString)
// Compiled to: effect1.map(x => ((x + 1) * 2).toString)

Avoiding Unnecessary Allocations

// Bad: creates intermediate lists
val inefficient = ZIO.foreach(1 to 1000000) { i =>
  ZIO.succeed(i * 2)
}

// Better: use foreachDiscard if you don't need results
val efficient = ZIO.foreachDiscard(1 to 1000000) { i =>
  process(i)
}

// Bad: builds intermediate collections
val wasteful = list
  .map(transform1)
  .map(transform2)
  .filter(predicate)

// Better: use view for lazy evaluation
val optimized = list.view
  .map(transform1)
  .map(transform2)
  .filter(predicate)
  .toList

Building a High-Performance Task Queue

Let's combine everything into a production-ready system:

import zio._
import zio.stm._

case class Task(id: String, priority: Int, data: String)

trait TaskQueue {
  def submit(task: Task): UIO[Unit]
  def stats: UIO[QueueStats]
}

case class QueueStats(
  queued: Int,
  processing: Int,
  completed: Int,
  failed: Int
)

object TaskQueue {
  def make(
    workers: Int,
    maxConcurrent: Int,
    queueSize: Int
  ): UIO[TaskQueue] = for {
    // Priority queue using STM
    queue <- TQueue.bounded[Task](queueSize).commit

    // Rate limiting
    semaphore <- Semaphore.make(maxConcurrent)

    // Statistics
    stats <- TRef.make(QueueStats(0, 0, 0, 0)).commit

    // Worker fiber
    worker = queue.take.commit.flatMap { task =>
      semaphore.withPermit {
        stats.update(s => s.copy(
          queued = s.queued - 1,
          processing = s.processing + 1
        )).commit *>
        processTask(task).foldZIO(
          _ => stats.update(s => s.copy(
            processing = s.processing - 1,
            failed = s.failed + 1
          )).commit,
          _ => stats.update(s => s.copy(
            processing = s.processing - 1,
            completed = s.completed + 1
          )).commit
        )
      }
    }.forever

    // Start workers
    _ <- ZIO.foreachPar(1 to workers)(_ => worker.fork)

  } yield new TaskQueue {
    def submit(task: Task) = 
      queue.offer(task).commit *>
      stats.update(s => s.copy(queued = s.queued + 1)).commit

    def stats = 
      TaskQueue.this.stats.get.commit
  }

  private def processTask(task: Task): Task[Unit] =
    Console.printLine(s"Processing ${task.id}") *>
    ZIO.sleep(100.millis)
}

// Usage
val program = for {
  queue <- TaskQueue.make(
    workers = 4,
    maxConcurrent = 10,
    queueSize = 100
  )

  // Submit tasks
  _ <- ZIO.foreachPar(1 to 50) { i =>
    queue.submit(Task(
      id = s"task-$i",
      priority = i % 3,
      data = s"Data $i"
    ))
  }

  // Monitor stats
  _ <- (for {
    s <- queue.stats
    _ <- Console.printLine(
      s"Queued: ${s.queued}, " +
      s"Processing: ${s.processing}, " +
      s"Completed: ${s.completed}, " +
      s"Failed: ${s.failed}"
    )
  } yield ()).delay(500.millis).repeatN(10)
} yield ()

Key Takeaways

  • Ref provides safe, atomic mutable state
  • FiberRef offers fiber-local state like ThreadLocal
  • Queue enables producer-consumer patterns with backpressure
  • Hub implements publish-subscribe for multiple consumers
  • Semaphore limits concurrent access to resources
  • STM enables composable, atomic transactions
  • Caching and batching optimize performance
  • Combine these patterns to build robust, high-performance systems

Common Patterns

Rate Limiting: Use Semaphore to limit concurrent operations Caching: Use Ref with expiration logic or .cached Coordination: Use Queue or Hub for inter-fiber communication Atomic Updates: Use STM when updating multiple values State Machines: Use Ref.modify with pattern matching

What's Next?

In Lesson 9: ZIO in Production, you'll learn how to deploy ZIO applications with logging, metrics, configuration management, and monitoring. We'll take everything you've learned and make it production-ready.

Additional Resources

Ready to deploy to production? Let's continue!