Resource Management and Scopes
Why Resource Management Matters
Every application uses resources that must be cleaned up: database connections, file handles, network sockets, locks, memory buffers. Fail to release them, and you get:
- Memory leaks: Resources pile up until your app crashes
- Connection exhaustion: Database refuses new connections
- File handle limits: OS prevents opening more files
- Deadlocks: Unreleased locks freeze your application
Traditional resource management is error-prone:
val connection = database.connect()
try {
val data = connection.query("SELECT * FROM users")
data.process()
connection.close() // What if process() throws?
} catch {
case e: Exception =>
connection.close() // What if THIS throws?
throw e
}
Problems:
- Easy to forget cleanup in error paths
- Cleanup code itself can fail
- Interruption might skip cleanup entirely
- Nested resources become complex quickly
What if cleanup was automatic and guaranteed? That's what ZIO provides.
The Bracket Pattern
The bracket pattern guarantees resource cleanup, no matter what:
import zio._
def withResource[R, E, A, B](
acquire: ZIO[R, E, A]
)(use: A => ZIO[R, E, B])(
release: A => UIO[Unit]
): ZIO[R, E, B]
Three phases:
- Acquire: Get the resource
- Use: Do something with it
- Release: Clean up (guaranteed, even on failure/interruption)
ZIO provides this as acquireRelease.
ZIO.acquireRelease
Basic Usage
import zio._
import java.io.{File, FileWriter}
def writeToFile(file: File, content: String): Task[Unit] =
ZIO.acquireRelease(
// Acquire: open the file
acquire = ZIO.attempt(new FileWriter(file))
)(
// Release: close the file (always runs)
release = writer => ZIO.succeed(writer.close())
).flatMap { writer =>
// Use: write content
ZIO.attempt(writer.write(content))
}
The release function always runs, even if:
- Writing fails
- An exception is thrown
- The fiber is interrupted
acquireReleaseWith for Convenience
def readFile(path: String): Task[String] =
ZIO.acquireReleaseWith(
acquire = ZIO.attempt(scala.io.Source.fromFile(path))
)(
release = source => ZIO.succeed(source.close())
)(
use = source => ZIO.attempt(source.mkString)
)
The With variant combines acquire, release, and use in one call—cleaner for simple cases.
Handling Release Failures
import java.sql.Connection
def useConnection[A](f: Connection => Task[A]): Task[A] =
ZIO.acquireRelease(
acquire = ZIO.attempt(getConnection())
)(
release = conn =>
ZIO.attempt(conn.close())
.catchAll(error =>
Console.printLineError(s"Failed to close connection: $error")
)
.orDie // Convert to defect if logging fails
).flatMap(f)
What if closing a connection fails? Log it, but don't fail the whole operation.
Understanding Scopes
A Scope represents a region where resources are acquired and released. Think of it as a context that tracks all resources and ensures they're cleaned up when the scope ends.
import zio._
// Resources are scoped
val scopedResource: ZIO[Scope, Throwable, Connection] =
ZIO.acquireRelease(
acquire = ZIO.attempt(database.connect())
)(
release = conn => ZIO.succeed(conn.close())
)
// Use the scope
val program: Task[String] =
ZIO.scoped {
scopedResource.flatMap { conn =>
ZIO.attempt(conn.query("SELECT version()"))
}
}
// Connection automatically closed here
Key insight: ZIO.scoped creates a scope, runs your effect, and guarantees cleanup when the scope exits.
Multiple Resources in One Scope
def processData: Task[Unit] = ZIO.scoped {
for {
conn <- acquireConnection
reader <- acquireFileReader("input.txt")
writer <- acquireFileWriter("output.txt")
data <- ZIO.attempt(reader.readAll())
rows <- ZIO.attempt(conn.query(s"SELECT * FROM data WHERE id IN ($data)"))
_ <- ZIO.attempt(writer.write(rows.toString))
} yield ()
}
// All three resources cleaned up automatically
All resources are tracked by the scope and released in reverse order (LIFO).
Practical Example: Database Connection
Let's build a safe database wrapper:
import zio._
import java.sql.{Connection, DriverManager, ResultSet, Statement}
object Database {
// Configuration
case class DbConfig(url: String, user: String, password: String)
// Acquire a connection
def connect(config: DbConfig): Task[Connection] =
ZIO.attempt {
DriverManager.getConnection(config.url, config.user, config.password)
}
// Scoped connection that auto-closes
def scopedConnection(config: DbConfig): ZIO[Scope, Throwable, Connection] =
ZIO.acquireRelease(
acquire = connect(config)
)(
release = conn => ZIO.succeed(conn.close())
)
// Execute a query safely
def query(sql: String, config: DbConfig): Task[List[String]] =
ZIO.scoped {
for {
conn <- scopedConnection(config)
stmt <- ZIO.acquireRelease(
acquire = ZIO.attempt(conn.createStatement())
)(
release = st => ZIO.succeed(st.close())
)
rs <- ZIO.acquireRelease(
acquire = ZIO.attempt(stmt.executeQuery(sql))
)(
release = r => ZIO.succeed(r.close())
)
rows <- extractRows(rs)
} yield rows
}
private def extractRows(rs: ResultSet): Task[List[String]] =
ZIO.attempt {
val buffer = scala.collection.mutable.ListBuffer[String]()
while (rs.next()) {
buffer += rs.getString(1)
}
buffer.toList
}
}
// Usage
val program = Database.query(
"SELECT name FROM users WHERE active = true",
DbConfig("jdbc:postgresql://localhost/mydb", "user", "pass")
)
Three resources (connection, statement, result set) all managed automatically!
Resource Pools
For expensive resources like database connections, create a pool:
import zio._
trait ConnectionPool {
def acquire: Task[Connection]
def release(conn: Connection): UIO[Unit]
}
object ConnectionPool {
def make(
config: DbConfig,
size: Int
): ZIO[Scope, Throwable, ConnectionPool] = {
def createConnection: Task[Connection] =
ZIO.attempt(DriverManager.getConnection(
config.url, config.user, config.password
))
for {
// Create a queue to hold available connections
queue <- Queue.bounded[Connection](size)
// Pre-populate with connections
_ <- ZIO.foreachDiscard(1 to size) { _ =>
createConnection.flatMap(queue.offer)
}
// Cleanup: close all connections on scope exit
_ <- ZIO.addFinalizer {
queue.takeAll.flatMap { connections =>
ZIO.foreachDiscard(connections) { conn =>
ZIO.attempt(conn.close()).ignore
}
}
}
} yield new ConnectionPool {
def acquire: Task[Connection] =
queue.take
def release(conn: Connection): UIO[Unit] =
queue.offer(conn).unit
}
}
// Convenience method to use a connection
def withConnection[A](
pool: ConnectionPool
)(use: Connection => Task[A]): Task[A] =
ZIO.acquireRelease(
acquire = pool.acquire
)(
release = conn => pool.release(conn)
).flatMap(use)
}
// Usage
val program = ZIO.scoped {
for {
pool <- ConnectionPool.make(dbConfig, size = 10)
result <- ConnectionPool.withConnection(pool) { conn =>
ZIO.attempt(conn.query("SELECT * FROM users"))
}
} yield result
}
The pool:
- Creates connections up front
- Reuses them instead of creating new ones
- Automatically closes all connections when scope ends
- Handles concurrent access with a queue
Resource Composition
Nested Resources
def processUserData(userId: Int): Task[Unit] = ZIO.scoped {
for {
// Outer resource
conn <- Database.scopedConnection(dbConfig)
// Inner resource depends on outer
userData <- ZIO.scoped {
for {
stmt <- acquireStatement(conn)
data <- executeQuery(stmt, s"SELECT * FROM users WHERE id = $userId")
} yield data
}
// Use the data
_ <- processData(userData)
} yield ()
}
Scopes can be nested. Inner scopes release before outer scopes.
Parallel Resource Usage
def fetchMultipleUsers(ids: List[Int]): Task[List[User]] = ZIO.scoped {
for {
pool <- ConnectionPool.make(dbConfig, size = 10)
users <- ZIO.foreachPar(ids) { id =>
ConnectionPool.withConnection(pool) { conn =>
fetchUser(conn, id)
}
}
} yield users
}
Multiple fibers safely share the connection pool.
Handling Interruption
Resources are released even when interrupted:
def longRunningQuery: Task[String] = ZIO.scoped {
for {
conn <- Database.scopedConnection(dbConfig)
result <- ZIO.attempt(conn.executeQuery("SELECT * FROM huge_table"))
.timeout(5.seconds)
.flatMap {
case Some(data) => ZIO.succeed(data.toString)
case None => ZIO.fail(new TimeoutException("Query timed out"))
}
} yield result
}
// Connection closes even if timeout occurs
The connection closes whether the query succeeds, fails, or times out.
Advanced Pattern: Resource Dependencies
Sometimes one resource depends on another:
case class AppResources(
config: Config,
database: DatabaseConnection,
cache: Cache,
logger: Logger
)
def makeResources: ZIO[Scope, Throwable, AppResources] = for {
config <- loadConfig
logger <- createLogger(config.logLevel)
database <- Database.scopedConnection(config.dbConfig)
cache <- Cache.create(database, config.cacheSize)
} yield AppResources(config, database, cache, logger)
val application = ZIO.scoped {
makeResources.flatMap { resources =>
// Use all resources
runApplication(resources)
}
}
// All resources cleaned up in reverse order
Dependencies are clear, and cleanup happens automatically in the right order.
Common Patterns and Best Practices
Always Use UIO for Release
// Good: release can't fail
ZIO.acquireRelease(
acquire = openFile(path)
)(
release = file => ZIO.succeed(file.close())
)
// Bad: release might fail
ZIO.acquireRelease(
acquire = openFile(path)
)(
release = file => ZIO.attempt(file.close()) // Don't do this
)
Why? Release failures complicate cleanup. Catch exceptions in release and convert to UIO.
Prefer Scoped Over Manual acquireRelease
// More readable
ZIO.scoped {
for {
file <- ZIO.acquireRelease(open)(close)
data <- readFrom(file)
} yield data
}
// vs manual
ZIO.acquireRelease(open)(close).flatMap { file =>
readFrom(file)
}
Resource Acquisition Should Be Fast
// Good: quick acquisition
def acquireConnection: Task[Connection] =
ZIO.attempt(pool.getConnection())
// Bad: slow acquisition
def acquireBadConnection: Task[Connection] =
for {
conn <- connectToDatabase // Might take seconds
_ <- warmUpConnection(conn) // Extra work
_ <- testConnection(conn)
} yield conn
Do expensive initialization in the use phase, not acquisition.
Complete Example: File Processing Pipeline
import zio._
import java.io._
object FileProcessor extends ZIOAppDefault {
def processFiles(inputPath: String, outputPath: String): Task[Unit] =
ZIO.scoped {
for {
reader <- acquireReader(inputPath)
writer <- acquireWriter(outputPath)
_ <- processLines(reader, writer)
_ <- Console.printLine("Processing complete")
} yield ()
}
def acquireReader(path: String): ZIO[Scope, Throwable, BufferedReader] =
ZIO.acquireRelease(
acquire = ZIO.attempt(new BufferedReader(new FileReader(path)))
)(
release = reader =>
ZIO.succeed(reader.close())
.catchAll(e => Console.printLineError(s"Error closing reader: $e"))
.orDie
)
def acquireWriter(path: String): ZIO[Scope, Throwable, BufferedWriter] =
ZIO.acquireRelease(
acquire = ZIO.attempt(new BufferedWriter(new FileWriter(path)))
)(
release = writer =>
ZIO.succeed(writer.close())
.catchAll(e => Console.printLineError(s"Error closing writer: $e"))
.orDie
)
def processLines(reader: BufferedReader, writer: BufferedWriter): Task[Unit] = {
def readLine: Task[Option[String]] =
ZIO.attempt(Option(reader.readLine()))
def writeLine(line: String): Task[Unit] =
ZIO.attempt {
writer.write(line.toUpperCase)
writer.newLine()
}
def loop: Task[Unit] =
readLine.flatMap {
case Some(line) => writeLine(line) *> loop
case None => ZIO.unit
}
loop
}
def run = processFiles("input.txt", "output.txt")
}
Both files automatically close, even if processing fails midway!
Key Takeaways
- acquireRelease guarantees cleanup even on failure or interruption
- Scopes track resources and release them automatically
- Resource pools reuse expensive resources efficiently
- Release functions should be UIO (can't fail)
- Resources release in LIFO order (reverse of acquisition)
- Scoped effects are composable and nest naturally
- Interruption-safe: cleanup happens no matter what
Common Pitfalls to Avoid
Forgetting to Use Scope
// Wrong: no scope, resource might leak
val badProgram =
ZIO.acquireRelease(openFile)(closeFile)
.flatMap(processFile)
// Correct: wrapped in scope
val goodProgram = ZIO.scoped {
ZIO.acquireRelease(openFile)(closeFile)
.flatMap(processFile)
}
Doing Work in Release
// Wrong: expensive work in release
ZIO.acquireRelease(acquire)(resource =>
ZIO.succeed {
resource.flush() // This is work, not cleanup!
resource.close()
}
)
// Correct: do work in use phase
ZIO.scoped {
ZIO.acquireRelease(acquire)(resource =>
ZIO.succeed(resource.close())
).flatMap { resource =>
useResource(resource) *>
ZIO.attempt(resource.flush()) // Flush before cleanup
}
}
Not Handling Release Exceptions
// Wrong: release can throw
ZIO.acquireRelease(acquire)(resource =>
ZIO.attempt(resource.dangerousClose()) // Might throw!
)
// Correct: catch and handle
ZIO.acquireRelease(acquire)(resource =>
ZIO.attempt(resource.dangerousClose())
.catchAll(error =>
Console.printLineError(s"Close failed: $error")
)
.orDie
)
What's Next?
You now understand how to manage resources safely with ZIO. In Lesson 6: Streaming Data with ZIO Streams, you'll learn:
- Processing infinite and large datasets efficiently
- Using ZStream for streaming operations
- Handling backpressure automatically
- Building stream pipelines
- Combining resources with streams
Ready to process data at scale? Let's continue!
Comments
Be the first to comment on this lesson!