Concurrency and Futures: Asynchronous Programming
Introduction
Concurrency is essential for building responsive, scalable applications. Scala provides powerful abstractions for asynchronous programming, with Futures being the primary tool for handling concurrent operations. Futures represent values that may not be available yet but will be computed asynchronously.
This lesson will teach you to work with Futures effectively, understand execution contexts, handle asynchronous operations, and build concurrent applications that can handle multiple tasks simultaneously.
Understanding Futures
Basic Future Operations
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure, Try}
import scala.concurrent.duration._
// Default execution context for examples
implicit val ec: ExecutionContext = ExecutionContext.global
// Creating futures
val immediateFuture = Future.successful(42)
val failedFuture = Future.failed(new RuntimeException("Something went wrong"))
// Future from computation
val computationFuture = Future {
Thread.sleep(1000) // Simulate long-running computation
42 * 2
}
println("Future created - computation running in background")
// Blocking wait (avoid in production code)
try {
val result = scala.concurrent.Await.result(computationFuture, 2.seconds)
println(s"Computation result: $result")
} catch {
case _: java.util.concurrent.TimeoutException =>
println("Computation timed out")
}
// Non-blocking callbacks
val asyncFuture = Future {
Thread.sleep(500)
"Hello from the future!"
}
asyncFuture.onComplete {
case Success(value) => println(s"Success: $value")
case Failure(exception) => println(s"Failed: ${exception.getMessage}")
}
// Transform futures with map
val numberFuture = Future(10)
val doubledFuture = numberFuture.map(_ * 2)
val stringFuture = doubledFuture.map(n => s"The result is $n")
stringFuture.foreach(println)
// Chain futures with flatMap
def fetchUserId(username: String): Future[Int] = Future {
Thread.sleep(100)
if (username.nonEmpty) username.hashCode.abs else throw new IllegalArgumentException("Empty username")
}
def fetchUserProfile(userId: Int): Future[String] = Future {
Thread.sleep(150)
s"Profile for user $userId"
}
val userProfile = for {
userId <- fetchUserId("alice")
profile <- fetchUserProfile(userId)
} yield profile
userProfile.onComplete {
case Success(profile) => println(s"Got profile: $profile")
case Failure(exception) => println(s"Failed to get profile: ${exception.getMessage}")
}
// Error handling with recover
val riskyFuture = Future {
if (math.random() > 0.5) "Success!"
else throw new RuntimeException("Random failure")
}
val recoveredFuture = riskyFuture.recover {
case _: RuntimeException => "Recovered from failure"
}
recoveredFuture.foreach(println)
// Transform failures with recoverWith
val alternativeFuture = riskyFuture.recoverWith {
case _: RuntimeException => Future {
Thread.sleep(100)
"Alternative computation result"
}
}
alternativeFuture.foreach(println)
// Filtering futures
val evenNumberFuture = Future(scala.util.Random.nextInt(10))
val filteredFuture = evenNumberFuture.filter(_ % 2 == 0)
filteredFuture.onComplete {
case Success(value) => println(s"Even number: $value")
case Failure(_) => println("Number was odd")
}
// Transform with andThen (side effects)
val loggedFuture = Future {
val result = 42 * 3
println(s"Computed result: $result")
result
}.andThen {
case Success(value) => println(s"Logging success: $value")
case Failure(exception) => println(s"Logging failure: ${exception.getMessage}")
}
Thread.sleep(2000) // Give futures time to complete
Working with Multiple Futures
// Combining futures with Future.sequence
def fetchData(id: Int): Future[String] = Future {
Thread.sleep(scala.util.Random.nextInt(500))
s"Data for ID $id"
}
val dataFutures = (1 to 5).map(fetchData).toList
val allDataFuture = Future.sequence(dataFutures)
allDataFuture.onComplete {
case Success(allData) =>
println("All data fetched:")
allData.foreach(println)
case Failure(exception) =>
println(s"Failed to fetch all data: ${exception.getMessage}")
}
// Traverse - map and sequence in one operation
val ids = List(1, 2, 3, 4, 5)
val traversedFuture = Future.traverse(ids)(fetchData)
traversedFuture.foreach { results =>
println("Traversed results:")
results.foreach(println)
}
// First completed future with Future.firstCompletedOf
val server1 = Future {
Thread.sleep(300)
"Response from server 1"
}
val server2 = Future {
Thread.sleep(200)
"Response from server 2"
}
val server3 = Future {
Thread.sleep(400)
"Response from server 3"
}
val fastestResponse = Future.firstCompletedOf(List(server1, server2, server3))
fastestResponse.foreach { response =>
println(s"Fastest response: $response")
}
// Combining futures with zip
val future1 = Future {
Thread.sleep(100)
10
}
val future2 = Future {
Thread.sleep(150)
20
}
val combinedFuture = future1.zip(future2)
combinedFuture.foreach { case (a, b) =>
println(s"Combined results: $a + $b = ${a + b}")
}
// Using for-comprehensions with multiple futures
val weatherFuture = Future {
Thread.sleep(200)
"Sunny"
}
val temperatureFuture = Future {
Thread.sleep(100)
25
}
val forecastFuture = for {
weather <- weatherFuture
temperature <- temperatureFuture
} yield s"Weather: $weather, Temperature: ${temperature}°C"
forecastFuture.foreach(println)
// Handling partial failures with Future.allOf and Future.traverse
case class ApiResponse(id: Int, data: Option[String], error: Option[String])
def callApi(id: Int): Future[ApiResponse] = Future {
Thread.sleep(scala.util.Random.nextInt(300))
if (scala.util.Random.nextBoolean()) {
ApiResponse(id, Some(s"Data for $id"), None)
} else {
ApiResponse(id, None, Some(s"Error for $id"))
}
}
val apiCallFutures = (1 to 5).map(callApi).toList
val allResponsesFuture = Future.sequence(apiCallFutures)
allResponsesFuture.foreach { responses =>
val successes = responses.filter(_.data.isDefined)
val failures = responses.filter(_.error.isDefined)
println(s"Successful calls: ${successes.length}")
println(s"Failed calls: ${failures.length}")
successes.foreach(r => println(s" Success ${r.id}: ${r.data.get}"))
failures.foreach(r => println(s" Error ${r.id}: ${r.error.get}"))
}
// Racing futures (timeout pattern)
def withTimeout[T](future: Future[T], timeout: FiniteDuration): Future[T] = {
val timeoutFuture = Future {
Thread.sleep(timeout.toMillis)
throw new java.util.concurrent.TimeoutException(s"Operation timed out after $timeout")
}
Future.firstCompletedOf(List(future, timeoutFuture))
}
val slowOperation = Future {
Thread.sleep(2000)
"Slow result"
}
val timedOperation = withTimeout(slowOperation, 1.second)
timedOperation.onComplete {
case Success(result) => println(s"Completed in time: $result")
case Failure(_: java.util.concurrent.TimeoutException) => println("Operation timed out")
case Failure(exception) => println(s"Operation failed: ${exception.getMessage}")
}
Thread.sleep(3000) // Wait for all operations to complete
Execution Contexts
Understanding and Customizing Execution Contexts
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.ExecutionContext
// Default execution context
println(s"Default EC: ${ExecutionContext.global}")
// Custom thread pool execution context
val customThreadPool = Executors.newFixedThreadPool(4, new ThreadFactory {
private val counter = new java.util.concurrent.atomic.AtomicInteger(0)
def newThread(r: Runnable): Thread = {
val thread = new Thread(r, s"CustomPool-${counter.incrementAndGet()}")
thread.setDaemon(true)
thread
}
})
implicit val customEC: ExecutionContext = ExecutionContext.fromExecutor(customThreadPool)
// CPU-intensive work
def cpuIntensiveTask(n: Int): Future[Long] = Future {
val threadName = Thread.currentThread().getName
println(s"CPU task $n running on thread: $threadName")
// Simulate CPU-intensive work
var sum = 0L
for (i <- 1 to 1000000) {
sum += i
}
sum
}
// IO-intensive work
def ioIntensiveTask(id: Int): Future[String] = Future {
val threadName = Thread.currentThread().getName
println(s"IO task $id running on thread: $threadName")
// Simulate IO operation
Thread.sleep(500)
s"IO result $id"
}
// Execute tasks on custom thread pool
val cpuTasks = (1 to 4).map(cpuIntensiveTask)
val ioTasks = (1 to 6).map(ioIntensiveTask)
val allTasks = Future.sequence(cpuTasks ++ ioTasks)
allTasks.foreach { results =>
println(s"All tasks completed. Results count: ${results.length}")
}
// Dedicated execution context for blocking operations
val blockingEC = ExecutionContext.fromExecutor(Executors.newCachedThreadPool(new ThreadFactory {
private val counter = new java.util.concurrent.atomic.AtomicInteger(0)
def newThread(r: Runnable): Thread = {
val thread = new Thread(r, s"BlockingPool-${counter.incrementAndGet()}")
thread.setDaemon(true)
thread
}
}))
def blockingOperation(id: Int): Future[String] = Future {
val threadName = Thread.currentThread().getName
println(s"Blocking operation $id on thread: $threadName")
// Simulate blocking IO (file read, database query, etc.)
Thread.sleep(1000)
s"Blocking result $id"
}(blockingEC) // Use specific execution context
// Mix blocking and non-blocking operations
val mixedOperations = for {
nonBlocking <- Future {
Thread.currentThread().getName
}(customEC)
blocking <- blockingOperation(1)
} yield (nonBlocking, blocking)
mixedOperations.foreach { case (nbThread, blockingResult) =>
println(s"Non-blocking on: $nbThread, Blocking result: $blockingResult")
}
// Execution context with custom exception handling
val monitoredEC = ExecutionContext.fromExecutor(
customThreadPool,
exception => {
println(s"Uncaught exception in custom EC: ${exception.getMessage}")
exception.printStackTrace()
}
)
Future {
throw new RuntimeException("Test exception")
}(monitoredEC)
// Using different execution contexts for different types of work
class ServiceWithMultipleExecutors {
private val computeEC = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
)
private val ioEC = ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
)
def computeHash(data: String): Future[String] = Future {
// CPU intensive hash computation
Thread.sleep(100) // Simulate computation
s"hash-${data.hashCode}"
}(computeEC)
def saveToDatabase(data: String): Future[String] = Future {
// IO intensive database operation
Thread.sleep(300) // Simulate IO
s"saved-$data"
}(ioEC)
def processData(data: String): Future[String] = {
for {
hash <- computeHash(data)
saved <- saveToDatabase(hash)
} yield saved
}(computeEC) // Use compute EC for orchestration
}
val service = new ServiceWithMultipleExecutors()
service.processData("example-data").foreach(println)
Thread.sleep(2000)
// Clean up thread pools
customThreadPool.shutdown()
blockingEC.asInstanceOf[ExecutionContext].execute(() =>
blockingEC.asInstanceOf[java.util.concurrent.ExecutorService].shutdown()
)
Promises and Advanced Patterns
Working with Promises
import scala.concurrent.Promise
import scala.util.Random
// Basic Promise usage
val promise = Promise[String]()
val futureFromPromise = promise.future
// Complete the promise in another thread
Future {
Thread.sleep(1000)
if (Random.nextBoolean()) {
promise.success("Promise completed successfully!")
} else {
promise.failure(new RuntimeException("Promise failed!"))
}
}
futureFromPromise.onComplete {
case Success(value) => println(s"Promise result: $value")
case Failure(exception) => println(s"Promise failed: ${exception.getMessage}")
}
// Promise-based callback conversion
def callbackBasedApi(callback: (String, Option[Exception]) => Unit): Unit = {
// Simulate async callback-based API
Future {
Thread.sleep(500)
if (Random.nextBoolean()) {
callback("Callback result", None)
} else {
callback(null, Some(new RuntimeException("Callback error")))
}
}
}
def promiseBasedWrapper(): Future[String] = {
val promise = Promise[String]()
callbackBasedApi { (result, error) =>
error match {
case Some(exception) => promise.failure(exception)
case None => promise.success(result)
}
}
promise.future
}
promiseBasedWrapper().foreach(println)
// Promise-based timeout implementation
def promiseWithTimeout[T](computation: () => T, timeout: FiniteDuration): Future[T] = {
val promise = Promise[T]()
// Start computation
Future {
try {
val result = computation()
promise.trySuccess(result)
} catch {
case exception: Exception =>
promise.tryFailure(exception)
}
}
// Start timeout
Future {
Thread.sleep(timeout.toMillis)
promise.tryFailure(new java.util.concurrent.TimeoutException(s"Computation timed out after $timeout"))
}
promise.future
}
val timeoutComputation = promiseWithTimeout(() => {
Thread.sleep(2000)
"Long computation result"
}, 1.second)
timeoutComputation.onComplete {
case Success(result) => println(s"Computation completed: $result")
case Failure(_: java.util.concurrent.TimeoutException) => println("Computation timed out")
case Failure(exception) => println(s"Computation failed: ${exception.getMessage}")
}
// Producer-consumer pattern with Promise
class MessageQueue[T] {
private var queue = List.empty[T]
private var waitingConsumers = List.empty[Promise[T]]
def produce(item: T): Unit = synchronized {
waitingConsumers match {
case promise :: rest =>
promise.success(item)
waitingConsumers = rest
case Nil =>
queue = queue :+ item
}
}
def consume(): Future[T] = synchronized {
queue match {
case head :: tail =>
queue = tail
Future.successful(head)
case Nil =>
val promise = Promise[T]()
waitingConsumers = waitingConsumers :+ promise
promise.future
}
}
}
val messageQueue = new MessageQueue[String]()
// Start consumers
val consumer1 = messageQueue.consume().map(msg => s"Consumer1 got: $msg")
val consumer2 = messageQueue.consume().map(msg => s"Consumer2 got: $msg")
consumer1.foreach(println)
consumer2.foreach(println)
// Produce messages
Future {
Thread.sleep(500)
messageQueue.produce("Message 1")
Thread.sleep(300)
messageQueue.produce("Message 2")
}
// Circuit breaker pattern
class CircuitBreaker(maxFailures: Int, resetTimeout: FiniteDuration) {
private var state: CircuitBreakerState = Closed
private var failureCount = 0
private var lastFailureTime = 0L
sealed trait CircuitBreakerState
case object Closed extends CircuitBreakerState
case object Open extends CircuitBreakerState
case object HalfOpen extends CircuitBreakerState
def execute[T](operation: () => Future[T]): Future[T] = {
state match {
case Closed =>
val result = operation()
result.onComplete {
case Success(_) =>
synchronized { failureCount = 0 }
case Failure(_) =>
synchronized {
failureCount += 1
lastFailureTime = System.currentTimeMillis()
if (failureCount >= maxFailures) {
state = Open
}
}
}
result
case Open =>
if (System.currentTimeMillis() - lastFailureTime > resetTimeout.toMillis) {
synchronized { state = HalfOpen }
execute(operation)
} else {
Future.failed(new RuntimeException("Circuit breaker is open"))
}
case HalfOpen =>
val result = operation()
result.onComplete {
case Success(_) =>
synchronized {
state = Closed
failureCount = 0
}
case Failure(_) =>
synchronized {
state = Open
lastFailureTime = System.currentTimeMillis()
}
}
result
}
}
}
val circuitBreaker = new CircuitBreaker(maxFailures = 3, resetTimeout = 2.seconds)
def unreliableService(): Future[String] = Future {
if (Random.nextDouble() < 0.7) { // 70% failure rate
throw new RuntimeException("Service failure")
}
"Service success"
}
// Test circuit breaker
(1 to 10).foreach { i =>
circuitBreaker.execute(() => unreliableService()).onComplete {
case Success(result) => println(s"Call $i: $result")
case Failure(exception) => println(s"Call $i failed: ${exception.getMessage}")
}
Thread.sleep(300)
}
Thread.sleep(5000) // Wait for operations to complete
Real-World Concurrent Patterns
Building Concurrent Applications
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ConcurrentMap
import scala.concurrent.stm._
// Thread-safe counter using AtomicInteger
class AtomicCounter {
private val count = new AtomicInteger(0)
def increment(): Int = count.incrementAndGet()
def decrement(): Int = count.decrementAndGet()
def get(): Int = count.get()
}
val counter = new AtomicCounter()
// Multiple futures incrementing counter concurrently
val incrementFutures = (1 to 100).map { i =>
Future {
Thread.sleep(Random.nextInt(10))
counter.increment()
}
}
Future.sequence(incrementFutures).foreach { results =>
println(s"Final counter value: ${counter.get()}")
println(s"Expected: 100, Actual: ${counter.get()}")
}
// Concurrent map operations
val sharedMap: ConcurrentMap[String, Int] = scala.collection.concurrent.TrieMap.empty
val mapOperations = (1 to 50).map { i =>
Future {
val key = s"key${i % 10}"
sharedMap.put(key, i)
sharedMap.get(key)
}
}
Future.sequence(mapOperations).foreach { results =>
println(s"Map size: ${sharedMap.size}")
println(s"Map contents: ${sharedMap.toMap}")
}
// Parallel data processing pipeline
case class ProcessingJob(id: Int, data: String)
case class ProcessedResult(id: Int, result: String, processedBy: String)
class DataProcessor {
def stage1(job: ProcessingJob): Future[ProcessingJob] = Future {
Thread.sleep(Random.nextInt(100))
job.copy(data = job.data.toUpperCase)
}
def stage2(job: ProcessingJob): Future[ProcessingJob] = Future {
Thread.sleep(Random.nextInt(100))
job.copy(data = job.data.reverse)
}
def stage3(job: ProcessingJob): Future[ProcessedResult] = Future {
Thread.sleep(Random.nextInt(100))
ProcessedResult(job.id, job.data, Thread.currentThread().getName)
}
def processJob(job: ProcessingJob): Future[ProcessedResult] = {
for {
stage1Result <- stage1(job)
stage2Result <- stage2(stage1Result)
finalResult <- stage3(stage2Result)
} yield finalResult
}
}
val processor = new DataProcessor()
val jobs = (1 to 10).map(i => ProcessingJob(i, s"data-$i"))
val processingFutures = jobs.map(processor.processJob)
val allResults = Future.sequence(processingFutures)
allResults.foreach { results =>
println("Processing results:")
results.foreach { result =>
println(s" Job ${result.id}: ${result.result} (by ${result.processedBy})")
}
}
// Batch processing with configurable parallelism
def processBatchWithConcurrency[T, R](
items: List[T],
processor: T => Future[R],
maxConcurrency: Int
): Future[List[R]] = {
def processChunk(chunk: List[T]): Future[List[R]] = {
Future.sequence(chunk.map(processor))
}
val chunks = items.grouped(maxConcurrency).toList
// Process chunks sequentially, but items within each chunk in parallel
chunks.foldLeft(Future.successful(List.empty[R])) { (accFuture, chunk) =>
for {
acc <- accFuture
chunkResults <- processChunk(chunk)
} yield acc ++ chunkResults
}
}
def expensiveOperation(id: Int): Future[String] = Future {
Thread.sleep(200) // Simulate expensive operation
s"Result for $id"
}
val batchItems = (1 to 20).toList
val batchResults = processBatchWithConcurrency(batchItems, expensiveOperation, maxConcurrency = 5)
batchResults.foreach { results =>
println(s"Batch processing completed. Results: ${results.length}")
}
// Fan-out / Fan-in pattern
case class WorkItem(id: Int, data: String)
case class WorkResult(id: Int, result: String, worker: String)
class WorkerPool(workerCount: Int) {
private val workers = (1 to workerCount).map(id => s"Worker-$id")
def distributeWork(items: List[WorkItem]): Future[List[WorkResult]] = {
val workDistribution = items.zipWithIndex.groupBy(_._2 % workerCount)
val workerFutures = workDistribution.map { case (workerIndex, workItems) =>
val worker = workers(workerIndex)
Future.sequence(workItems.map { case (item, _) =>
processWork(item, worker)
})
}.toList
// Fan-in: combine all worker results
Future.sequence(workerFutures).map(_.flatten)
}
private def processWork(item: WorkItem, worker: String): Future[WorkResult] = Future {
Thread.sleep(Random.nextInt(200)) // Simulate work
WorkResult(item.id, s"Processed: ${item.data}", worker)
}
}
val workerPool = new WorkerPool(3)
val workItems = (1 to 15).map(i => WorkItem(i, s"task-$i")).toList
val distributedWork = workerPool.distributeWork(workItems)
distributedWork.foreach { results =>
println("Distributed work results:")
results.sortBy(_.id).foreach { result =>
println(s" Task ${result.id}: ${result.result} (by ${result.worker})")
}
}
// Rate limiting with futures
class RateLimiter(maxRequests: Int, per: FiniteDuration) {
private val timestamps = new java.util.concurrent.ConcurrentLinkedQueue[Long]()
def execute[T](operation: () => Future[T]): Future[T] = {
val now = System.currentTimeMillis()
val windowStart = now - per.toMillis
// Remove old timestamps
while (!timestamps.isEmpty && timestamps.peek() < windowStart) {
timestamps.poll()
}
if (timestamps.size() < maxRequests) {
timestamps.offer(now)
operation()
} else {
Future.failed(new RuntimeException("Rate limit exceeded"))
}
}
}
val rateLimiter = new RateLimiter(maxRequests = 5, per = 1.second)
// Test rate limiter
val rateLimitedTasks = (1 to 10).map { i =>
rateLimiter.execute(() => Future {
s"Task $i executed at ${System.currentTimeMillis()}"
})
}
rateLimitedTasks.foreach(_.onComplete {
case Success(result) => println(s"✓ $result")
case Failure(exception) => println(s"✗ ${exception.getMessage}")
})
Thread.sleep(8000) // Wait for all operations
Testing and Debugging Futures
Testing Strategies
import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
// Testing with Await
class UserService {
def findUser(id: Int): Future[Option[String]] = Future {
Thread.sleep(100)
if (id > 0) Some(s"User-$id") else None
}
def updateUser(id: Int, name: String): Future[Boolean] = Future {
Thread.sleep(200)
id > 0 && name.nonEmpty
}
}
// Synchronous testing (not recommended for production)
def testUserServiceSync(): Unit = {
val service = new UserService()
try {
val user = Await.result(service.findUser(1), 1.second)
assert(user.contains("User-1"))
val updated = Await.result(service.updateUser(1, "Alice"), 1.second)
assert(updated)
println("✓ Synchronous tests passed")
} catch {
case exception: Exception =>
println(s"✗ Test failed: ${exception.getMessage}")
}
}
testUserServiceSync()
// Asynchronous testing patterns
def testUserServiceAsync(): Future[Unit] = {
val service = new UserService()
for {
user <- service.findUser(1)
_ = assert(user.contains("User-1"))
updated <- service.updateUser(1, "Alice")
_ = assert(updated)
invalidUser <- service.findUser(-1)
_ = assert(invalidUser.isEmpty)
} yield {
println("✓ Asynchronous tests passed")
}
}
testUserServiceAsync().recover {
case exception => println(s"✗ Async test failed: ${exception.getMessage}")
}
// Mock futures for testing
trait DatabaseService {
def saveUser(user: String): Future[Int]
def getUser(id: Int): Future[Option[String]]
}
class MockDatabaseService extends DatabaseService {
private var users = Map.empty[Int, String]
private var nextId = 1
def saveUser(user: String): Future[Int] = Future.successful {
val id = nextId
users = users + (id -> user)
nextId += 1
id
}
def getUser(id: Int): Future[Option[String]] = Future.successful {
users.get(id)
}
}
class UserController(db: DatabaseService) {
def createUser(name: String): Future[Int] = {
if (name.trim.isEmpty) {
Future.failed(new IllegalArgumentException("Name cannot be empty"))
} else {
db.saveUser(name)
}
}
def getUser(id: Int): Future[Option[String]] = {
db.getUser(id)
}
}
// Test with mock
def testUserController(): Future[Unit] = {
val mockDb = new MockDatabaseService()
val controller = new UserController(mockDb)
for {
userId <- controller.createUser("Alice")
_ = assert(userId == 1)
user <- controller.getUser(userId)
_ = assert(user.contains("Alice"))
_ <- controller.createUser("").recover {
case _: IllegalArgumentException => -1
}
} yield {
println("✓ Controller tests passed")
}
}
testUserController()
// Testing error scenarios
def testErrorHandling(): Future[Unit] = {
def flakyService(shouldFail: Boolean): Future[String] = {
if (shouldFail) {
Future.failed(new RuntimeException("Service unavailable"))
} else {
Future.successful("Service response")
}
}
def serviceWithRetry(maxRetries: Int): Future[String] = {
def attempt(retriesLeft: Int): Future[String] = {
flakyService(retriesLeft > 0).recoverWith {
case _ if retriesLeft > 0 =>
println(s"Retrying... $retriesLeft attempts left")
attempt(retriesLeft - 1)
case exception =>
Future.failed(exception)
}
}
attempt(maxRetries)
}
serviceWithRetry(3).map { result =>
println(s"✓ Service succeeded: $result")
}.recover {
case exception =>
println(s"✗ Service failed after retries: ${exception.getMessage}")
}
}
testErrorHandling()
// Debugging helpers
def debugFuture[T](name: String)(future: Future[T]): Future[T] = {
println(s"[$name] Future created at ${System.currentTimeMillis()}")
future.andThen {
case Success(value) =>
println(s"[$name] Completed successfully at ${System.currentTimeMillis()}: $value")
case Failure(exception) =>
println(s"[$name] Failed at ${System.currentTimeMillis()}: ${exception.getMessage}")
}
}
// Example usage of debug helper
val debuggedOperation = debugFuture("UserLookup") {
Future {
Thread.sleep(300)
"User data"
}
}
debuggedOperation.foreach(_ => ())
// Timeout testing
def testWithTimeout[T](future: Future[T], timeout: FiniteDuration): Future[T] = {
val timeoutFuture = Future {
Thread.sleep(timeout.toMillis)
throw new java.util.concurrent.TimeoutException(s"Operation timed out after $timeout")
}
Future.firstCompletedOf(List(future, timeoutFuture))
}
val slowOperation = Future {
Thread.sleep(2000)
"Slow result"
}
testWithTimeout(slowOperation, 1.second).onComplete {
case Success(result) => println(s"Operation completed: $result")
case Failure(_: java.util.concurrent.TimeoutException) => println("Operation timed out")
case Failure(exception) => println(s"Operation failed: ${exception.getMessage}")
}
Thread.sleep(5000) // Wait for all tests to complete
Summary
In this lesson, you've mastered concurrency and asynchronous programming with Futures:
✅ Future Basics: Creating, transforming, and composing futures
✅ Error Handling: Recovering from failures and timeout patterns
✅ Execution Contexts: Understanding and customizing thread pools
✅ Promises: Converting callbacks and advanced coordination
✅ Concurrent Patterns: Real-world application architectures
✅ Testing: Strategies for testing asynchronous code
✅ Best Practices: Performance and debugging techniques
Futures provide a powerful foundation for building responsive, scalable applications that can handle multiple concurrent operations efficiently.
What's Next
In the next lesson, we'll explore parallel collections in Scala, learning how to leverage multi-core processors for data-parallel computations and optimize performance for computational workloads.
Comments
Be the first to comment on this lesson!