Concurrency and Futures: Asynchronous Programming

Introduction

Concurrency is essential for building responsive, scalable applications. Scala's standard library provides Futures as its primary abstraction for asynchronous programming. A Future represents a value that may not be available yet: it is computed asynchronously and eventually completes with either a result or an exception.

This lesson will teach you to work with Futures effectively, understand execution contexts, handle asynchronous operations, and build concurrent applications that can handle multiple tasks simultaneously.

Understanding Futures

Basic Future Operations

import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Failure, Try}
import scala.concurrent.duration._

// Default execution context for examples
implicit val ec: ExecutionContext = ExecutionContext.global

// Creating futures
val immediateFuture = Future.successful(42)
val failedFuture = Future.failed(new RuntimeException("Something went wrong"))

// Future from computation
val computationFuture = Future {
  Thread.sleep(1000)  // Simulate long-running computation
  42 * 2
}

println("Future created - computation running in background")

// Blocking wait (avoid in production code)
try {
  val result = scala.concurrent.Await.result(computationFuture, 2.seconds)
  println(s"Computation result: $result")
} catch {
  case _: java.util.concurrent.TimeoutException => 
    println("Computation timed out")
}

// Non-blocking callbacks
val asyncFuture = Future {
  Thread.sleep(500)
  "Hello from the future!"
}

asyncFuture.onComplete {
  case Success(value) => println(s"Success: $value")
  case Failure(exception) => println(s"Failed: ${exception.getMessage}")
}

// Transform futures with map
val numberFuture = Future(10)
val doubledFuture = numberFuture.map(_ * 2)
val stringFuture = doubledFuture.map(n => s"The result is $n")

stringFuture.foreach(println)

// Chain futures with flatMap
def fetchUserId(username: String): Future[Int] = Future {
  Thread.sleep(100)
  if (username.nonEmpty) username.hashCode.abs else throw new IllegalArgumentException("Empty username")
}

def fetchUserProfile(userId: Int): Future[String] = Future {
  Thread.sleep(150)
  s"Profile for user $userId"
}

val userProfile = for {
  userId <- fetchUserId("alice")
  profile <- fetchUserProfile(userId)
} yield profile

userProfile.onComplete {
  case Success(profile) => println(s"Got profile: $profile")
  case Failure(exception) => println(s"Failed to get profile: ${exception.getMessage}")
}

// Error handling with recover
val riskyFuture = Future {
  if (math.random() > 0.5) "Success!"
  else throw new RuntimeException("Random failure")
}

val recoveredFuture = riskyFuture.recover {
  case _: RuntimeException => "Recovered from failure"
}

recoveredFuture.foreach(println)

// Transform failures with recoverWith
val alternativeFuture = riskyFuture.recoverWith {
  case _: RuntimeException => Future {
    Thread.sleep(100)
    "Alternative computation result"
  }
}

alternativeFuture.foreach(println)

// Filtering futures
val evenNumberFuture = Future(scala.util.Random.nextInt(10))
val filteredFuture = evenNumberFuture.filter(_ % 2 == 0)

filteredFuture.onComplete {
  case Success(value) => println(s"Even number: $value")
  case Failure(_) => println("Number was odd")
}
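The filter step above fails the Future when the predicate does not hold. The following is a minimal sketch of that behaviour, assuming a hypothetical helper named evenOrDefault (it is not part of the lesson's code). It uses Await only to make the outcome easy to inspect.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper: keeps even numbers, otherwise falls back to a default value
def evenOrDefault(n: Int, default: Int): Future[Int] =
  Future(n)
    .filter(_ % 2 == 0)                                     // odd input fails with NoSuchElementException
    .recover { case _: NoSuchElementException => default }  // turn that failure back into a value

val kept = Await.result(evenOrDefault(4, -1), 1.second)
val replaced = Await.result(evenOrDefault(3, -1), 1.second)

// Without recover, the failure surfaces as an exception when the result is awaited
val rawFailure = Try(Await.result(Future(3).filter(_ % 2 == 0), 1.second))
```

Because a rejected value becomes an ordinary failure, filter composes with recover and recoverWith in the same way as any other failed Future.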

// Run side effects with andThen (the original result is passed through unchanged)
val loggedFuture = Future {
  val result = 42 * 3
  println(s"Computed result: $result")
  result
}.andThen {
  case Success(value) => println(s"Logging success: $value")
  case Failure(exception) => println(s"Logging failure: ${exception.getMessage}")
}

Thread.sleep(2000)  // Give futures time to complete

Working with Multiple Futures

// Combining futures with Future.sequence
def fetchData(id: Int): Future[String] = Future {
  Thread.sleep(scala.util.Random.nextInt(500))
  s"Data for ID $id"
}

val dataFutures = (1 to 5).map(fetchData).toList
val allDataFuture = Future.sequence(dataFutures)

allDataFuture.onComplete {
  case Success(allData) => 
    println("All data fetched:")
    allData.foreach(println)
  case Failure(exception) => 
    println(s"Failed to fetch all data: ${exception.getMessage}")
}

// Traverse - map and sequence in one operation
val ids = List(1, 2, 3, 4, 5)
val traversedFuture = Future.traverse(ids)(fetchData)

traversedFuture.foreach { results =>
  println("Traversed results:")
  results.foreach(println)
}

// First completed future with Future.firstCompletedOf
val server1 = Future {
  Thread.sleep(300)
  "Response from server 1"
}

val server2 = Future {
  Thread.sleep(200)
  "Response from server 2"
}

val server3 = Future {
  Thread.sleep(400)
  "Response from server 3"
}

val fastestResponse = Future.firstCompletedOf(List(server1, server2, server3))

fastestResponse.foreach { response =>
  println(s"Fastest response: $response")
}

// Combining futures with zip
val future1 = Future {
  Thread.sleep(100)
  10
}

val future2 = Future {
  Thread.sleep(150)
  20
}

val combinedFuture = future1.zip(future2)

combinedFuture.foreach { case (a, b) =>
  println(s"Combined results: $a + $b = ${a + b}")
}

// Using for-comprehensions with multiple futures
val weatherFuture = Future {
  Thread.sleep(200)
  "Sunny"
}

val temperatureFuture = Future {
  Thread.sleep(100)
  25
}

val forecastFuture = for {
  weather <- weatherFuture
  temperature <- temperatureFuture
} yield s"Weather: $weather, Temperature: ${temperature}°C"

forecastFuture.foreach(println)
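The forecast example starts both futures before the for-comprehension, which is what lets them run concurrently. The sketch below contrasts that with creating the futures inside the for-comprehension, where the second one only starts after the first completes. The helpers slowLookup and timed are hypothetical names introduced for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical slow lookup used only for this illustration
def slowLookup(value: String, delayMs: Long): Future[String] = Future {
  Thread.sleep(delayMs)
  value
}

// Runs `body`, blocks until its Future completes, and returns (result, elapsed milliseconds)
def timed[T](body: => Future[T]): (T, Long) = {
  val start = System.nanoTime()
  val result = Await.result(body, 5.seconds)
  (result, (System.nanoTime() - start) / 1000000)
}

// Sequential: the second Future is only created after the first one completes
val (sequentialResult, sequentialMs) = timed {
  for {
    weather <- slowLookup("Sunny", 100)
    temperature <- slowLookup("25", 100)
  } yield s"$weather, $temperature"
}

// Concurrent: both Futures are started before the for-comprehension combines them
val (concurrentResult, concurrentMs) = timed {
  val weatherF = slowLookup("Sunny", 100)
  val temperatureF = slowLookup("25", 100)
  for {
    weather <- weatherF
    temperature <- temperatureF
  } yield s"$weather, $temperature"
}

println(s"sequential: $sequentialMs ms, concurrent: $concurrentMs ms")
```

Both versions produce the same value. On a multi-core machine the concurrent version typically finishes in roughly the time of the slower lookup, while the sequential version needs the sum of both delays.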

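// Handling partial failures: model each outcome as a value, then combine with Future.sequence
// (Future.sequence fails as a whole if any single future fails, so errors are carried inside ApiResponse)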
case class ApiResponse(id: Int, data: Option[String], error: Option[String])

def callApi(id: Int): Future[ApiResponse] = Future {
  Thread.sleep(scala.util.Random.nextInt(300))
  if (scala.util.Random.nextBoolean()) {
    ApiResponse(id, Some(s"Data for $id"), None)
  } else {
    ApiResponse(id, None, Some(s"Error for $id"))
  }
}

val apiCallFutures = (1 to 5).map(callApi).toList
val allResponsesFuture = Future.sequence(apiCallFutures)

allResponsesFuture.foreach { responses =>
  val successes = responses.filter(_.data.isDefined)
  val failures = responses.filter(_.error.isDefined)

  println(s"Successful calls: ${successes.length}")
  println(s"Failed calls: ${failures.length}")

  successes.foreach(r => println(s"  Success ${r.id}: ${r.data.get}"))
  failures.foreach(r => println(s"  Error ${r.id}: ${r.error.get}"))
}

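// Racing futures (timeout pattern)
// Note: the losing future is not cancelled, and the sleeping timeout future occupies a pool thread until it fires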
def withTimeout[T](future: Future[T], timeout: FiniteDuration): Future[T] = {
  val timeoutFuture = Future {
    Thread.sleep(timeout.toMillis)
    throw new java.util.concurrent.TimeoutException(s"Operation timed out after $timeout")
  }

  Future.firstCompletedOf(List(future, timeoutFuture))
}

val slowOperation = Future {
  Thread.sleep(2000)
  "Slow result"
}

val timedOperation = withTimeout(slowOperation, 1.second)

timedOperation.onComplete {
  case Success(result) => println(s"Completed in time: $result")
  case Failure(_: java.util.concurrent.TimeoutException) => println("Operation timed out")
  case Failure(exception) => println(s"Operation failed: ${exception.getMessage}")
}
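The timeout helper above spends a pool thread sleeping for every call. One possible alternative, sketched here under the assumption that a single shared scheduler thread is acceptable, uses a Promise together with a ScheduledExecutorService. The names scheduler and withScheduledTimeout are hypothetical. As with the original helper, the underlying computation is not cancelled when the timeout wins.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Assumption: a single daemon thread is enough to fire timeouts
val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
  def newThread(r: Runnable): Thread = {
    val thread = new Thread(r, "timeout-scheduler")
    thread.setDaemon(true)
    thread
  }
})

// Hypothetical helper: fails the returned Future if `future` takes longer than `timeout`
def withScheduledTimeout[T](future: Future[T], timeout: FiniteDuration): Future[T] = {
  val promise = Promise[T]()
  val timer = scheduler.schedule(new Runnable {
    def run(): Unit = {
      promise.tryFailure(new TimeoutException(s"Operation timed out after $timeout"))
      ()
    }
  }, timeout.toMillis, TimeUnit.MILLISECONDS)

  future.onComplete { outcome =>
    timer.cancel(false)          // the timer is no longer needed
    promise.tryComplete(outcome) // has no effect if the timeout already won
  }
  promise.future
}

val fast = Try(Await.result(withScheduledTimeout(Future("done"), 1.second), 2.seconds))
val slow = Try(Await.result(
  withScheduledTimeout(Future { Thread.sleep(1000); "late" }, 100.millis), 2.seconds))
```

The Promise is completed by whichever side finishes first, and the try-prefixed methods make the second completion attempt a harmless no-op.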

Thread.sleep(3000)  // Wait for all operations to complete

Execution Contexts

Understanding and Customizing Execution Contexts

import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.ExecutionContext

// Default execution context
println(s"Default EC: ${ExecutionContext.global}")

// Custom thread pool execution context
val customThreadPool = Executors.newFixedThreadPool(4, new ThreadFactory {
  private val counter = new java.util.concurrent.atomic.AtomicInteger(0)
  def newThread(r: Runnable): Thread = {
    val thread = new Thread(r, s"CustomPool-${counter.incrementAndGet()}")
    thread.setDaemon(true)
    thread
  }
})

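// Note: if the implicit `ec` from the earlier examples is still in scope, two implicit
// ExecutionContexts are ambiguous. Run this section in its own scope or keep only one of them implicit.
implicit val customEC: ExecutionContext = ExecutionContext.fromExecutor(customThreadPool)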

// CPU-intensive work
def cpuIntensiveTask(n: Int): Future[Long] = Future {
  val threadName = Thread.currentThread().getName
  println(s"CPU task $n running on thread: $threadName")

  // Simulate CPU-intensive work
  var sum = 0L
  for (i <- 1 to 1000000) {
    sum += i
  }
  sum
}

// IO-intensive work  
def ioIntensiveTask(id: Int): Future[String] = Future {
  val threadName = Thread.currentThread().getName
  println(s"IO task $id running on thread: $threadName")

  // Simulate IO operation
  Thread.sleep(500)
  s"IO result $id"
}

// Execute tasks on custom thread pool
val cpuTasks = (1 to 4).map(cpuIntensiveTask)
val ioTasks = (1 to 6).map(ioIntensiveTask)

val allTasks = Future.sequence(cpuTasks ++ ioTasks)

allTasks.foreach { results =>
  println(s"All tasks completed. Results count: ${results.length}")
}

// Dedicated execution context for blocking operations
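// fromExecutorService returns a context that is also an ExecutorService, so it can be shut down later
val blockingEC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(new ThreadFactory {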
  private val counter = new java.util.concurrent.atomic.AtomicInteger(0)
  def newThread(r: Runnable): Thread = {
    val thread = new Thread(r, s"BlockingPool-${counter.incrementAndGet()}")
    thread.setDaemon(true)
    thread
  }
}))

def blockingOperation(id: Int): Future[String] = Future {
  val threadName = Thread.currentThread().getName
  println(s"Blocking operation $id on thread: $threadName")

  // Simulate blocking IO (file read, database query, etc.)
  Thread.sleep(1000)
  s"Blocking result $id"
}(blockingEC)  // Use specific execution context

// Mix blocking and non-blocking operations
val mixedOperations = for {
  nonBlocking <- Future {
    Thread.currentThread().getName
  }(customEC)
  blocking <- blockingOperation(1)
} yield (nonBlocking, blocking)

mixedOperations.foreach { case (nbThread, blockingResult) =>
  println(s"Non-blocking on: $nbThread, Blocking result: $blockingResult")
}

// Execution context with custom exception handling
val monitoredEC = ExecutionContext.fromExecutor(
  customThreadPool,
  exception => {
    println(s"Uncaught exception in custom EC: ${exception.getMessage}")
    exception.printStackTrace()
  }
)

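// An exception thrown inside a Future body is captured as a Failure and never reaches the reporter.
// The reporter is called for exceptions that escape callbacks such as foreach or onComplete:
Future.successful(1).foreach { _ =>
  throw new RuntimeException("Test exception")
}(monitoredEC)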

// Using different execution contexts for different types of work
class ServiceWithMultipleExecutors {
  private val computeEC = ExecutionContext.fromExecutor(
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
  )

  private val ioEC = ExecutionContext.fromExecutor(
    Executors.newCachedThreadPool()
  )

  def computeHash(data: String): Future[String] = Future {
    // CPU intensive hash computation
    Thread.sleep(100)  // Simulate computation
    s"hash-${data.hashCode}"
  }(computeEC)

  def saveToDatabase(data: String): Future[String] = Future {
    // IO intensive database operation
    Thread.sleep(300)  // Simulate IO
    s"saved-$data"
  }(ioEC)

  def processData(data: String): Future[String] =
    // Pass computeEC explicitly so the chaining callback runs on the compute pool
    computeHash(data).flatMap(saveToDatabase)(computeEC)
}

val service = new ServiceWithMultipleExecutors()
service.processData("example-data").foreach(println)

Thread.sleep(2000)

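// Clean up thread pools
customThreadPool.shutdown()
// A context created with ExecutionContext.fromExecutor is not an ExecutorService, so casting it
// fails with ClassCastException. Only a context built with fromExecutorService can be shut down directly.
blockingEC match {
  case pool: java.util.concurrent.ExecutorService => pool.shutdown()
  case _ => () // daemon threads: nothing to shut down explicitly
}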
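Since custom pools have to be shut down by whoever created them, it can help to hold the pool and the execution context in a single value. The following is a minimal sketch using ExecutionContext.fromExecutorService from the standard library. The name ioPool is an assumption made for this example.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// fromExecutorService returns a context that is also an ExecutorService,
// so the same value can run Futures and later be shut down
val ioPool: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

// Run one task on the pool, then always release the threads
val threadName: String =
  try Await.result(Future(Thread.currentThread().getName)(ioPool), 1.second)
  finally ioPool.shutdown()

// shutdown() lets already submitted tasks finish; awaitTermination waits for that to happen
val terminated = ioPool.awaitTermination(1, TimeUnit.SECONDS)
```

Passing the context explicitly, as in Future(...)(ioPool), also avoids any question about which implicit ExecutionContext is picked up.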

Promises and Advanced Patterns

Working with Promises

import scala.concurrent.Promise
import scala.util.Random

// Basic Promise usage
val promise = Promise[String]()
val futureFromPromise = promise.future

// Complete the promise in another thread
Future {
  Thread.sleep(1000)
  if (Random.nextBoolean()) {
    promise.success("Promise completed successfully!")
  } else {
    promise.failure(new RuntimeException("Promise failed!"))
  }
}

futureFromPromise.onComplete {
  case Success(value) => println(s"Promise result: $value")
  case Failure(exception) => println(s"Promise failed: ${exception.getMessage}")
}

// Promise-based callback conversion
def callbackBasedApi(callback: (String, Option[Exception]) => Unit): Unit = {
  // Simulate async callback-based API
  Future {
    Thread.sleep(500)
    if (Random.nextBoolean()) {
      callback("Callback result", None)
    } else {
      callback(null, Some(new RuntimeException("Callback error")))
    }
  }
}

def promiseBasedWrapper(): Future[String] = {
  val promise = Promise[String]()

  callbackBasedApi { (result, error) =>
    error match {
      case Some(exception) => promise.failure(exception)
      case None => promise.success(result)
    }
  }

  promise.future
}

promiseBasedWrapper().foreach(println)

// Promise-based timeout implementation
def promiseWithTimeout[T](computation: () => T, timeout: FiniteDuration): Future[T] = {
  val promise = Promise[T]()

  // Start computation
  Future {
    try {
      val result = computation()
      promise.trySuccess(result)
    } catch {
      case exception: Exception =>
        promise.tryFailure(exception)
    }
  }

  // Start timeout
  Future {
    Thread.sleep(timeout.toMillis)
    promise.tryFailure(new java.util.concurrent.TimeoutException(s"Computation timed out after $timeout"))
  }

  promise.future
}

val timeoutComputation = promiseWithTimeout(() => {
  Thread.sleep(2000)
  "Long computation result"
}, 1.second)

timeoutComputation.onComplete {
  case Success(result) => println(s"Computation completed: $result")
  case Failure(_: java.util.concurrent.TimeoutException) => println("Computation timed out")
  case Failure(exception) => println(s"Computation failed: ${exception.getMessage}")
}
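The timeout implementation above relies on trySuccess and tryFailure because a Promise can be completed only once. A small sketch of the difference between the strict and the try-prefixed methods:

```scala
import scala.concurrent.Promise
import scala.util.{Success, Try}

val p = Promise[Int]()

val firstWrite = p.trySuccess(1)     // true: the promise was still empty
val secondWrite = p.trySuccess(2)    // false: already completed, the value stays 1
val strictWrite = Try(p.success(3))  // success() throws IllegalStateException instead

val finalValue = p.future.value      // Some(Success(1))
```

When two parties race to complete the same Promise, as the computation and the timer do, the try-prefixed variants let the loser fail quietly instead of throwing.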

// Producer-consumer pattern with Promise
class MessageQueue[T] {
  private var queue = List.empty[T]
  private var waitingConsumers = List.empty[Promise[T]]

  def produce(item: T): Unit = synchronized {
    waitingConsumers match {
      case promise :: rest =>
        promise.success(item)
        waitingConsumers = rest
      case Nil =>
        queue = queue :+ item
    }
  }

  def consume(): Future[T] = synchronized {
    queue match {
      case head :: tail =>
        queue = tail
        Future.successful(head)
      case Nil =>
        val promise = Promise[T]()
        waitingConsumers = waitingConsumers :+ promise
        promise.future
    }
  }
}

val messageQueue = new MessageQueue[String]()

// Start consumers
val consumer1 = messageQueue.consume().map(msg => s"Consumer1 got: $msg")
val consumer2 = messageQueue.consume().map(msg => s"Consumer2 got: $msg")

consumer1.foreach(println)
consumer2.foreach(println)

// Produce messages
Future {
  Thread.sleep(500)
  messageQueue.produce("Message 1")
  Thread.sleep(300)
  messageQueue.produce("Message 2")
}

// Circuit breaker pattern
class CircuitBreaker(maxFailures: Int, resetTimeout: FiniteDuration) {
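  // state and lastFailureTime are read outside synchronized blocks, so they are @volatile for visibility
  @volatile private var state: CircuitBreakerState = Closed
  private var failureCount = 0
  @volatile private var lastFailureTime = 0L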

  sealed trait CircuitBreakerState
  case object Closed extends CircuitBreakerState
  case object Open extends CircuitBreakerState
  case object HalfOpen extends CircuitBreakerState

  def execute[T](operation: () => Future[T]): Future[T] = {
    state match {
      case Closed =>
        val result = operation()
        result.onComplete {
          case Success(_) => 
            synchronized { failureCount = 0 }
          case Failure(_) => 
            synchronized {
              failureCount += 1
              lastFailureTime = System.currentTimeMillis()
              if (failureCount >= maxFailures) {
                state = Open
              }
            }
        }
        result

      case Open =>
        if (System.currentTimeMillis() - lastFailureTime > resetTimeout.toMillis) {
          synchronized { state = HalfOpen }
          execute(operation)
        } else {
          Future.failed(new RuntimeException("Circuit breaker is open"))
        }

      case HalfOpen =>
        val result = operation()
        result.onComplete {
          case Success(_) => 
            synchronized { 
              state = Closed
              failureCount = 0
            }
          case Failure(_) => 
            synchronized { 
              state = Open
              lastFailureTime = System.currentTimeMillis()
            }
        }
        result
    }
  }
}

val circuitBreaker = new CircuitBreaker(maxFailures = 3, resetTimeout = 2.seconds)

def unreliableService(): Future[String] = Future {
  if (Random.nextDouble() < 0.7) {  // 70% failure rate
    throw new RuntimeException("Service failure")
  }
  "Service success"
}

// Test circuit breaker
(1 to 10).foreach { i =>
  circuitBreaker.execute(() => unreliableService()).onComplete {
    case Success(result) => println(s"Call $i: $result")
    case Failure(exception) => println(s"Call $i failed: ${exception.getMessage}")
  }
  Thread.sleep(300)
}

Thread.sleep(5000)  // Wait for operations to complete

Real-World Concurrent Patterns

Building Concurrent Applications

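import java.util.concurrent.atomic.AtomicInteger
// scala.collection.mutable.ConcurrentMap no longer exists; the concurrent map trait lives in scala.collection.concurrent
import scala.collection.concurrent.{Map => ConcurrentMap}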

// Thread-safe counter using AtomicInteger
class AtomicCounter {
  private val count = new AtomicInteger(0)

  def increment(): Int = count.incrementAndGet()
  def decrement(): Int = count.decrementAndGet()
  def get(): Int = count.get()
}

val counter = new AtomicCounter()

// Multiple futures incrementing counter concurrently
val incrementFutures = (1 to 100).map { i =>
  Future {
    Thread.sleep(Random.nextInt(10))
    counter.increment()
  }
}

Future.sequence(incrementFutures).foreach { results =>
  println(s"Final counter value: ${counter.get()}")
  println(s"Expected: 100, Actual: ${counter.get()}")
}

// Concurrent map operations
val sharedMap: ConcurrentMap[String, Int] = scala.collection.concurrent.TrieMap.empty

val mapOperations = (1 to 50).map { i =>
  Future {
    val key = s"key${i % 10}"
    sharedMap.put(key, i)
    sharedMap.get(key)
  }
}

Future.sequence(mapOperations).foreach { results =>
  println(s"Map size: ${sharedMap.size}")
  println(s"Map contents: ${sharedMap.toMap}")
}

// Parallel data processing pipeline
case class ProcessingJob(id: Int, data: String)
case class ProcessedResult(id: Int, result: String, processedBy: String)

class DataProcessor {
  def stage1(job: ProcessingJob): Future[ProcessingJob] = Future {
    Thread.sleep(Random.nextInt(100))
    job.copy(data = job.data.toUpperCase)
  }

  def stage2(job: ProcessingJob): Future[ProcessingJob] = Future {
    Thread.sleep(Random.nextInt(100))
    job.copy(data = job.data.reverse)
  }

  def stage3(job: ProcessingJob): Future[ProcessedResult] = Future {
    Thread.sleep(Random.nextInt(100))
    ProcessedResult(job.id, job.data, Thread.currentThread().getName)
  }

  def processJob(job: ProcessingJob): Future[ProcessedResult] = {
    for {
      stage1Result <- stage1(job)
      stage2Result <- stage2(stage1Result)
      finalResult <- stage3(stage2Result)
    } yield finalResult
  }
}

val processor = new DataProcessor()
val jobs = (1 to 10).map(i => ProcessingJob(i, s"data-$i"))

val processingFutures = jobs.map(processor.processJob)
val allResults = Future.sequence(processingFutures)

allResults.foreach { results =>
  println("Processing results:")
  results.foreach { result =>
    println(s"  Job ${result.id}: ${result.result} (by ${result.processedBy})")
  }
}

// Batch processing with configurable parallelism
def processBatchWithConcurrency[T, R](
  items: List[T], 
  processor: T => Future[R], 
  maxConcurrency: Int
): Future[List[R]] = {

  def processChunk(chunk: List[T]): Future[List[R]] = {
    Future.sequence(chunk.map(processor))
  }

  val chunks = items.grouped(maxConcurrency).toList

  // Process chunks sequentially, but items within each chunk in parallel
  chunks.foldLeft(Future.successful(List.empty[R])) { (accFuture, chunk) =>
    for {
      acc <- accFuture
      chunkResults <- processChunk(chunk)
    } yield acc ++ chunkResults
  }
}

def expensiveOperation(id: Int): Future[String] = Future {
  Thread.sleep(200)  // Simulate expensive operation
  s"Result for $id"
}

val batchItems = (1 to 20).toList
val batchResults = processBatchWithConcurrency(batchItems, expensiveOperation, maxConcurrency = 5)

batchResults.foreach { results =>
  println(s"Batch processing completed. Results: ${results.length}")
}
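One way to check that chunked processing really caps the number of tasks running at once is to count them. The sketch below is an instrumented variant written for this illustration. The names inFlight, maxObserved, trackedTask and inChunks are hypothetical and not part of the lesson's code.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val inFlight = new AtomicInteger(0)     // tasks currently running
val maxObserved = new AtomicInteger(0)  // highest value inFlight ever reached

// Hypothetical instrumented task: records how many tasks run alongside it
def trackedTask(id: Int): Future[Int] = Future {
  val running = inFlight.incrementAndGet()
  maxObserved.updateAndGet(previous => math.max(previous, running))
  try {
    Thread.sleep(20)
    id * 2
  } finally inFlight.decrementAndGet()
}

// Chunks run one after another; the items inside a chunk are started together
def inChunks[T, R](items: List[T], size: Int)(f: T => Future[R]): Future[List[R]] =
  items.grouped(size).foldLeft(Future.successful(List.empty[R])) { (acc, chunk) =>
    acc.flatMap(done => Future.traverse(chunk)(f).map(done ++ _))
  }

val chunkedResults = Await.result(inChunks((1 to 10).toList, 3)(trackedTask), 5.seconds)
println(s"max tasks in flight: ${maxObserved.get}")
```

The next chunk is only started inside flatMap, after every Future of the previous chunk has completed. That keeps the count at or below the chunk size, at the cost of each chunk waiting for its slowest item.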

// Fan-out / Fan-in pattern
case class WorkItem(id: Int, data: String)
case class WorkResult(id: Int, result: String, worker: String)

class WorkerPool(workerCount: Int) {
  private val workers = (1 to workerCount).map(id => s"Worker-$id")

  def distributeWork(items: List[WorkItem]): Future[List[WorkResult]] = {
    val workDistribution = items.zipWithIndex.groupBy(_._2 % workerCount)

    val workerFutures = workDistribution.map { case (workerIndex, workItems) =>
      val worker = workers(workerIndex)
      Future.sequence(workItems.map { case (item, _) =>
        processWork(item, worker)
      })
    }.toList

    // Fan-in: combine all worker results
    Future.sequence(workerFutures).map(_.flatten)
  }

  private def processWork(item: WorkItem, worker: String): Future[WorkResult] = Future {
    Thread.sleep(Random.nextInt(200))  // Simulate work
    WorkResult(item.id, s"Processed: ${item.data}", worker)
  }
}

val workerPool = new WorkerPool(3)
val workItems = (1 to 15).map(i => WorkItem(i, s"task-$i")).toList

val distributedWork = workerPool.distributeWork(workItems)

distributedWork.foreach { results =>
  println("Distributed work results:")
  results.sortBy(_.id).foreach { result =>
    println(s"  Task ${result.id}: ${result.result} (by ${result.worker})")
  }
}

// Rate limiting with futures
class RateLimiter(maxRequests: Int, per: FiniteDuration) {
  private val timestamps = new java.util.concurrent.ConcurrentLinkedQueue[Long]()

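  // synchronized makes the check-and-record step atomic when several threads call execute at once
  def execute[T](operation: () => Future[T]): Future[T] = synchronized {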
    val now = System.currentTimeMillis()
    val windowStart = now - per.toMillis

    // Remove old timestamps
    while (!timestamps.isEmpty && timestamps.peek() < windowStart) {
      timestamps.poll()
    }

    if (timestamps.size() < maxRequests) {
      timestamps.offer(now)
      operation()
    } else {
      Future.failed(new RuntimeException("Rate limit exceeded"))
    }
  }
}

val rateLimiter = new RateLimiter(maxRequests = 5, per = 1.second)

// Test rate limiter
val rateLimitedTasks = (1 to 10).map { i =>
  rateLimiter.execute(() => Future {
    s"Task $i executed at ${System.currentTimeMillis()}"
  })
}

rateLimitedTasks.foreach(_.onComplete {
  case Success(result) => println(s"✓ $result")
  case Failure(exception) => println(s"✗ ${exception.getMessage}")
})

Thread.sleep(8000)  // Wait for all operations

Testing and Debugging Futures

Testing Strategies

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Success, Failure}

// Testing with Await
class UserService {
  def findUser(id: Int): Future[Option[String]] = Future {
    Thread.sleep(100)
    if (id > 0) Some(s"User-$id") else None
  }

  def updateUser(id: Int, name: String): Future[Boolean] = Future {
    Thread.sleep(200)
    id > 0 && name.nonEmpty
  }
}

// Synchronous testing (blocks the calling thread; fine in tests, avoid in production code)
def testUserServiceSync(): Unit = {
  val service = new UserService()

  try {
    val user = Await.result(service.findUser(1), 1.second)
    assert(user.contains("User-1"))

    val updated = Await.result(service.updateUser(1, "Alice"), 1.second)
    assert(updated)

    println("✓ Synchronous tests passed")
  } catch {
    case exception: Exception =>
      println(s"✗ Test failed: ${exception.getMessage}")
  }
}

testUserServiceSync()

// Asynchronous testing patterns
def testUserServiceAsync(): Future[Unit] = {
  val service = new UserService()

  for {
    user <- service.findUser(1)
    _ = assert(user.contains("User-1"))

    updated <- service.updateUser(1, "Alice")
    _ = assert(updated)

    invalidUser <- service.findUser(-1)
    _ = assert(invalidUser.isEmpty)

  } yield {
    println("✓ Asynchronous tests passed")
  }
}

testUserServiceAsync().recover {
  case exception => println(s"✗ Async test failed: ${exception.getMessage}")
}

// Mock futures for testing
trait DatabaseService {
  def saveUser(user: String): Future[Int]
  def getUser(id: Int): Future[Option[String]]
}

class MockDatabaseService extends DatabaseService {
  private var users = Map.empty[Int, String]
  private var nextId = 1

  def saveUser(user: String): Future[Int] = Future.successful {
    val id = nextId
    users = users + (id -> user)
    nextId += 1
    id
  }

  def getUser(id: Int): Future[Option[String]] = Future.successful {
    users.get(id)
  }
}

class UserController(db: DatabaseService) {
  def createUser(name: String): Future[Int] = {
    if (name.trim.isEmpty) {
      Future.failed(new IllegalArgumentException("Name cannot be empty"))
    } else {
      db.saveUser(name)
    }
  }

  def getUser(id: Int): Future[Option[String]] = {
    db.getUser(id)
  }
}

// Test with mock
def testUserController(): Future[Unit] = {
  val mockDb = new MockDatabaseService()
  val controller = new UserController(mockDb)

  for {
    userId <- controller.createUser("Alice")
    _ = assert(userId == 1)

    user <- controller.getUser(userId)
    _ = assert(user.contains("Alice"))

    _ <- controller.createUser("").recover {
      case _: IllegalArgumentException => -1
    }

  } yield {
    println("✓ Controller tests passed")
  }
}

testUserController()
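Mocks built on Future.successful complete immediately, so they cannot exercise the "still pending" state. A possible complement, sketched here with a hypothetical greeting function as the code under test, is to let the test own a Promise and decide when the dependency responds.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical code under test: formats whatever the lookup eventually returns
def greeting(lookup: Int => Future[String])(id: Int): Future[String] =
  lookup(id).map(name => s"Hello, $name!")

// The test owns the Promise, so it decides exactly when the "remote call" finishes
val pendingName = Promise[String]()
val pendingGreeting = greeting(_ => pendingName.future)(42)

val completedEarly = pendingGreeting.isCompleted  // false: nothing has been delivered yet
pendingName.success("Alice")                      // simulate the dependency responding
val message = Await.result(pendingGreeting, 1.second)
```

This removes Thread.sleep from the test, so it does not depend on timing.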

// Testing error scenarios
def testErrorHandling(): Future[Unit] = {
  def flakyService(shouldFail: Boolean): Future[String] = {
    if (shouldFail) {
      Future.failed(new RuntimeException("Service unavailable"))
    } else {
      Future.successful("Service response")
    }
  }

  def serviceWithRetry(maxRetries: Int): Future[String] = {
    def attempt(retriesLeft: Int): Future[String] = {
      flakyService(retriesLeft > 0).recoverWith {
        case _ if retriesLeft > 0 =>
          println(s"Retrying... $retriesLeft attempts left")
          attempt(retriesLeft - 1)
        case exception =>
          Future.failed(exception)
      }
    }

    attempt(maxRetries)
  }

  serviceWithRetry(3).map { result =>
    println(s"✓ Service succeeded: $result")
  }.recover {
    case exception =>
      println(s"✗ Service failed after retries: ${exception.getMessage}")
  }
}

testErrorHandling()

// Debugging helpers
def debugFuture[T](name: String)(future: Future[T]): Future[T] = {
  println(s"[$name] Future created at ${System.currentTimeMillis()}")

  future.andThen {
    case Success(value) => 
      println(s"[$name] Completed successfully at ${System.currentTimeMillis()}: $value")
    case Failure(exception) => 
      println(s"[$name] Failed at ${System.currentTimeMillis()}: ${exception.getMessage}")
  }
}

// Example usage of debug helper
val debuggedOperation = debugFuture("UserLookup") {
  Future {
    Thread.sleep(300)
    "User data"
  }
}

debuggedOperation.foreach(_ => ())

// Timeout testing
def testWithTimeout[T](future: Future[T], timeout: FiniteDuration): Future[T] = {
  val timeoutFuture = Future {
    Thread.sleep(timeout.toMillis)
    throw new java.util.concurrent.TimeoutException(s"Operation timed out after $timeout")
  }

  Future.firstCompletedOf(List(future, timeoutFuture))
}

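// Named differently from the earlier slowOperation to avoid a duplicate definition in the same scope
val slowTestOperation = Future {
  Thread.sleep(2000)
  "Slow result"
}

testWithTimeout(slowTestOperation, 1.second).onComplete {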
  case Success(result) => println(s"Operation completed: $result")
  case Failure(_: java.util.concurrent.TimeoutException) => println("Operation timed out")
  case Failure(exception) => println(s"Operation failed: ${exception.getMessage}")
}

Thread.sleep(5000)  // Wait for all tests to complete

Summary

In this lesson, you've mastered concurrency and asynchronous programming with Futures:

Future Basics: Creating, transforming, and composing futures
Error Handling: Recovering from failures and timeout patterns
Execution Contexts: Understanding and customizing thread pools
Promises: Converting callbacks and advanced coordination
Concurrent Patterns: Real-world application architectures
Testing: Strategies for testing asynchronous code
Best Practices: Performance and debugging techniques

Futures provide a powerful foundation for building responsive, scalable applications that can handle multiple concurrent operations efficiently.

What's Next

In the next lesson, we'll explore parallel collections in Scala, learning how to leverage multi-core processors for data-parallel computations and optimize performance for computational workloads.