Concurrency Patterns and Parallel Computing: Beyond Futures and Actors

Modern applications demand high performance and responsiveness, requiring sophisticated concurrency and parallelism strategies. In this comprehensive lesson, we'll explore advanced concurrency patterns in Scala, from low-level thread management to distributed computing architectures.

Understanding Concurrency vs Parallelism

Fundamental Concepts

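Concurrency and parallelism are related but distinct. Concurrency is about structuring a program so that it can make progress on several tasks at once, even if they only interleave on a single core. Parallelism is about executing several tasks at the same instant, which requires multiple cores.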

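// Import names explicitly: wildcard imports of both java.util.concurrent and
// scala.concurrent would make `Future` ambiguous
import java.util.concurrent.{Executors, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Success, Failure, Random}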

// Concurrency: Dealing with multiple tasks at once (may run on single core)
// Parallelism: Actually executing multiple tasks simultaneously (requires multiple cores)

// Basic thread management
class ThreadExample {
  def demonstrateThreads(): Unit = {
    val thread1 = new Thread(() => {
      for (i <- 1 to 5) {
        println(s"Thread 1: $i")
        Thread.sleep(100)
      }
    })

    val thread2 = new Thread(() => {
      for (i <- 1 to 5) {
        println(s"Thread 2: $i")
        Thread.sleep(150)
      }
    })

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    println("Both threads completed")
  }
}

// Thread-safe shared state with synchronization
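class Counter {
  @volatile private var count: Int = 0
  private val lock = new Object
  // Shared atomic counter; creating a new AtomicInteger per call would discard every update
  private val atomicCount = new java.util.concurrent.atomic.AtomicInteger(0)

  def increment(): Int = lock.synchronized {
    count += 1
    count
  }

  def get: Int = count // @volatile ensures visibility

  // Lock-free alternative: a single atomic read-modify-write on the shared field
  def incrementAtomic(): Int = atomicCount.incrementAndGet()

  def getAtomic: Int = atomicCount.get()
}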

// Race condition demonstration: increment() is synchronized, so no updates are lost here
def demonstrateRaceCondition(): Unit = {
  val counter = new Counter
  val threads = for (i <- 1 to 10) yield new Thread(() => {
    for (_ <- 1 to 1000) {
      counter.increment()
    }
  })

  threads.foreach(_.start())
  threads.foreach(_.join())

  println(s"Final count: ${counter.get}") // Should be 10000
}
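
The demonstration above goes through the synchronized increment, so it always reaches 10000. For contrast, here is a minimal sketch of what happens without synchronization, next to an AtomicInteger version. The object and method names (CounterComparison, hammer, unsafeTotal, atomicTotal) are made up for this illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CounterComparison {
  // Runs `threads` threads that each call `body` `perThread` times, then waits for all of them
  def hammer(threads: Int, perThread: Int)(body: () => Unit): Unit = {
    val workers = (1 to threads).map { _ =>
      new Thread(() => (1 to perThread).foreach(_ => body()))
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
  }

  // Plain var: `+= 1` is a read-modify-write, so concurrent updates can be lost
  def unsafeTotal(threads: Int, perThread: Int): Int = {
    var count = 0
    hammer(threads, perThread)(() => count += 1)
    count
  }

  // AtomicInteger: incrementAndGet is a single atomic operation
  def atomicTotal(threads: Int, perThread: Int): Int = {
    val count = new AtomicInteger(0)
    hammer(threads, perThread) { () => count.incrementAndGet(); () }
    count.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"unsafe: ${unsafeTotal(10, 1000)} (may be less than 10000)")
    println(s"atomic: ${atomicTotal(10, 1000)}")
  }
}
```

The unsafe total varies from run to run and can never exceed the number of increments; the atomic total is always exact.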

Thread Pools and Execution Contexts

// Custom execution contexts for different types of work
object ExecutionContexts {
  // CPU-intensive work
  val cpuBound: ExecutionContext = ExecutionContext.fromExecutor(
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
  )

  // I/O-intensive work
  val ioBound: ExecutionContext = ExecutionContext.fromExecutor(
    Executors.newCachedThreadPool()
  )

  // Single-threaded for sequential operations
  val sequential: ExecutionContext = ExecutionContext.fromExecutor(
    Executors.newSingleThreadExecutor()
  )

  // Custom thread pool with monitoring
  def createMonitoredThreadPool(name: String, poolSize: Int): ExecutionContext = {
    val threadFactory = new ThreadFactory {
      private val counter = new java.util.concurrent.atomic.AtomicInteger(0)

      def newThread(r: Runnable): Thread = {
        val thread = new Thread(r, s"$name-${counter.incrementAndGet()}")
        thread.setDaemon(false)
        thread.setUncaughtExceptionHandler((t, e) => {
          println(s"Uncaught exception in thread ${t.getName}: ${e.getMessage}")
          e.printStackTrace()
        })
        thread
      }
    }

    ExecutionContext.fromExecutor(
      new ThreadPoolExecutor(
        poolSize, poolSize,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue[Runnable](),
        threadFactory
      )
    )
  }
}

// Using different execution contexts
class WorkloadManager {
  import ExecutionContexts._

  def performCpuIntensiveTask(data: List[Int]): Future[Int] = {
    Future {
      // Simulate CPU-intensive computation
      data.map(x => x * x).sum
    }(cpuBound)
  }

  def performIoTask(url: String): Future[String] = {
    Future {
      // Simulate I/O operation
      Thread.sleep(1000)
      s"Data from $url"
    }(ioBound)
  }

  def performSequentialTask(items: List[String]): Future[List[String]] = {
    Future {
      items.map(_.toUpperCase)
    }(sequential)
  }
}
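
The pools above are never shut down, and their non-daemon threads can keep the JVM alive. One possible way to manage the lifecycle, sketched under the assumption that the pool is owned by a single caller, is ExecutionContext.fromExecutorService, which returns a context that still exposes shutdown. PoolLifecycle and sumOfSquares are hypothetical names.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object PoolLifecycle {
  def sumOfSquares(data: List[Int]): Int = {
    // fromExecutorService keeps the shutdown methods available on the context
    val pool: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
    try {
      val result = Future(data.map(x => x * x).sum)(pool)
      Await.result(result, 5.seconds)
    } finally {
      pool.shutdown() // stop accepting tasks; worker threads exit once idle
      pool.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)
    }
  }
}
```

Blocking with Await is used here only to keep the sketch short; in application code the pool would normally live as long as the component that owns it.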

Advanced Future Patterns

Future Composition and Error Handling

// Advanced Future composition techniques
class FutureCompositionExamples {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Sequential vs Parallel execution
  def sequentialExecution(urls: List[String]): Future[List[String]] = {
    urls.foldLeft(Future.successful(List.empty[String])) { (acc, url) =>
      for {
        results <- acc
        result <- fetchData(url)
      } yield results :+ result
    }
  }

  def parallelExecution(urls: List[String]): Future[List[String]] = {
    val futures = urls.map(fetchData)
    Future.sequence(futures)
  }

  // Timeout handling
  def withTimeout[T](future: Future[T], timeout: FiniteDuration): Future[T] = {
    val promise = Promise[T]()

    future.onComplete(promise.tryComplete)

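    // Timer: the sleeping Future occupies a pool thread for the whole timeout, and the
    // original future keeps running after a timeout (it is not cancelled)
    Future {
      Thread.sleep(timeout.toMillis)
      promise.tryFailure(new TimeoutException(s"Operation timed out after $timeout"))
    }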

    promise.future
  }

  // Retry mechanism
  def retry[T](operation: () => Future[T], maxRetries: Int, delay: FiniteDuration): Future[T] = {
    def attempt(retriesLeft: Int): Future[T] = {
      operation().recoverWith {
        case ex if retriesLeft > 0 =>
          println(s"Operation failed, retrying... ($retriesLeft retries left)")
          Future {
            Thread.sleep(delay.toMillis)
          }.flatMap(_ => attempt(retriesLeft - 1))
        case ex => 
          Future.failed(ex)
      }
    }

    attempt(maxRetries)
  }

  // Circuit breaker pattern
  class CircuitBreaker[T](
    failureThreshold: Int,
    timeout: FiniteDuration,
    resetTimeout: FiniteDuration
  ) {
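    // Sealed hierarchy instead of a Scala 3 enum, so the class also compiles on Scala 2
    private sealed trait State
    private object State {
      case object Closed extends State
      case object Open extends State
      case object HalfOpen extends State
    }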

    @volatile private var state: State = State.Closed
    @volatile private var failureCount: Int = 0
    @volatile private var lastFailureTime: Long = 0

    def call(operation: () => Future[T]): Future[T] = {
      state match {
        case State.Closed =>
          executeOperation(operation)
        case State.Open =>
          if (System.currentTimeMillis() - lastFailureTime > resetTimeout.toMillis) {
            state = State.HalfOpen
            executeOperation(operation)
          } else {
            Future.failed(new Exception("Circuit breaker is OPEN"))
          }
        case State.HalfOpen =>
          executeOperation(operation)
      }
    }

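    private def executeOperation(operation: () => Future[T]): Future[T] = {
      operation().andThen {
        // Callbacks may run on different threads, so update the state under one lock
        case Success(_) => this.synchronized {
          failureCount = 0
          state = State.Closed
        }
        case Failure(_) => this.synchronized {
          failureCount += 1
          lastFailureTime = System.currentTimeMillis()
          // A failed trial call in HalfOpen reopens the circuit immediately
          if (state == State.HalfOpen || failureCount >= failureThreshold) {
            state = State.Open
          }
        }
      }
    }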
  }

  // Bulkhead pattern - isolate resources
  class BulkheadExecutor {
    private val criticalPool = ExecutionContext.fromExecutor(
      Executors.newFixedThreadPool(2)
    )
    private val normalPool = ExecutionContext.fromExecutor(
      Executors.newFixedThreadPool(8)
    )

    def executeCritical[T](task: => T): Future[T] = 
      Future(task)(criticalPool)

    def executeNormal[T](task: => T): Future[T] = 
      Future(task)(normalPool)
  }

  private def fetchData(url: String): Future[String] = {
    Future {
      Thread.sleep(Random.nextInt(1000))
      if (Random.nextDouble() < 0.1) throw new Exception(s"Failed to fetch $url")
      s"Data from $url"
    }
  }
}
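
The timeout helper above parks a pool thread in Thread.sleep for every call. A possible alternative, sketched here with a single ScheduledExecutorService acting as a timer, avoids that. ScheduledTimeout and the thread name "timeout-timer" are illustrative choices, and as before the underlying future is not cancelled when the timeout fires.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration

object ScheduledTimeout {
  // One daemon timer thread serves every timeout
  private val timer: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor { (r: Runnable) =>
      val t = new Thread(r, "timeout-timer")
      t.setDaemon(true)
      t
    }

  def withTimeout[T](future: Future[T], timeout: FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    // Schedule the failure; nothing blocks while waiting for the deadline
    val task = timer.schedule(
      new Runnable {
        def run(): Unit = {
          promise.tryFailure(new TimeoutException(s"Timed out after $timeout"))
          ()
        }
      },
      timeout.toMillis,
      TimeUnit.MILLISECONDS
    )
    future.onComplete { result =>
      task.cancel(false) // the timer entry is no longer needed
      promise.tryComplete(result)
    }
    promise.future
  }
}
```

Whichever of the two completions happens first wins, because tryComplete and tryFailure are no-ops on an already completed promise.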

Async/Await Pattern

// Async/await pattern for cleaner asynchronous code
// Requires the scala-async library (Scala 2); recent versions also need the -Xasync compiler flag
import scala.async.Async.{async, await}

class AsyncAwaitExamples {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Traditional Future composition
  def traditionalComposition(userId: String): Future[String] = {
    fetchUser(userId).flatMap { user =>
      fetchProfile(user.id).flatMap { profile =>
        fetchPreferences(user.id).map { preferences =>
          s"User: ${user.name}, Profile: ${profile.bio}, Theme: ${preferences.theme}"
        }
      }
    }
  }

  // Async/await version
  def asyncAwaitComposition(userId: String): Future[String] = async {
    val user = await(fetchUser(userId))
    val profile = await(fetchProfile(user.id))
    val preferences = await(fetchPreferences(user.id))

    s"User: ${user.name}, Profile: ${profile.bio}, Theme: ${preferences.theme}"
  }

  // Parallel execution with async/await
  def parallelAsyncAwait(userId: String): Future[String] = async {
    val user = await(fetchUser(userId))

    // These can run in parallel
    val profileFuture = fetchProfile(user.id)
    val preferencesFuture = fetchPreferences(user.id)

    val profile = await(profileFuture)
    val preferences = await(preferencesFuture)

    s"User: ${user.name}, Profile: ${profile.bio}, Theme: ${preferences.theme}"
  }

  // Error handling with async/await
  def asyncAwaitWithErrorHandling(userId: String): Future[Either[String, String]] = async {
    try {
      val user = await(fetchUser(userId))
      val profile = await(fetchProfile(user.id))
      Right(s"User: ${user.name}, Profile: ${profile.bio}")
    } catch {
      case ex: Exception => Left(s"Error: ${ex.getMessage}")
    }
  }

  // Helper classes and methods
  case class User(id: String, name: String)
  case class Profile(bio: String)
  case class Preferences(theme: String)

  private def fetchUser(id: String): Future[User] = 
    Future(User(id, s"User$id"))

  private def fetchProfile(userId: String): Future[Profile] = 
    Future(Profile(s"Bio for $userId"))

  private def fetchPreferences(userId: String): Future[Preferences] = 
    Future(Preferences("dark"))
}
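
When scala-async is not available, the same "start both, then combine" shape can be written with plain Futures. The following is a minimal sketch using zip; fetchBio and fetchTheme are hypothetical stand-ins for the profile and preferences lookups above.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ZipComposition {
  // Hypothetical stand-ins for the profile and preferences lookups
  def fetchBio(userId: String)(implicit ec: ExecutionContext): Future[String] =
    Future(s"Bio for $userId")

  def fetchTheme(userId: String)(implicit ec: ExecutionContext): Future[String] =
    Future("dark")

  def summary(userId: String)(implicit ec: ExecutionContext): Future[String] = {
    // Both futures are created, and therefore started, before either result is needed
    val bioF = fetchBio(userId)
    val themeF = fetchTheme(userId)
    bioF.zip(themeF).map { case (bio, theme) =>
      s"User: $userId, Profile: $bio, Theme: $theme"
    }
  }
}
```

Creating the futures inside a for comprehension instead would run them one after the other, since each generator only starts once the previous one has completed.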

Parallel Collections

Built-in Parallel Collections

// Parallel collections for data parallelism
class ParallelCollectionExamples {

  // CPU-intensive operations benefit from parallelization
  def demonstrateParallelCollections(): Unit = {
    // On Scala 2.13+, .par needs the scala-parallel-collections module and
    // import scala.collection.parallel.CollectionConverters._
    val largeList = (1 to 10000000).toList

    // Sequential processing (x.toDouble avoids Int overflow in x * x)
    val start1 = System.currentTimeMillis()
    val sequentialResult = largeList.map(x => math.sqrt(x.toDouble * x + 1)).sum
    val sequentialTime = System.currentTimeMillis() - start1

    // Parallel processing
    val start2 = System.currentTimeMillis()
    val parallelResult = largeList.par.map(x => math.sqrt(x.toDouble * x + 1)).sum
    val parallelTime = System.currentTimeMillis() - start2

    println(s"Sequential: ${sequentialTime}ms, Result: $sequentialResult")
    println(s"Parallel: ${parallelTime}ms, Result: $parallelResult")
    println(s"Speedup: ${sequentialTime.toDouble / parallelTime}x")
  }

  // Custom parallel operations
  def parallelGroupBy[T, K](collection: List[T])(keyFunc: T => K): Map[K, List[T]] = {
    collection.par
      .groupBy(keyFunc)
      .map { case (k, v) => k -> v.toList }
      .seq
      .toMap
  }

  def parallelFilter[T](collection: List[T])(predicate: T => Boolean): List[T] = {
    collection.par.filter(predicate).toList
  }

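  // Parallel word count: split into words, group identical words, count each group
  def parallelWordCount(texts: List[String]): Map[String, Int] = {
    texts.par
      .flatMap(_.toLowerCase.split("\\s+"))
      .filter(_.nonEmpty) // split can yield an empty token for blank or leading-space input
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
      .seq
      .toMap
  }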

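  // Monte Carlo simulation using parallel collections
  def estimatePi(samples: Int): Double = {
    val hits = (1 to samples).par.count { _ =>
      // ThreadLocalRandom avoids contention on the single shared Random instance
      val rng = java.util.concurrent.ThreadLocalRandom.current()
      val x = rng.nextDouble()
      val y = rng.nextDouble()
      x * x + y * y <= 1.0
    }

    4.0 * hits / samples
  }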

  // Custom parallel aggregation
  def parallelHistogram[T](data: List[T], buckets: Int)(valueFunc: T => Double): Array[Int] = {
    val minValue = data.par.map(valueFunc).min
    val maxValue = data.par.map(valueFunc).max
    val range = maxValue - minValue
    val bucketSize = range / buckets

    val histogram = Array.fill(buckets)(0)

    data.par.foreach { item =>
      val value = valueFunc(item)
      val bucketIndex = math.min(((value - minValue) / bucketSize).toInt, buckets - 1)
      histogram.synchronized {
        histogram(bucketIndex) += 1
      }
    }

    histogram
  }
}
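
From Scala 2.13 on, parallel collections live in a separate module. As a dependency-free illustration of the same data-parallel idea, the sketch below splits the input into chunks, sums each chunk in its own Future, and combines the partial sums. ChunkedSum, parallelSum, and the chunkSize parameter are assumptions made for this example, not part of the parallel collections API.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ChunkedSum {
  def parallelSum(data: Vector[Int], chunkSize: Int)(implicit ec: ExecutionContext): Future[Long] = {
    require(chunkSize > 0, "chunkSize must be positive")
    // Each chunk is summed in its own Future; accumulate as Long to avoid Int overflow
    val partials = data.grouped(chunkSize).toList.map { chunk =>
      Future(chunk.foldLeft(0L)(_ + _))
    }
    // Combine the partial sums once every chunk has finished
    Future.sequence(partials).map(_.sum)
  }
}
```

This only works because addition is associative, which is the same requirement parallel collections place on operations passed to fold or reduce.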

Fork/Join Framework

import java.util.concurrent.{ForkJoinPool, RecursiveTask, RecursiveAction}

// Custom Fork/Join tasks for fine-grained control
class ForkJoinExamples {

  // Parallel merge sort using Fork/Join
  class ParallelMergeSort(
    private val array: Array[Int],
    private val low: Int,
    private val high: Int
  ) extends RecursiveAction {

    private val THRESHOLD = 1000

    override def compute(): Unit = {
      if (high - low <= THRESHOLD) {
        // Sequential sort for small arrays
        java.util.Arrays.sort(array, low, high + 1)
      } else {
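        val mid = low + (high - low) / 2 // avoids Int overflow for very large indices

        val leftTask = new ParallelMergeSort(array, low, mid)
        val rightTask = new ParallelMergeSort(array, mid + 1, high)

        // Fork the left half, sort the right half in this thread, then wait for the left
        leftTask.fork()
        rightTask.compute()
        leftTask.join()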

        // Merge the sorted halves
        merge(array, low, mid, high)
      }
    }

    private def merge(arr: Array[Int], low: Int, mid: Int, high: Int): Unit = {
      val temp = new Array[Int](high - low + 1)
      var i = low
      var j = mid + 1
      var k = 0

      while (i <= mid && j <= high) {
        if (arr(i) <= arr(j)) {
          temp(k) = arr(i)
          i += 1
        } else {
          temp(k) = arr(j)
          j += 1
        }
        k += 1
      }

      while (i <= mid) {
        temp(k) = arr(i)
        i += 1
        k += 1
      }

      while (j <= high) {
        temp(k) = arr(j)
        j += 1
        k += 1
      }

      System.arraycopy(temp, 0, arr, low, temp.length)
    }
  }

  // Parallel matrix multiplication
  class ParallelMatrixMultiply(
    private val a: Array[Array[Double]],
    private val b: Array[Array[Double]],
    private val result: Array[Array[Double]],
    private val startRow: Int,
    private val endRow: Int
  ) extends RecursiveAction {

    private val THRESHOLD = 50

    override def compute(): Unit = {
      if (endRow - startRow <= THRESHOLD) {
        // Sequential computation for small ranges
        for (i <- startRow until endRow) {
          for (j <- b(0).indices) {
            var sum = 0.0
            for (k <- a(0).indices) {
              sum += a(i)(k) * b(k)(j)
            }
            result(i)(j) = sum
          }
        }
      } else {
        val mid = (startRow + endRow) / 2
        val task1 = new ParallelMatrixMultiply(a, b, result, startRow, mid)
        val task2 = new ParallelMatrixMultiply(a, b, result, mid, endRow)

        invokeAll(task1, task2)
      }
    }
  }

  // Parallel tree traversal
  sealed trait Tree[T] {
    def value: T
  }
  case class Leaf[T](value: T) extends Tree[T]
  case class Branch[T](value: T, left: Tree[T], right: Tree[T]) extends Tree[T]

  class ParallelTreeSum(private val tree: Tree[Int]) extends RecursiveTask[Int] {
    override def compute(): Int = tree match {
      case Leaf(value) => value
      case Branch(value, left, right) =>
        val leftTask = new ParallelTreeSum(left)
        val rightTask = new ParallelTreeSum(right)

        leftTask.fork()
        val rightResult = rightTask.compute()
        val leftResult = leftTask.join()

        value + leftResult + rightResult
    }
  }

  // Usage examples
  def demonstrateForkJoin(): Unit = {
    val forkJoinPool = new ForkJoinPool()

    // Parallel sort
    val array = Array.fill(100000)(Random.nextInt(1000))
    val sortTask = new ParallelMergeSort(array, 0, array.length - 1)
    forkJoinPool.invoke(sortTask)

    // Parallel matrix multiplication
    val size = 500
    val matrixA = Array.fill(size, size)(Random.nextDouble())
    val matrixB = Array.fill(size, size)(Random.nextDouble())
    val result = Array.fill(size, size)(0.0)

    val multiplyTask = new ParallelMatrixMultiply(matrixA, matrixB, result, 0, size)
    forkJoinPool.invoke(multiplyTask)

    forkJoinPool.shutdown()
  }
}
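
The usage examples above only invoke RecursiveAction tasks, which return nothing. For a RecursiveTask, invoke returns the computed value. A minimal sketch, assuming a fixed threshold of 1000 elements and using the common pool (SumTask and ForkJoinSum are illustrative names):

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

object ForkJoinSum {
  // Sums array(from until to), splitting until a range holds at most `threshold` elements
  class SumTask(array: Array[Int], from: Int, to: Int, threshold: Int) extends RecursiveTask[Long] {
    override def compute(): Long =
      if (to - from <= threshold) {
        var total = 0L
        var i = from
        while (i < to) { total += array(i); i += 1 }
        total
      } else {
        val mid = from + (to - from) / 2
        val left = new SumTask(array, from, mid, threshold)
        val right = new SumTask(array, mid, to, threshold)
        left.fork()                    // schedule the left half asynchronously
        val rightSum = right.compute() // work on the right half in this thread
        left.join() + rightSum
      }
  }

  // invoke blocks until the task finishes and returns its result
  def sum(array: Array[Int]): Long =
    ForkJoinPool.commonPool().invoke(new SumTask(array, 0, array.length, 1000))
}
```

Using the common pool avoids creating and shutting down a pool per call; a dedicated ForkJoinPool is still the better choice when the parallelism level needs to be controlled.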

Software Transactional Memory (STM)

STM Implementation

// Software Transactional Memory for lock-free concurrency
import java.util.concurrent.atomic.AtomicReference

// Simple STM implementation
class STMRef[T](initialValue: T) {
  private val ref = new AtomicReference[T](initialValue)

  def get: T = ref.get()

  def set(newValue: T): Unit = ref.set(newValue)

  def compareAndSet(expected: T, update: T): Boolean = 
    ref.compareAndSet(expected, update)

  def transform[R](f: T => (T, R)): R = {
    var result: R = null.asInstanceOf[R]
    var success = false

    while (!success) {
      val current = ref.get()
      val (newValue, returnValue) = f(current)
      success = ref.compareAndSet(current, newValue)
      if (success) result = returnValue
    }

    result
  }

  def update(f: T => T): Unit = transform(current => (f(current), ()))
}

// Transaction monad for composing STM operations
case class STM[A](run: () => A) {
  def map[B](f: A => B): STM[B] = STM(() => f(run()))

  def flatMap[B](f: A => STM[B]): STM[B] = STM(() => f(run()).run())

  def retry: STM[A] = this // Simplified - real STM would implement proper retry
}

object STM {
  def pure[A](a: A): STM[A] = STM(() => a)

  def read[T](ref: STMRef[T]): STM[T] = STM(() => ref.get)

  def write[T](ref: STMRef[T], value: T): STM[Unit] = STM(() => ref.set(value))

  def modify[T](ref: STMRef[T])(f: T => T): STM[Unit] = 
    STM(() => ref.update(f))

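  // Runs the transaction. Simplified: each STMRef operation is atomic on its own, but
  // the transaction as a whole is not isolated. A real STM would log reads and writes,
  // validate them at commit time, and retry on conflict
  def atomically[A](transaction: STM[A]): A = {
    transaction.run()
  }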
}

// Bank account example using STM
class Account(initialBalance: Int) {
  private val balance = new STMRef(initialBalance)

  def getBalance: Int = balance.get

  def deposit(amount: Int): STM[Unit] = 
    STM.modify(balance)(_ + amount)

  // Check and debit in one CAS loop, so concurrent withdrawals cannot overdraw the account
  def withdraw(amount: Int): STM[Boolean] = STM { () =>
    balance.transform { current =>
      if (current >= amount) (current - amount, true) else (current, false)
    }
  }

  def transfer(to: Account, amount: Int): STM[Boolean] = for {
    withdrawn <- withdraw(amount)
    result <- if (withdrawn) {
      to.deposit(amount).map(_ => true)
    } else {
      STM.pure(false)
    }
  } yield result
}
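
Because the simplified STM above does not isolate a whole transaction, a transfer across two references can be observed half-done. One way to get an all-or-nothing transfer without a full STM, sketched here as an assumption rather than as the lesson's design, is to keep every balance in a single immutable Map behind one AtomicReference. AtomicBank and its methods are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicReference

final class AtomicBank(initial: Map[String, Int]) {
  // All balances live in one immutable Map, so a transfer is a single CAS on one reference
  private val balances = new AtomicReference(initial)

  @annotation.tailrec
  def transfer(from: String, to: String, amount: Int): Boolean = {
    val current = balances.get()
    val invalid =
      from == to || amount < 0 || !current.contains(to) || current.getOrElse(from, 0) < amount
    if (invalid) false
    else {
      val updated = current
        .updated(from, current(from) - amount)
        .updated(to, current(to) + amount)
      // Retry from a fresh snapshot if another thread replaced the map in the meantime
      if (balances.compareAndSet(current, updated)) true
      else transfer(from, to, amount)
    }
  }

  def balance(name: String): Int = balances.get().getOrElse(name, 0)
}
```

The trade-off is contention: every transfer competes for the same reference, which is acceptable for a small number of accounts but does not scale the way per-reference STM does.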

// Dining philosophers problem using STM
class DiningPhilosophers {
  case class Fork(id: Int) {
    private val inUse = new STMRef(false)

    // Atomically claim the fork: succeeds only if it was free (one CAS, no check-then-act gap)
    def tryTake(): STM[Boolean] = STM(() => inUse.compareAndSet(false, true))

    def release(): STM[Unit] = STM.write(inUse, false)
  }

  class Philosopher(id: Int, leftFork: Fork, rightFork: Fork) {
    def dine(): STM[Unit] = for {
      leftTaken <- leftFork.tryTake()
      rightTaken <- if (leftTaken) rightFork.tryTake() else STM.pure(false)
      _ <- if (leftTaken && rightTaken) {
        for {
          _ <- STM.pure(println(s"Philosopher $id is eating"))
          _ <- leftFork.release()
          _ <- rightFork.release()
        } yield ()
      } else {
        if (leftTaken) leftFork.release() else STM.pure(())
      }
    } yield ()
  }
}

Lock-Free Data Structures

import java.util.concurrent.atomic.{AtomicReference, AtomicInteger}

// Lock-free stack implementation
class LockFreeStack[T] {
  private case class Node(data: T, next: Option[Node])
  private val head = new AtomicReference[Option[Node]](None)

  @annotation.tailrec
  final def push(item: T): Unit = {
    val currentHead = head.get()
    val newNode = Node(item, currentHead)
    if (!head.compareAndSet(currentHead, Some(newNode))) {
      push(item) // Retry on conflict
    }
  }

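  @annotation.tailrec
  final def pop(): Option[T] = {
    val currentHead = head.get()
    currentHead match {
      case None => None
      case Some(node) =>
        // CAS compares references, so pass the exact Option instance read above;
        // a freshly built Some(node) would never match and pop would spin forever
        if (head.compareAndSet(currentHead, node.next)) {
          Some(node.data)
        } else {
          pop() // Retry on conflict
        }
    }
  }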

  def isEmpty: Boolean = head.get().isEmpty

  def size: Int = {
    @annotation.tailrec
    def count(node: Option[Node], acc: Int): Int = node match {
      case None => acc
      case Some(n) => count(n.next, acc + 1)
    }
    count(head.get(), 0)
  }
}
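
AtomicReference.compareAndSet compares object identity, not equals, which matters whenever the reference holds a wrapper such as Option. The short sketch below (CasIdentity is an illustrative name) shows the difference between passing a freshly built wrapper and passing back the instance that was read.

```scala
import java.util.concurrent.atomic.AtomicReference

object CasIdentity {
  def demo(): (Boolean, Boolean) = {
    val ref = new AtomicReference[Option[String]](Some("a"))

    // A freshly built Some("a") equals the stored value but is a different object
    val withFreshWrapper = ref.compareAndSet(Some("a"), None)

    // Passing back the exact instance that get() returned satisfies the identity check
    val seen = ref.get()
    val withSameInstance = ref.compareAndSet(seen, None)

    (withFreshWrapper, withSameInstance) // (false, true)
  }
}
```

None is a singleton object, so comparing against None works; only Some values need this care.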

// Lock-free queue using Michael & Scott algorithm
class LockFreeQueue[T] {
  private case class Node(data: Option[T], next: AtomicReference[Option[Node]]) {
    def this() = this(None, new AtomicReference(None))
    def this(data: T) = this(Some(data), new AtomicReference(None))
  }

  private val dummy = new Node()
  private val head = new AtomicReference(dummy)
  private val tail = new AtomicReference(dummy)

  def enqueue(item: T): Unit = {
    val newNode = new Node(item)

    while (true) {
      val currentTail = tail.get()
      val tailNext = currentTail.next.get()

      if (currentTail == tail.get()) { // Check for consistency
        if (tailNext == None) {
          // Try to link new node at the end of list
          if (currentTail.next.compareAndSet(None, Some(newNode))) {
            tail.compareAndSet(currentTail, newNode) // Try to swing tail to new node
            return
          }
        } else {
          // Try to swing tail to next node
          tail.compareAndSet(currentTail, tailNext.get)
        }
      }
    }
  }

  def dequeue(): Option[T] = {
    while (true) {
      val currentHead = head.get()
      val currentTail = tail.get()
      val headNext = currentHead.next.get()

      if (currentHead == head.get()) { // Check for consistency
        if (currentHead == currentTail) {
          if (headNext == None) {
            return None // Queue is empty
          }
          // Try to advance tail
          tail.compareAndSet(currentTail, headNext.get)
        } else {
          // Try to read data
          headNext match {
            case Some(next) =>
              if (head.compareAndSet(currentHead, next)) {
                return next.data
              }
            case None => // Continue retry
          }
        }
      }
    }
    None // Should never reach here
  }
}

// Lock-free counter with backoff
class LockFreeCounter {
  private val count = new AtomicInteger(0)
  private val maxBackoff = 1000 // microseconds

  def increment(): Int = {
    var backoff = 1
    while (true) {
      val current = count.get()
      val next = current + 1

      if (count.compareAndSet(current, next)) {
        return next
      }

      // Exponential backoff, capped at maxBackoff microseconds, to reduce contention
      java.util.concurrent.locks.LockSupport.parkNanos(backoff * 1000L)
      backoff = math.min(backoff * 2, maxBackoff)
    }
    -1 // Should never reach here
  }

  def get: Int = count.get()

  def compareAndIncrement(expected: Int): Boolean = 
    count.compareAndSet(expected, expected + 1)
}
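
For counters that are written far more often than they are read, the JDK also provides LongAdder, which spreads updates over several internal cells instead of retrying a CAS on one hot value. A minimal sketch (AdderCounter and count are illustrative names):

```scala
import java.util.concurrent.atomic.LongAdder

object AdderCounter {
  def count(threads: Int, perThread: Int): Long = {
    val adder = new LongAdder
    val workers = (1 to threads).map { _ =>
      new Thread(() => (1 to perThread).foreach(_ => adder.increment()))
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    adder.sum() // exact here because every writer has finished
  }
}
```

Unlike incrementAndGet, increment returns nothing, and sum is not an atomic snapshot while writers are still active, so LongAdder suits statistics better than sequence numbers.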

Distributed Computing Patterns

Message Passing and Remote Actors

// Remote actor communication patterns
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random

// Distributed work distribution pattern
object WorkDistribution {

  // Work item definition
  case class WorkItem(id: String, payload: Array[Byte])
  case class WorkResult(id: String, result: String)

  // Master actor that distributes work
  class MasterActor extends Actor {
    import context.dispatcher

    private var workers = List.empty[ActorRef]
    private var pendingWork = List.empty[WorkItem]
    private var workInProgress = Map.empty[String, ActorRef]
    private var results = Map.empty[String, WorkResult]

    def receive = {
      case "RegisterWorker" =>
        workers = sender() :: workers
        println(s"Worker registered: ${sender()}")
        processNextWork()

      case work: WorkItem =>
        pendingWork = work :: pendingWork
        processNextWork()

      case result: WorkResult =>
        workInProgress.get(result.id) match {
          case Some(worker) =>
            workInProgress -= result.id
            results += result.id -> result
            println(s"Work ${result.id} completed by $worker")
            processNextWork()
          case None =>
            println(s"Received result for unknown work: ${result.id}")
        }

      case "GetResults" =>
        sender() ! results.values.toList
    }

    private def processNextWork(): Unit = {
      (pendingWork, workers.filterNot(workInProgress.values.toSet)) match {
        case (work :: remainingWork, availableWorker :: _) =>
          pendingWork = remainingWork
          workInProgress += work.id -> availableWorker
          availableWorker ! work
          println(s"Assigned work ${work.id} to $availableWorker")
          processNextWork() // Try to assign more work
        case _ => // No work or no available workers
      }
    }
  }

  // Worker actor that processes work
  class WorkerActor extends Actor {
    import context.dispatcher

    override def preStart(): Unit = {
      // Register with master on startup
      context.system.actorSelection("/user/master") ! "RegisterWorker"
    }

    def receive = {
      case work: WorkItem =>
        println(s"Processing work: ${work.id}")
        // Capture the sender now: sender() must not be called from the Future callback,
        // which runs on another thread after this message has been handled
        val replyTo = sender()

        // Simulate work processing
        Future {
          Thread.sleep(Random.nextInt(2000) + 1000)
          val result = s"Processed: ${work.payload.length} bytes"
          WorkResult(work.id, result)
        }.onComplete {
          case scala.util.Success(result) => replyTo ! result
          case scala.util.Failure(ex) =>
            println(s"Work ${work.id} failed: ${ex.getMessage}")
        }
    }
  }
}

// Cluster-aware actor system
class ClusterManager {
  import akka.cluster.Cluster
  import akka.cluster.ClusterEvent._

  class ClusterListener extends Actor {
    val cluster = Cluster(context.system)

    override def preStart(): Unit = {
      cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
        classOf[MemberEvent], classOf[UnreachableMember])
    }

    override def postStop(): Unit = cluster.unsubscribe(self)

    def receive = {
      case MemberUp(member) =>
        println(s"Member is Up: ${member.address}")

      case UnreachableMember(member) =>
        println(s"Member detected as unreachable: ${member}")

      case MemberRemoved(member, previousStatus) =>
        println(s"Member is Removed: ${member.address} after $previousStatus")

      case _: MemberEvent => // ignore
    }
  }

  // Distributed cache using cluster sharding
  object DistributedCache {
    import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

    case class Get(key: String)
    case class Put(key: String, value: String)
    case class CacheEntry(value: String, timestamp: Long)

    class CacheActor extends Actor {
      private var cache = Map.empty[String, CacheEntry]

      def receive = {
        case Get(key) =>
          sender() ! cache.get(key).map(_.value)

        case Put(key, value) =>
          cache += key -> CacheEntry(value, System.currentTimeMillis())
          sender() ! "OK"
      }
    }

    def start(system: ActorSystem): ActorRef = {
      val extractEntityId: ShardRegion.ExtractEntityId = {
        case Get(key) => (key.hashCode.toString, Get(key))
        case Put(key, value) => (key.hashCode.toString, Put(key, value))
      }

      // floorMod keeps the shard id in 0..9 even when hashCode is negative
      val extractShardId: ShardRegion.ExtractShardId = {
        case Get(key) => Math.floorMod(key.hashCode, 10).toString
        case Put(key, _) => Math.floorMod(key.hashCode, 10).toString
      }

      ClusterSharding(system).start(
        typeName = "Cache",
        entityProps = Props(new CacheActor), // creator form works for this nested actor class
        settings = ClusterShardingSettings(system),
        extractEntityId = extractEntityId,
        extractShardId = extractShardId
      )
    }
  }
}

Event Sourcing and CQRS

// Event sourcing pattern for distributed systems
object EventSourcing {

  // Domain events
  sealed trait Event {
    def entityId: String
    def timestamp: Long
  }

  case class AccountCreated(entityId: String, initialBalance: BigDecimal, timestamp: Long) extends Event
  case class MoneyDeposited(entityId: String, amount: BigDecimal, timestamp: Long) extends Event
  case class MoneyWithdrawn(entityId: String, amount: BigDecimal, timestamp: Long) extends Event
  case class AccountClosed(entityId: String, timestamp: Long) extends Event

  // Aggregate state
  sealed trait AccountState
  case object NonExistent extends AccountState
  case class Active(balance: BigDecimal) extends AccountState
  case object Closed extends AccountState

  // Event store
  trait EventStore {
    def saveEvents(entityId: String, events: List[Event], expectedVersion: Int): Future[Unit]
    def getEvents(entityId: String): Future[List[Event]]
    def getEventsFromVersion(entityId: String, version: Int): Future[List[Event]]
  }

  // In-memory event store implementation
  class InMemoryEventStore extends EventStore {
    private val events = scala.collection.mutable.Map[String, List[Event]]()

    // All access goes through the store's monitor, so the version check and the append
    // form one atomic step and readers never see the mutable map mid-update
    def saveEvents(entityId: String, newEvents: List[Event], expectedVersion: Int): Future[Unit] = synchronized {
      val currentEvents = events.getOrElse(entityId, List.empty)
      if (currentEvents.length != expectedVersion) {
        Future.failed(new Exception("Concurrency conflict"))
      } else {
        events(entityId) = currentEvents ++ newEvents
        Future.successful(())
      }
    }

    def getEvents(entityId: String): Future[List[Event]] = synchronized {
      Future.successful(events.getOrElse(entityId, List.empty))
    }

    def getEventsFromVersion(entityId: String, version: Int): Future[List[Event]] = synchronized {
      Future.successful(events.getOrElse(entityId, List.empty).drop(version))
    }
  }

  // Aggregate
  class Account(id: String, eventStore: EventStore) {
    implicit val ec: ExecutionContext = ExecutionContext.global

    private var currentState: AccountState = NonExistent
    private var version: Int = 0

    // applyEvent bumps `version`, so capture the version the store currently holds and
    // pass that as expectedVersion; otherwise every save reports a concurrency conflict
    private def applyAndSave(event: Event): Future[Unit] = {
      val expectedVersion = version
      applyEvent(event)
      eventStore.saveEvents(id, List(event), expectedVersion)
    }

    def create(initialBalance: BigDecimal): Future[Unit] = {
      currentState match {
        case NonExistent =>
          applyAndSave(AccountCreated(id, initialBalance, System.currentTimeMillis()))
        case _ =>
          Future.failed(new Exception("Account already exists"))
      }
    }

    def deposit(amount: BigDecimal): Future[Unit] = {
      currentState match {
        case Active(_) =>
          applyAndSave(MoneyDeposited(id, amount, System.currentTimeMillis()))
        case _ =>
          Future.failed(new Exception("Account not active"))
      }
    }

    def withdraw(amount: BigDecimal): Future[Unit] = {
      currentState match {
        case Active(balance) if balance >= amount =>
          applyAndSave(MoneyWithdrawn(id, amount, System.currentTimeMillis()))
        case Active(_) =>
          Future.failed(new Exception("Insufficient funds"))
        case _ =>
          Future.failed(new Exception("Account not active"))
      }
    }

    def getBalance: BigDecimal = currentState match {
      case Active(balance) => balance
      case _ => BigDecimal(0)
    }

    def loadFromHistory(): Future[Unit] = {
      eventStore.getEvents(id).map { events =>
        events.foreach(applyEvent)
      }
    }

    private def applyEvent(event: Event): Unit = {
      currentState = currentState match {
        case NonExistent => event match {
          case AccountCreated(_, initialBalance, _) => Active(initialBalance)
          case _ => NonExistent
        }
        case Active(balance) => event match {
          case MoneyDeposited(_, amount, _) => Active(balance + amount)
          case MoneyWithdrawn(_, amount, _) => Active(balance - amount)
          case AccountClosed(_, _) => Closed
          case _ => currentState
        }
        case Closed => currentState
      }
      version += 1
    }
  }

  // Read model for CQRS
  case class AccountSummary(id: String, balance: BigDecimal, transactionCount: Int, lastActivity: Long)

  class AccountProjection(eventStore: EventStore) {
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Updated from Future callbacks on arbitrary threads, so use a thread-safe map
    private val summaries = scala.collection.concurrent.TrieMap.empty[String, AccountSummary]

    def getAccountSummary(accountId: String): Option[AccountSummary] = 
      summaries.get(accountId)

    def getAllAccounts: List[AccountSummary] = summaries.values.toList

    def updateProjection(accountId: String): Future[Unit] = {
      eventStore.getEvents(accountId).map { events =>
        val summary = events.foldLeft(AccountSummary(accountId, BigDecimal(0), 0, 0L)) { (acc, event) =>
          event match {
            case AccountCreated(_, initialBalance, timestamp) =>
              acc.copy(balance = initialBalance, transactionCount = acc.transactionCount + 1, lastActivity = timestamp)
            case MoneyDeposited(_, amount, timestamp) =>
              acc.copy(balance = acc.balance + amount, transactionCount = acc.transactionCount + 1, lastActivity = timestamp)
            case MoneyWithdrawn(_, amount, timestamp) =>
              acc.copy(balance = acc.balance - amount, transactionCount = acc.transactionCount + 1, lastActivity = timestamp)
            case AccountClosed(_, timestamp) =>
              acc.copy(lastActivity = timestamp)
          }
        }
        summaries(accountId) = summary
      }
    }
  }
}
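
Both the aggregate and the projection above rebuild their state by folding over the event log. The sketch below isolates that idea as a pure function. The event and state definitions are assumptions that mirror the shapes pattern-matched above, and the names ReplaySketch, evolve, and replay are hypothetical, not part of the lesson's code.

```scala
object ReplaySketch {
  // Assumed event shapes, mirroring how the events are matched in the code above
  sealed trait Event
  final case class AccountCreated(id: String, initialBalance: BigDecimal, timestamp: Long) extends Event
  final case class MoneyDeposited(id: String, amount: BigDecimal, timestamp: Long) extends Event
  final case class MoneyWithdrawn(id: String, amount: BigDecimal, timestamp: Long) extends Event
  final case class AccountClosed(id: String, timestamp: Long) extends Event

  // Assumed state shapes
  sealed trait State
  case object NonExistent extends State
  final case class Active(balance: BigDecimal) extends State
  case object Closed extends State

  // Pure transition function: the same rules as applyEvent, without mutation
  def evolve(state: State, event: Event): State = (state, event) match {
    case (NonExistent, AccountCreated(_, initial, _)) => Active(initial)
    case (Active(b), MoneyDeposited(_, amount, _))    => Active(b + amount)
    case (Active(b), MoneyWithdrawn(_, amount, _))    => Active(b - amount)
    case (Active(_), AccountClosed(_, _))             => Closed
    case (other, _)                                   => other // ignore events that do not apply
  }

  // Replaying history is a left fold over the event log
  def replay(events: Seq[Event]): State =
    events.foldLeft(NonExistent: State)(evolve)

  def main(args: Array[String]): Unit = {
    val history = List(
      AccountCreated("acc-1", BigDecimal(100), 1L),
      MoneyDeposited("acc-1", BigDecimal(50), 2L),
      MoneyWithdrawn("acc-1", BigDecimal(30), 3L)
    )
    println(replay(history)) // prints Active(120)
  }
}
```

Because evolve has no side effects, the same event list always yields the same state, which makes replay easy to test and safe to run from any thread.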

Performance Monitoring and Optimization

Profiling Concurrent Applications

// Performance monitoring for concurrent applications
class ConcurrencyProfiler {

  // Thread contention monitor
  class ContentionMonitor {
    private val threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean()

    case class ThreadInfo(
      name: String,
      state: Thread.State,
      cpuTime: Long,
      blockedTime: Long,
      waitTime: Long
    )

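    // Note: the result is keyed by thread name, so threads sharing a name overwrite each other
    def getThreadContention: Map[String, ThreadInfo] = {
      threadMXBean.getAllThreadIds.toList.flatMap { id =>
        // getThreadInfo returns null if the thread exited after its ID was listed
        Option(threadMXBean.getThreadInfo(id)).map { info =>
          val cpuTime = if (threadMXBean.isThreadCpuTimeSupported) {
            threadMXBean.getThreadCpuTime(id) // nanoseconds
          } else 0L

          // Blocked and waited times are -1 unless thread contention monitoring
          // has been enabled via setThreadContentionMonitoringEnabled(true)
          info.getThreadName -> ThreadInfo(
            info.getThreadName,
            info.getThreadState,
            cpuTime,
            info.getBlockedTime,
            info.getWaitedTime
          )
        }
      }.toMap
    }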

    def printContentionReport(): Unit = {
      val contentions = getThreadContention
      println("Thread Contention Report:")
      println("=" * 50)

      contentions.values.groupBy(_.state).foreach { case (state, threads) =>
        println(s"$state: ${threads.size} threads")
        threads.take(5).foreach { thread =>
          println(f"  ${thread.name}%-30s CPU: ${thread.cpuTime / 1000000}%6d ms")
        }
      }
    }
  }

  // Lock contention detector
  class LockContentionDetector {
    import java.lang.management.ManagementFactory

    def detectDeadlocks(): Option[String] = {
      val threadMXBean = ManagementFactory.getThreadMXBean()
      Option(threadMXBean.findDeadlockedThreads()).map { deadlockedThreads =>
        val threadInfos = threadMXBean.getThreadInfo(deadlockedThreads)
        val report = new StringBuilder("DEADLOCK DETECTED:\n")

        threadInfos.foreach { info =>
          report.append(s"Thread: ${info.getThreadName}\n")
          report.append(s"  State: ${info.getThreadState}\n")
          report.append(s"  Lock: ${info.getLockName}\n")
          report.append(s"  Lock Owner: ${info.getLockOwnerName}\n")
          report.append("\n")
        }

        report.toString()
      }
    }

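    // Returns the scheduler so the caller can stop monitoring with shutdown();
    // otherwise its non-daemon thread keeps the JVM alive
    def monitorLockContention(interval: FiniteDuration): ScheduledExecutorService = {
      val scheduler = Executors.newSingleThreadScheduledExecutor()

      scheduler.scheduleAtFixedRate(
        () => {
          detectDeadlocks().foreach { report =>
            println(report)
            // In real application, would log or alert
          }
        },
        0,
        interval.toMillis,
        TimeUnit.MILLISECONDS
      )
      scheduler
    }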
  }

  // Performance metrics collector
  class PerformanceMetrics {
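    import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
    import scala.jdk.CollectionConverters._ // provides asScala (Scala 2.13+)

    private val requestCounts = new java.util.concurrent.ConcurrentHashMap[String, AtomicInteger]()
    private val responseTimes = new java.util.concurrent.ConcurrentHashMap[String, AtomicReference[List[Long]]]()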

    def recordRequest(operation: String, responseTime: Long): Unit = {
      requestCounts.computeIfAbsent(operation, _ => new AtomicInteger(0)).incrementAndGet()

      responseTimes.computeIfAbsent(operation, _ => new AtomicReference(List.empty[Long]))
        .updateAndGet(times => (responseTime :: times).take(1000)) // Keep last 1000 measurements
    }

    def getMetrics: Map[String, (Int, Double, Long, Long)] = {
      requestCounts.asScala.map { case (operation, count) =>
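        // The times entry can be briefly absent while recordRequest is still running on another thread
        val times = Option(responseTimes.get(operation)).map(_.get()).getOrElse(List.empty[Long])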
        val avgTime = if (times.nonEmpty) times.sum.toDouble / times.length else 0.0
        val minTime = if (times.nonEmpty) times.min else 0L
        val maxTime = if (times.nonEmpty) times.max else 0L

        operation -> (count.get(), avgTime, minTime, maxTime)
      }.toMap
    }

    def printReport(): Unit = {
      println("Performance Metrics Report:")
      println("=" * 70)
      println(f"${"Operation"}%-20s ${"Count"}%8s ${"Avg(ms)"}%10s ${"Min(ms)"}%10s ${"Max(ms)"}%10s")
      println("-" * 70)

      getMetrics.foreach { case (operation, (count, avg, min, max)) =>
        println(f"$operation%-20s $count%8d $avg%10.2f $min%10d $max%10d")
      }
    }
  }

  // Benchmark utility for concurrent operations
  def benchmarkConcurrentOperation[T](
    name: String,
    operation: () => T,
    threads: Int,
    iterations: Int
  ): Unit = {
    val latch = new CountDownLatch(threads)
    val startTime = System.nanoTime()
    val results = new Array[Long](threads)

    val threadPool = Executors.newFixedThreadPool(threads)

    for (i <- 0 until threads) {
      threadPool.submit(new Runnable {
        def run(): Unit = {
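          val threadStart = System.nanoTime()

          try {
            for (_ <- 0 until iterations) {
              operation()
            }
          } finally {
            // Always record the time and release the latch, even if operation() throws,
            // so that latch.await() cannot hang
            results(i) = System.nanoTime() - threadStart
            latch.countDown()
          }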
        }
      })
    }

    latch.await()
    val endTime = System.nanoTime()

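    val elapsedNanos = endTime - startTime
    val totalTime = elapsedNanos / 1000000 // Convert to milliseconds
    val avgThreadTime = results.sum / results.length / 1000000 // Mean per-thread time in milliseconds
    // Computed from nanoseconds in Long/Double arithmetic: the Int product can overflow,
    // and a run shorter than 1 ms would otherwise divide by zero
    val throughput = if (elapsedNanos > 0) threads.toLong * iterations * 1e9 / elapsedNanos else 0.0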

    println(s"Benchmark: $name")
    println(s"  Total time: ${totalTime}ms")
    println(s"  Average thread time: ${avgThreadTime}ms")
    println(s"  Throughput: ${throughput.toInt} ops/sec")
    println(s"  Threads: $threads, Iterations per thread: $iterations")
    println()

    threadPool.shutdown()
  }
}
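
As a usage-style illustration of benchmarking concurrent operations, the sketch below compares an AtomicLong counter with a synchronized one. It is self-contained rather than built on ConcurrencyProfiler, and the names CounterBenchmarkSketch, timeNanos, runAtomic, and runSynchronized are hypothetical. It assumes Scala 2.12 or later for the lambda-to-Runnable conversion. A start gate keeps thread startup out of the measured interval; results will vary by machine and JVM warm-up, so treat the timings as rough indications only.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong

object CounterBenchmarkSketch {
  // Runs `op` `iterations` times on each of `threads` workers; returns elapsed nanoseconds
  def timeNanos(threads: Int, iterations: Int)(op: () => Unit): Long = {
    val pool = Executors.newFixedThreadPool(threads)
    val ready = new CountDownLatch(threads) // workers signal that they have started
    val go = new CountDownLatch(1)          // start gate released by the caller
    val done = new CountDownLatch(threads)
    try {
      for (_ <- 0 until threads) {
        pool.execute(() => {
          ready.countDown()
          go.await()
          try {
            var i = 0
            while (i < iterations) { op(); i += 1 }
          } finally done.countDown() // release the latch even if op throws
        })
      }
      ready.await()
      val start = System.nanoTime()
      go.countDown()
      done.await()
      System.nanoTime() - start
    } finally pool.shutdown()
  }

  // Returns (final counter value, elapsed nanoseconds) for a lock-free counter
  def runAtomic(threads: Int, iterations: Int): (Long, Long) = {
    val counter = new AtomicLong(0)
    val nanos = timeNanos(threads, iterations) { () => counter.incrementAndGet(); () }
    (counter.get(), nanos)
  }

  // Returns (final counter value, elapsed nanoseconds) for a lock-protected counter
  def runSynchronized(threads: Int, iterations: Int): (Long, Long) = {
    val lock = new Object
    var counter = 0L
    val nanos = timeNanos(threads, iterations) { () => lock.synchronized { counter += 1 } }
    (counter, nanos)
  }

  def main(args: Array[String]): Unit = {
    val (atomicCount, atomicNanos) = runAtomic(4, 100000)
    val (lockedCount, lockedNanos) = runSynchronized(4, 100000)
    println(s"AtomicLong:   count=$atomicCount, ${atomicNanos / 1000000} ms")
    println(s"synchronized: count=$lockedCount, ${lockedNanos / 1000000} ms")
  }
}
```

Both variants should end with a count equal to threads times iterations; only the elapsed time differs, which is what makes the comparison meaningful.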

Conclusion

Scala provides multiple approaches to concurrency and parallel computing for building high-performance, scalable applications. The key concepts and patterns covered in this lesson are:

Concurrency Fundamentals:

  • Understanding the difference between concurrency and parallelism
  • Proper thread management and execution context selection
  • Thread safety mechanisms and synchronization primitives
  • Performance monitoring and bottleneck identification

Advanced Future Patterns:

  • Composition and error handling strategies
  • Timeout and retry mechanisms
  • Circuit breaker and bulkhead patterns
  • Async/await for cleaner asynchronous code

Parallel Computing:

  • Parallel collections for data parallelism
  • Fork/Join framework for divide-and-conquer algorithms
  • Custom thread pools for different workload types
  • Lock-free programming to avoid lock contention on hot paths

Software Transactional Memory:

  • STM for composable concurrent operations
  • Lock-free data structures and algorithms
  • Compare-and-swap operations
  • Optimistic concurrency control

Distributed Patterns:

  • Message passing and remote actor communication
  • Event sourcing and CQRS for scalable architectures
  • Cluster management and distributed caching
  • Work distribution and load balancing

Performance Optimization:

  • Profiling and monitoring concurrent applications
  • Detecting and resolving contention issues
  • Benchmarking concurrent operations
  • Memory-efficient parallel processing

Best Practices:

  • Choose appropriate concurrency models for specific use cases
  • Minimize shared mutable state
  • Use immutable data structures when possible
  • Design for fault tolerance and graceful degradation
  • Monitor and measure performance regularly

These advanced concurrency patterns enable the development of systems that can efficiently utilize modern multi-core hardware and scale across distributed environments while maintaining correctness and reliability.