Advanced ZIO Patterns and Optimization
By now you've mastered ZIO's fundamentals, error handling, dependencies, concurrency, resources, streams, and testing. It's time to level up with advanced patterns that solve complex real-world problems.
Why Advanced Patterns Matter
Simple applications work with basic effects. But production systems need:
- Shared mutable state without race conditions
- Coordination between concurrent operations
- Rate limiting and backpressure
- Transactions that compose
- Performance at scale
ZIO provides powerful abstractions for all of these. Let's explore them.
Refs: Safe Mutable State
The Problem with Mutable State
Traditional mutable variables cause race conditions:
var counter = 0
// Race condition! Multiple fibers accessing the same variable
val program = ZIO.foreachPar(1 to 1000) { _ =>
ZIO.succeed(counter += 1)
}
// Result is unpredictable, often less than 1000
How do you safely share state between concurrent operations?
Enter Ref
Ref[A] is ZIO's solution for safe, atomic, mutable state:
import zio._
val program = for {
counter <- Ref.make(0)
_ <- ZIO.foreachPar(1 to 1000) { _ =>
counter.update(_ + 1)
}
result <- counter.get
_ <- Console.printLine(s"Counter: $result")
} yield ()
// Always prints "Counter: 1000" - guaranteed atomicity
Atomic Operations
Ref provides several atomic operations:
for {
ref <- Ref.make(0)
// Get current value
value <- ref.get
// Set new value
_ <- ref.set(42)
// Update atomically
_ <- ref.update(_ + 1)
// Update and return new value
newValue <- ref.updateAndGet(_ * 2)
// Update and return old value
oldValue <- ref.getAndUpdate(_ - 10)
// Modify with custom logic
_ <- ref.modify { current =>
val next = current + 1
(s"Incremented to $next", next) // (result, newState)
}
} yield ()
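To see why the atomic operations matter, the following sketch contrasts a separate get-then-set with a single update. The object and method names (RefUpdateSketch, unsafeIncrement, safeIncrement, run) are made up for illustration; only Ref and ZIO.foreachParDiscard come from ZIO.

```scala
import zio._

object RefUpdateSketch {
  // Non-atomic: another fiber may write between our get and our set,
  // so increments can be lost under contention.
  def unsafeIncrement(ref: Ref[Int]): UIO[Unit] =
    ref.get.flatMap(n => ref.set(n + 1))

  // Atomic: the read-modify-write happens as one indivisible step.
  def safeIncrement(ref: Ref[Int]): UIO[Unit] =
    ref.update(_ + 1)

  // Runs 1000 parallel increments with the given strategy and returns the total.
  def run(increment: Ref[Int] => UIO[Unit]): UIO[Int] =
    for {
      ref <- Ref.make(0)
      _   <- ZIO.foreachParDiscard(1 to 1000)(_ => increment(ref))
      n   <- ref.get
    } yield n
}
```

Running `RefUpdateSketch.run` with `safeIncrement` yields 1000 every time, while `unsafeIncrement` may yield less, depending on how the fibers interleave.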
Building a Counter Service
trait Counter {
def increment: UIO[Unit]
def decrement: UIO[Unit]
def get: UIO[Int]
def reset: UIO[Unit]
}
object Counter {
def make: UIO[Counter] =
Ref.make(0).map { ref =>
new Counter {
def increment = ref.update(_ + 1)
def decrement = ref.update(_ - 1)
def get = ref.get
def reset = ref.set(0)
}
}
}
// Usage
val program = for {
counter <- Counter.make
_ <- counter.increment.repeatN(99) // Increment 100 times
value <- counter.get
_ <- Console.printLine(s"Value: $value")
} yield ()
State Machines with Ref
sealed trait ConnectionState
case object Disconnected extends ConnectionState
case object Connecting extends ConnectionState
case object Connected extends ConnectionState
case object Error extends ConnectionState
trait ConnectionManager {
def connect: Task[Unit]
def disconnect: Task[Unit]
def getState: UIO[ConnectionState]
}
object ConnectionManager {
def make: UIO[ConnectionManager] =
Ref.make[ConnectionState](Disconnected).map { stateRef =>
new ConnectionManager {
def connect = stateRef.modify {
case Disconnected =>
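// Claim the transition atomically, then record the outcome of the attempt
(actuallyConnect.tapError(_ => stateRef.set(Error)) *> stateRef.set(Connected), Connecting)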
case other =>
(ZIO.unit, other) // Already connecting/connected
}.flatten
def disconnect = stateRef.modify {
case Connected =>
(actuallyDisconnect, Disconnected)
case other =>
(ZIO.unit, other)
}.flatten
def getState = stateRef.get
}
}
private def actuallyConnect: Task[Unit] = ???
private def actuallyDisconnect: Task[Unit] = ???
}
FiberRef: Fiber-Local State
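FiberRef is like ThreadLocal for fibers: each fiber sees its own value. A forked fiber starts with a copy of its parent's value, and its changes are not visible to sibling fibers: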
ZIO.scoped {
  for {
    fiberRef <- FiberRef.make(0) // FiberRef.make requires a Scope in ZIO 2
    fiber1 <- (for {
      _     <- fiberRef.set(100)
      value <- fiberRef.get
      _     <- Console.printLine(s"Fiber 1: $value")
    } yield ()).fork
    fiber2 <- (for {
      _     <- fiberRef.set(200)
      value <- fiberRef.get
      _     <- Console.printLine(s"Fiber 2: $value")
    } yield ()).fork
    _ <- fiber1.join
    _ <- fiber2.join
  } yield ()
}
// Prints (the two lines may appear in either order):
// Fiber 1: 100
// Fiber 2: 200
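A common use of fiber-local state is carrying request context without threading it through every call. The sketch below uses FiberRef#locally to set a value only for the duration of one effect. The names RequestContextSketch, handle, and the "req-1" id are hypothetical.

```scala
import zio._

object RequestContextSketch {
  // Hypothetical handler that reads the request id from fiber-local state
  def handle(requestId: FiberRef[String]): UIO[String] =
    requestId.get.map(id => s"handled by $id")

  val program: UIO[(String, String)] =
    ZIO.scoped {
      for {
        requestId <- FiberRef.make("none")
        // The value is "req-1" only while the wrapped effect runs
        inside    <- requestId.locally("req-1")(handle(requestId))
        // Afterwards the previous value is restored
        outside   <- handle(requestId)
      } yield (inside, outside)
    }
}
```

Here `inside` is "handled by req-1" and `outside` is "handled by none", because locally restores the previous value once the wrapped effect completes.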
Queues: Producer-Consumer Coordination
What is a Queue?
Queue[A] is a concurrent, asynchronous queue for coordinating between producers and consumers:
import zio._
val program = for {
queue <- Queue.bounded[String](10)
// Producer
producer = ZIO.foreach(1 to 5) { i =>
queue.offer(s"Item $i") *>
Console.printLine(s"Produced: Item $i")
}
// Consumer
consumer = ZIO.foreach(1 to 5) { _ =>
queue.take.flatMap { item =>
Console.printLine(s"Consumed: $item")
}
}
// Run in parallel
_ <- producer.zipPar(consumer)
} yield ()
Queue Operations
for {
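queue <- Queue.bounded[Int](4)
// Offer (suspends the fiber if the queue is full)
_ <- queue.offer(1)
// OfferAll (the queue has room for all three, so this does not suspend)
_ <- queue.offerAll(List(2, 3, 4))
// Take (suspends the fiber if the queue is empty)
item <- queue.take
// TakeAll
items <- queue.takeAll
// TakeUpTo
batch <- queue.takeUpTo(5)
// Poll: take one item if available, without waiting (returns an Option)
first <- queue.poll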
// Size
size <- queue.size
// Shutdown
_ <- queue.shutdown
} yield ()
Bounded vs Unbounded Queues
// Bounded: blocks when full (backpressure)
val bounded = Queue.bounded[Int](100)
// Unbounded: never blocks, can grow indefinitely
val unbounded = Queue.unbounded[Int]
// Dropping: discards new items when full
val dropping = Queue.dropping[Int](100)
// Sliding: discards the oldest items when full
val sliding = Queue.sliding[Int](100)
Which queue type would you use for a high-throughput logging system?
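The overflow behavior of the two lossy queue types is easiest to see on a tiny queue. This sketch offers three items to queues of capacity 2 and inspects what is left. The names QueueOverflowSketch and fill are illustrative only.

```scala
import zio._

object QueueOverflowSketch {
  // Offers 1, 2, 3 to the given queue and returns whatever it still holds.
  def fill(make: UIO[Queue[Int]]): UIO[Chunk[Int]] =
    for {
      queue <- make
      _     <- ZIO.foreachDiscard(List(1, 2, 3))(i => queue.offer(i))
      items <- queue.takeAll
    } yield items

  // A dropping queue rejects the new item (3) once it is full
  val dropping: UIO[Chunk[Int]] = fill(Queue.dropping[Int](2))

  // A sliding queue evicts the oldest item (1) to make room for 3
  val sliding: UIO[Chunk[Int]] = fill(Queue.sliding[Int](2))
}
```

The dropping queue ends up holding 1 and 2, while the sliding queue holds 2 and 3. For a logging system, this is the trade-off between keeping the earliest messages and keeping the most recent ones.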
Building a Task Processor
case class Task(id: Int, data: String)
object TaskProcessor {
def make(
workers: Int,
queueSize: Int
): ZIO[Any, Nothing, Queue[Task]] = for {
queue <- Queue.bounded[Task](queueSize)
worker = queue.take.flatMap { task =>
processTask(task)
}.forever
_ <- ZIO.foreachPar(1 to workers)(_ => worker.fork)
} yield queue
// The Task case class above shadows zio.Task, so spell out the effect type
private def processTask(task: Task): ZIO[Any, Throwable, Unit] =
Console.printLine(s"Processing task ${task.id}: ${task.data}") *>
ZIO.sleep(100.millis)
}
// Usage
val program = for {
queue <- TaskProcessor.make(workers = 4, queueSize = 10)
// Submit tasks
_ <- ZIO.foreach(1 to 20) { i =>
queue.offer(Task(i, s"Task data $i"))
}
_ <- ZIO.sleep(5.seconds)
} yield ()
Hubs: Publish-Subscribe Pattern
What is a Hub?
Hub[A] is like a queue, but messages can be consumed by multiple subscribers:
import zio._
// hub.subscribe requires a Scope; the subscriptions are removed when the scope closes
val program = ZIO.scoped {
  for {
    hub <- Hub.bounded[String](10)
    // Create subscribers (before publishing, so they see every message)
    sub1 <- hub.subscribe
    sub2 <- hub.subscribe
    // Publisher
    publisher = ZIO.foreach(1 to 5) { i =>
      hub.publish(s"Message $i")
    }
    // Subscriber 1
    subscriber1 = sub1.take.flatMap { msg =>
      Console.printLine(s"Sub1 received: $msg")
    }.repeatN(4)
    // Subscriber 2
    subscriber2 = sub2.take.flatMap { msg =>
      Console.printLine(s"Sub2 received: $msg")
    }.repeatN(4)
    // Run all in parallel
    _ <- publisher.zipPar(subscriber1).zipPar(subscriber2)
  } yield ()
}
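The difference from a queue is that every subscriber receives every message, rather than the messages being split between consumers. A minimal sketch of that property, assuming both subscriptions are created before anything is published (HubBroadcastSketch is a made-up name):

```scala
import zio._

object HubBroadcastSketch {
  val program: UIO[(List[Int], List[Int])] =
    ZIO.scoped {
      for {
        hub  <- Hub.bounded[Int](4)
        // Subscribe first: a subscriber only sees messages published after it subscribed
        sub1 <- hub.subscribe
        sub2 <- hub.subscribe
        _    <- ZIO.foreachDiscard(List(1, 2, 3))(i => hub.publish(i))
        // Each subscriber takes three messages from its own view of the hub
        got1 <- ZIO.foreach(List(1, 2, 3))(_ => sub1.take)
        got2 <- ZIO.foreach(List(1, 2, 3))(_ => sub2.take)
      } yield (got1, got2)
    }
}
```

Both `got1` and `got2` contain 1, 2, 3. With a Queue, each item would have gone to only one of the two consumers.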
Event Bus with Hub
sealed trait Event
case class UserLoggedIn(userId: String) extends Event
case class OrderPlaced(orderId: String, amount: Double) extends Event
case class EmailSent(to: String) extends Event
trait EventBus {
def publish(event: Event): UIO[Unit]
def subscribe: ZIO[Scope, Nothing, Dequeue[Event]]
}
object EventBus {
def make(capacity: Int): UIO[EventBus] =
Hub.bounded[Event](capacity).map { hub =>
new EventBus {
def publish(event: Event) =
hub.publish(event).unit
def subscribe =
hub.subscribe
}
}
}
// Usage (subscribing requires a Scope, so run this as ZIO.scoped(program))
val program = for {
bus <- EventBus.make(100)
// Analytics subscriber
analytics <- bus.subscribe
_ <- analytics.take.flatMap {
case UserLoggedIn(userId) =>
Console.printLine(s"Analytics: User $userId logged in")
case OrderPlaced(orderId, amount) =>
Console.printLine(s"Analytics: Order $orderId for $$${amount}")
case _ => ZIO.unit
}.forever.fork
// Email subscriber
emails <- bus.subscribe
_ <- emails.take.flatMap {
case OrderPlaced(orderId, _) =>
Console.printLine(s"Email: Sending confirmation for $orderId")
case _ => ZIO.unit
}.forever.fork
// Publish events
_ <- bus.publish(UserLoggedIn("user123"))
_ <- bus.publish(OrderPlaced("order456", 99.99))
_ <- ZIO.sleep(1.second)
} yield ()
Semaphores: Resource Limiting
What is a Semaphore?
Semaphore controls access to a limited resource through permits:
import zio._
// Only 3 concurrent operations allowed
val program = for {
semaphore <- Semaphore.make(permits = 3)
task = semaphore.withPermit {
Console.printLine("Working...") *>
ZIO.sleep(1.second) *>
Console.printLine("Done!")
}
// Start 10 tasks, but only 3 run concurrently
_ <- ZIO.foreachPar(1 to 10)(_ => task)
} yield ()
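One way to convince yourself that the limit holds is to measure it. The sketch below tracks how many tasks run at once and records the peak. The names SemaphoreLimitSketch and maxObserved, and the 20 ms sleep standing in for real work, are assumptions made for illustration.

```scala
import zio._

object SemaphoreLimitSketch {
  // Returns the highest number of tasks observed running at the same time.
  def maxObserved(permits: Int, tasks: Int): UIO[Int] =
    for {
      semaphore <- Semaphore.make(permits.toLong)
      running   <- Ref.make(0)
      peak      <- Ref.make(0)
      task = semaphore.withPermit {
               for {
                 now <- running.updateAndGet(_ + 1) // one more task inside the permit
                 _   <- peak.update(_ max now)      // remember the highest count seen
                 _   <- ZIO.sleep(20.millis)        // stand-in for real work
                 _   <- running.update(_ - 1)
               } yield ()
             }
      _      <- ZIO.foreachParDiscard(1 to tasks)(_ => task)
      result <- peak.get
    } yield result
}
```

Calling `SemaphoreLimitSketch.maxObserved(3, 10)` never reports more than 3, however the ten fibers are scheduled.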
Rate Limiting API Calls
trait ApiClient {
def makeRequest(url: String): Task[String]
}
object ApiClient {
def make(maxConcurrent: Int): UIO[ApiClient] =
Semaphore.make(maxConcurrent).map { semaphore =>
new ApiClient {
def makeRequest(url: String) =
semaphore.withPermit {
Console.printLine(s"Requesting: $url") *>
ZIO.sleep(500.millis) *>
ZIO.succeed(s"Response from $url")
}
}
}
}
// Usage
val program = for {
client <- ApiClient.make(maxConcurrent = 2)
urls = List(
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
"https://api.example.com/reviews",
"https://api.example.com/analytics"
)
// Only 2 requests at a time
results <- ZIO.foreachPar(urls)(client.makeRequest)
_ <- Console.printLine(s"Got ${results.size} responses")
} yield ()
Permit Management
// ZIO 2's Semaphore ties permits to an effect or a Scope,
// so there is no separate acquire/release pair to forget
for {
  semaphore <- Semaphore.make(5)
  // Check available permits
  available <- semaphore.available
  // Use one permit safely: acquired before the effect runs and released
  // afterwards, even on failure or interruption
  _ <- semaphore.withPermit {
    performOperation
  }
  // Multiple permits
  _ <- semaphore.withPermits(3) {
    performExpensiveOperation
  }
  // Hold a permit for the lifetime of a Scope
  _ <- ZIO.scoped {
    semaphore.withPermitScoped *> performOperation
  }
} yield ()
STM: Software Transactional Memory
The Problem with Refs and Locks
What if you need to update multiple shared variables atomically?
// Not atomic! Race condition between reads and writes
for {
accountA <- Ref.make(1000)
accountB <- Ref.make(500)
balanceA <- accountA.get
balanceB <- accountB.get
// Another fiber could modify accounts here!
_ <- accountA.set(balanceA - 100)
_ <- accountB.set(balanceB + 100)
} yield ()
STM to the Rescue
STM provides composable transactions that are atomic and isolated, and that retry automatically when the data they read changes:
import zio._
import zio.stm._
case class Account(balance: TRef[Int])
def transfer(from: Account, to: Account, amount: Int): UIO[Unit] =
STM.atomically {
for {
fromBalance <- from.balance.get
_ <- STM.check(fromBalance >= amount) // Retry if insufficient
_ <- from.balance.update(_ - amount)
_ <- to.balance.update(_ + amount)
} yield ()
}
// Usage
val program = for {
accountA <- TRef.make(1000).commit
accountB <- TRef.make(500).commit
accA = Account(accountA)
accB = Account(accountB)
// Multiple concurrent transfers - all atomic!
_ <- ZIO.foreachPar(1 to 10) { _ =>
transfer(accA, accB, 50)
}
finalA <- accountA.get.commit
finalB <- accountB.get.commit
_ <- Console.printLine(s"Account A: $finalA")
_ <- Console.printLine(s"Account B: $finalB")
} yield ()
STM Operations
import zio.stm._
for {
ref <- TRef.make(0).commit
// Get and set
value <- ref.get.commit
_ <- ref.set(42).commit
// Update
_ <- ref.update(_ + 1).commit
// Modify with result
result <- ref.modify { current =>
(s"Old value: $current", current + 1)
}.commit
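// Check condition inside the transaction (retries when the TRef changes).
// Checking the stale `value` read earlier would suspend forever.
_ <- ref.get.flatMap(v => STM.check(v > 10)).commit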
// Compose transactions
transaction = for {
v1 <- ref.get
_ <- ref.set(v1 * 2)
v2 <- ref.get
} yield v2
finalValue <- transaction.commit
} yield ()
TQueue and TMap
import zio.stm._
// Transactional Queue
val queueExample = for {
queue <- TQueue.bounded[Int](10).commit
transaction = for {
_ <- queue.offer(1)
_ <- queue.offer(2)
x <- queue.take
y <- queue.take
} yield x + y
result <- transaction.commit
} yield result
// Transactional Map
val mapExample = for {
map <- TMap.empty[String, Int].commit
transaction = for {
_ <- map.put("key1", 100)
_ <- map.put("key2", 200)
value <- map.get("key1")
_ <- map.delete("key2")
} yield value
result <- transaction.commit
} yield result
Retry and OrElse
import zio.stm._
// Retry until condition is met
def waitForStock(inventory: TRef[Int], amount: Int): UIO[Unit] =
STM.atomically {
for {
stock <- inventory.get
_ <- STM.check(stock >= amount) // Retry if not enough
_ <- inventory.update(_ - amount)
} yield ()
}
// OrElse: try first, fallback to second
def transferOrLog(
from: TRef[Int],
to: TRef[Int],
amount: Int,
log: TQueue[String]
): UIO[Unit] = STM.atomically {
val transfer = for {
balance <- from.get
_ <- STM.check(balance >= amount)
_ <- from.update(_ - amount)
_ <- to.update(_ + amount)
} yield ()
// .unit aligns the result type with transfer, since offer returns a value in ZIO 2
val logFailure = log.offer("Transfer failed: insufficient funds").unit
transfer.orElse(logFailure)
}
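The two behaviors can be observed directly. In the sketch below, takeStock suspends until another fiber restocks the inventory, and withdrawOrLog falls back to logging when the withdrawal would otherwise retry. All names here (StmRetrySketch, takeStock, withdrawOrLog, waiting, fallback) are illustrative, not part of ZIO.

```scala
import zio._
import zio.stm._

object StmRetrySketch {
  // Suspends (via retry) until at least `amount` units are in stock, then takes them.
  def takeStock(inventory: TRef[Int], amount: Int): UIO[Unit] =
    (for {
      stock <- inventory.get
      _     <- STM.check(stock >= amount)
      _     <- inventory.update(_ - amount)
    } yield ()).commit

  // Tries the withdrawal; if it would retry, records a message instead.
  def withdrawOrLog(balance: TRef[Int], amount: Int, log: TQueue[String]): UIO[Unit] = {
    val withdraw =
      balance.get.flatMap(b => STM.check(b >= amount)) *> balance.update(_ - amount)
    withdraw.orElse(log.offer("insufficient funds").unit).commit
  }

  // The waiter needs 5 units; it completes only once 8 units have been added.
  val waiting: UIO[Int] =
    for {
      inventory <- TRef.make(0).commit
      waiter    <- takeStock(inventory, 5).fork
      _         <- inventory.update(_ + 8).commit
      _         <- waiter.join
      left      <- inventory.get.commit
    } yield left

  // The balance (10) cannot cover 50, so the fallback branch runs.
  val fallback: UIO[(Int, List[String])] =
    for {
      balance <- TRef.make(10).commit
      log     <- TQueue.unbounded[String].commit
      _       <- withdrawOrLog(balance, 50, log)
      b       <- balance.get.commit
      msgs    <- log.takeAll.commit
    } yield (b, msgs.toList)
}
```

`waiting` leaves 3 units in stock, and `fallback` leaves the balance at 10 with a single "insufficient funds" entry in the log. Note that the waiter does not poll: the transaction is re-run only when a TRef it read has changed.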
Performance Optimization
Caching with cached
Avoid recomputing expensive operations:
import zio._
import java.util.concurrent.TimeUnit
def expensiveComputation(id: Int): Task[String] =
ZIO.sleep(2.seconds) *>
ZIO.succeed(s"Result for $id")
val program = for {
// Cache for 1 minute
cached <- expensiveComputation(123).cached(1.minute)
// First call: takes 2 seconds
start1 <- Clock.currentTime(TimeUnit.MILLISECONDS)
_ <- cached
end1 <- Clock.currentTime(TimeUnit.MILLISECONDS)
_ <- Console.printLine(s"First call: ${end1 - start1}ms")
// Second call: instant (cached)
start2 <- Clock.currentTime(TimeUnit.MILLISECONDS)
_ <- cached
end2 <- Clock.currentTime(TimeUnit.MILLISECONDS)
_ <- Console.printLine(s"Second call: ${end2 - start2}ms")
} yield ()
Memoization
Cache results per input:
trait UserService {
def getUser(id: Int): Task[User]
}
object UserService {
def make: UIO[UserService] = for {
cache <- Ref.make(Map.empty[Int, User])
} yield new UserService {
def getUser(id: Int) =
cache.get.flatMap { map =>
map.get(id) match {
case Some(user) =>
ZIO.succeed(user) // Cache hit
case None =>
fetchFromDatabase(id).flatMap { user =>
cache.update(_ + (id -> user)).as(user) // Cache miss
}
}
}
}
private def fetchFromDatabase(id: Int): Task[User] = ???
}
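One caveat with the Ref-based cache above: two fibers that miss on the same key at the same moment will both query the database. A possible refinement, sketched here under the assumption that caching failed lookups is acceptable, stores a Promise per key so that concurrent callers share a single fetch. The names DedupCacheSketch and memoize, and the simulated 20 ms fetch, are hypothetical.

```scala
import zio._

object DedupCacheSketch {
  // Wraps `fetch` so that each key is fetched at most once, even under concurrent misses.
  def memoize[K, V](fetch: K => Task[V]): UIO[K => Task[V]] =
    Ref.make(Map.empty[K, Promise[Throwable, V]]).map { cache => (key: K) =>
      Promise.make[Throwable, V].flatMap { fresh =>
        cache.modify[Task[V]] { entries =>
          entries.get(key) match {
            // Someone else already started (or finished) the fetch: wait for it
            case Some(pending) => (pending.await, entries)
            // We are first: register our promise, run the fetch, and complete the promise
            case None =>
              (fetch(key).intoPromise(fresh) *> fresh.await, entries + (key -> fresh))
          }
        }.flatten
      }
    }

  // Ten parallel lookups of the same id, counting how often the fetch really runs.
  val program: Task[(List[String], Int)] =
    for {
      calls   <- Ref.make(0)
      lookup  <- memoize[Int, String](id =>
                   calls.update(_ + 1) *> ZIO.sleep(20.millis).as(s"user-$id"))
      results <- ZIO.foreachPar(List.fill(10)(42))(lookup)
      count   <- calls.get
    } yield (results, count)
}
```

In `program`, all ten callers receive "user-42" while the underlying fetch runs once. A production version would also want eviction and a policy for failed lookups, which this sketch leaves out.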
Batching Operations
Combine multiple operations into one:
// Bad: N database queries
def getUsersSequential(ids: List[Int]): Task[List[User]] =
ZIO.foreach(ids)(id => fetchUser(id))
// Good: 1 database query
def getUsersBatched(ids: List[Int]): Task[List[User]] =
fetchUsers(ids) // Single batch query
// Even better: automatic batching
trait UserService {
def getUser(id: Int): Task[User]
def getUsers(ids: List[Int]): Task[List[User]]
}
object UserService {
def makeBatching(batchSize: Int, window: Duration): UIO[UserService] =
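for {
// Promises carry Throwable so a failed batch can be reported to every caller
queue <- Queue.bounded[(Int, Promise[Throwable, User])](100)
// Wait for the window so requests can accumulate, then take up to batchSize of them
batcher = (ZIO.sleep(window) *> queue.takeBetween(1, batchSize))
.flatMap { requests =>
val batch = requests.toList
// Assumes fetchUsers returns users in the same order as the ids
fetchUsers(batch.map(_._1)).foldZIO(
err => ZIO.foreachDiscard(batch) { case (_, promise) => promise.fail(err) },
users => ZIO.foreachDiscard(batch.zip(users)) { case ((_, promise), user) =>
promise.succeed(user)
}
)
}.forever.fork
_ <- batcher
} yield new UserService {
def getUser(id: Int) = for {
promise <- Promise.make[Throwable, User]
_ <- queue.offer((id, promise))
user <- promise.await
} yield user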
def getUsers(ids: List[Int]) =
ZIO.foreach(ids)(getUser)
}
private def fetchUsers(ids: List[Int]): Task[List[User]] = ???
}
Effect Fusion
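Each chained map adds a small step for the ZIO runtime to interpret. The cost is usually negligible, but on a hot path you can fuse the steps yourself by composing the functions into a single map:
// Three separate map steps
val chained = effect1
.map(_ + 1)
.map(_ * 2)
.map(_.toString)
// Same result in a single step: effect1.map(x => ((x + 1) * 2).toString)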
Avoiding Unnecessary Allocations
// Bad: creates intermediate lists
val inefficient = ZIO.foreach(1 to 1000000) { i =>
ZIO.succeed(i * 2)
}
// Better: use foreachDiscard if you don't need results
val efficient = ZIO.foreachDiscard(1 to 1000000) { i =>
process(i)
}
// Bad: builds intermediate collections
val wasteful = list
.map(transform1)
.map(transform2)
.filter(predicate)
// Better: use view for lazy evaluation
val optimized = list.view
.map(transform1)
.map(transform2)
.filter(predicate)
.toList
Building a High-Performance Task Queue
Let's combine everything into a production-ready system:
import zio._
import zio.stm._
case class Task(id: String, priority: Int, data: String)
trait TaskQueue {
def submit(task: Task): UIO[Unit]
def stats: UIO[QueueStats]
}
case class QueueStats(
queued: Int,
processing: Int,
completed: Int,
failed: Int
)
object TaskQueue {
def make(
workers: Int,
maxConcurrent: Int,
queueSize: Int
): UIO[TaskQueue] = for {
// Transactional FIFO queue (TQueue does not order by priority)
queue <- TQueue.bounded[Task](queueSize).commit
// Concurrency limit
semaphore <- Semaphore.make(maxConcurrent)
// Statistics (named statsRef so it does not clash with TaskQueue#stats)
statsRef <- TRef.make(QueueStats(0, 0, 0, 0)).commit
// Worker fiber
worker = queue.take.commit.flatMap { task =>
semaphore.withPermit {
statsRef.update(s => s.copy(
queued = s.queued - 1,
processing = s.processing + 1
)).commit *>
processTask(task).foldZIO(
_ => statsRef.update(s => s.copy(
processing = s.processing - 1,
failed = s.failed + 1
)).commit,
_ => statsRef.update(s => s.copy(
processing = s.processing - 1,
completed = s.completed + 1
)).commit
)
}
}.forever
// Start workers
_ <- ZIO.foreachPar(1 to workers)(_ => worker.fork)
} yield new TaskQueue {
// Enqueue and count the task in one transaction, so a worker can never
// decrement `queued` before it has been incremented
def submit(task: Task) =
(queue.offer(task) *> statsRef.update(s => s.copy(queued = s.queued + 1))).commit
def stats =
statsRef.get.commit
}
// The Task case class above shadows zio.Task, so spell out the effect type
private def processTask(task: Task): ZIO[Any, Throwable, Unit] =
Console.printLine(s"Processing ${task.id}") *>
ZIO.sleep(100.millis)
}
// Usage
val program = for {
queue <- TaskQueue.make(
workers = 4,
maxConcurrent = 10,
queueSize = 100
)
// Submit tasks
_ <- ZIO.foreachPar(1 to 50) { i =>
queue.submit(Task(
id = s"task-$i",
priority = i % 3,
data = s"Data $i"
))
}
// Monitor stats
_ <- (for {
s <- queue.stats
_ <- Console.printLine(
s"Queued: ${s.queued}, " +
s"Processing: ${s.processing}, " +
s"Completed: ${s.completed}, " +
s"Failed: ${s.failed}"
)
} yield ()).delay(500.millis).repeatN(10)
} yield ()
Key Takeaways
- Ref provides safe, atomic mutable state
- FiberRef offers fiber-local state like ThreadLocal
- Queue enables producer-consumer patterns with backpressure
- Hub implements publish-subscribe for multiple consumers
- Semaphore limits concurrent access to resources
- STM enables composable, atomic transactions
- Caching and batching optimize performance
- Combine these patterns to build robust, high-performance systems
Common Patterns
- Rate Limiting: Use Semaphore to limit concurrent operations
- Caching: Use Ref with expiration logic or .cached
- Coordination: Use Queue or Hub for inter-fiber communication
- Atomic Updates: Use STM when updating multiple values
- State Machines: Use Ref.modify with pattern matching
What's Next?
In Lesson 9: ZIO in Production, you'll learn how to deploy ZIO applications with logging, metrics, configuration management, and monitoring. We'll take everything you've learned and make it production-ready.
Ready to deploy to production? Let's continue!