Concurrency Patterns and Parallel Computing: Beyond Futures and Actors
Modern applications demand high performance and responsiveness, which calls for deliberate concurrency and parallelism strategies. This lesson explores advanced concurrency patterns in Scala, from low-level thread management to distributed computing architectures.
Understanding Concurrency vs Parallelism
Fundamental Concepts
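Concurrency and parallelism are related but distinct. Concurrency means structuring a program so that several tasks can make progress in overlapping time periods, even on a single core; parallelism means executing several tasks at the same instant, which requires multiple cores.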
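// Import only the names that are needed: a wildcard import of java.util.concurrent._
// would make Future and TimeoutException ambiguous with scala.concurrent._
import java.util.concurrent.{CountDownLatch, Executors, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}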
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Success, Failure, Random}
// Concurrency: Dealing with multiple tasks at once (may run on single core)
// Parallelism: Actually executing multiple tasks simultaneously (requires multiple cores)
// Basic thread management
class ThreadExample {
def demonstrateThreads(): Unit = {
val thread1 = new Thread(() => {
for (i <- 1 to 5) {
println(s"Thread 1: $i")
Thread.sleep(100)
}
})
val thread2 = new Thread(() => {
for (i <- 1 to 5) {
println(s"Thread 2: $i")
Thread.sleep(150)
}
})
thread1.start()
thread2.start()
thread1.join()
thread2.join()
println("Both threads completed")
}
}
// Thread-safe shared state with synchronization
class Counter {
@volatile private var count: Int = 0
private val lock = new Object
def increment(): Int = lock.synchronized {
count += 1
count
}
def get: Int = count // @volatile ensures visibility
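// Lock-free alternative: one shared AtomicInteger, created once per Counter.
// (Building a new AtomicInteger on every call would never update shared state.)
// Note: this atomic counter is tracked separately from `count` above.
private val atomicCount = new java.util.concurrent.atomic.AtomicInteger(0)
def incrementAtomic(): Int = atomicCount.incrementAndGet()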
}
// Concurrent increments from 10 threads; increment() is synchronized, so no updates are lost
def demonstrateRaceCondition(): Unit = {
val counter = new Counter
val threads = for (i <- 1 to 10) yield new Thread(() => {
for (_ <- 1 to 1000) {
counter.increment()
}
})
threads.foreach(_.start())
threads.foreach(_.join())
println(s"Final count: ${counter.get}") // Should be 10000
}
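To make the effect of synchronization visible, the following minimal sketch compares an unsynchronized counter with an atomic one under the same load. The object name `CounterComparison` and its helper `hammer` are hypothetical names introduced only for this illustration; they are not part of the lesson's `Counter` class.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CounterComparison {
  // Starts `threads` threads that each call `step` `perThread` times, then waits for all of them.
  private def hammer(threads: Int, perThread: Int)(step: () => Unit): Unit = {
    val workers = (1 to threads).map { _ =>
      new Thread(() => (1 to perThread).foreach(_ => step()))
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
  }

  // Plain var with no synchronization: `count += 1` is a read-modify-write,
  // so concurrent increments can overwrite each other and updates may be lost.
  def unsafeTotal(threads: Int, perThread: Int): Int = {
    var count = 0
    hammer(threads, perThread)(() => count += 1)
    count
  }

  // AtomicInteger performs the increment as a single atomic operation.
  def atomicTotal(threads: Int, perThread: Int): Int = {
    val count = new AtomicInteger(0)
    hammer(threads, perThread) { () => count.incrementAndGet(); () }
    count.get()
  }
}
```

Calling `CounterComparison.atomicTotal(8, 10000)` returns exactly 80000, while `CounterComparison.unsafeTotal(8, 10000)` can return any value up to 80000 depending on how the threads interleave.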
Thread Pools and Execution Contexts
// Custom execution contexts for different types of work
object ExecutionContexts {
// CPU-intensive work
val cpuBound: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
)
// I/O-intensive work
val ioBound: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
)
// Single-threaded for sequential operations
val sequential: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newSingleThreadExecutor()
)
// Custom thread pool with monitoring
def createMonitoredThreadPool(name: String, poolSize: Int): ExecutionContext = {
val threadFactory = new ThreadFactory {
private val counter = new java.util.concurrent.atomic.AtomicInteger(0)
def newThread(r: Runnable): Thread = {
val thread = new Thread(r, s"$name-${counter.incrementAndGet()}")
thread.setDaemon(false)
thread.setUncaughtExceptionHandler((t, e) => {
println(s"Uncaught exception in thread ${t.getName}: ${e.getMessage}")
e.printStackTrace()
})
thread
}
}
ExecutionContext.fromExecutor(
new ThreadPoolExecutor(
poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable](),
threadFactory
)
)
}
}
// Using different execution contexts
class WorkloadManager {
import ExecutionContexts._
def performCpuIntensiveTask(data: List[Int]): Future[Int] = {
Future {
// Simulate CPU-intensive computation
data.map(x => x * x).sum
}(cpuBound)
}
def performIoTask(url: String): Future[String] = {
Future {
// Simulate I/O operation
Thread.sleep(1000)
s"Data from $url"
}(ioBound)
}
def performSequentialTask(items: List[String]): Future[List[String]] = {
Future {
items.map(_.toUpperCase)
}(sequential)
}
}
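The execution contexts above wrap executors that are never shut down, and their non-daemon threads can keep a JVM alive. One possible way to manage the lifecycle is sketched below, assuming a hypothetical helper `ManagedPool.withFixedPool` that creates a pool, lends it to a block of code, and shuts it down afterwards. It relies on `ExecutionContext.fromExecutorService`, which returns a context that also exposes `shutdown()`.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object ManagedPool {
  // Lends a fixed-size pool to `body` and always shuts it down afterwards.
  def withFixedPool[A](threads: Int)(body: ExecutionContext => A): A = {
    val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
    try body(ec)
    finally {
      ec.shutdown()                            // stop accepting new tasks
      ec.awaitTermination(5, TimeUnit.SECONDS) // wait briefly for running tasks
    }
  }

  // Example use: run a CPU-bound computation on a short-lived pool.
  def sumOfSquares(data: List[Int]): Int =
    withFixedPool(2) { implicit ec =>
      Await.result(Future(data.map(x => x * x).sum), 5.seconds)
    }
}
```

The blocking `Await.result` is used here only so that the pool is not shut down before the work finishes; long-lived services would more likely shut their pools down in an application shutdown hook.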
Advanced Future Patterns
Future Composition and Error Handling
// Advanced Future composition techniques
class FutureCompositionExamples {
implicit val ec: ExecutionContext = ExecutionContext.global
// Sequential vs Parallel execution
def sequentialExecution(urls: List[String]): Future[List[String]] = {
urls.foldLeft(Future.successful(List.empty[String])) { (acc, url) =>
for {
results <- acc
result <- fetchData(url)
} yield results :+ result
}
}
def parallelExecution(urls: List[String]): Future[List[String]] = {
val futures = urls.map(fetchData)
Future.sequence(futures)
}
// Timeout handling
def withTimeout[T](future: Future[T], timeout: FiniteDuration): Future[T] = {
val promise = Promise[T]()
future.onComplete(promise.tryComplete)
Future {
Thread.sleep(timeout.toMillis)
promise.tryFailure(new TimeoutException(s"Operation timed out after $timeout"))
}
promise.future
}
// Retry mechanism
def retry[T](operation: () => Future[T], maxRetries: Int, delay: FiniteDuration): Future[T] = {
def attempt(retriesLeft: Int): Future[T] = {
operation().recoverWith {
case ex if retriesLeft > 0 =>
println(s"Operation failed, retrying... ($retriesLeft retries left)")
Future {
Thread.sleep(delay.toMillis)
}.flatMap(_ => attempt(retriesLeft - 1))
case ex =>
Future.failed(ex)
}
}
attempt(maxRetries)
}
// Circuit breaker pattern
class CircuitBreaker[T](
failureThreshold: Int,
timeout: FiniteDuration,
resetTimeout: FiniteDuration
) {
private enum State {
case Closed, Open, HalfOpen
}
@volatile private var state: State = State.Closed
@volatile private var failureCount: Int = 0
@volatile private var lastFailureTime: Long = 0
def call(operation: () => Future[T]): Future[T] = {
state match {
case State.Closed =>
executeOperation(operation)
case State.Open =>
if (System.currentTimeMillis() - lastFailureTime > resetTimeout.toMillis) {
state = State.HalfOpen
executeOperation(operation)
} else {
Future.failed(new Exception("Circuit breaker is OPEN"))
}
case State.HalfOpen =>
executeOperation(operation)
}
}
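private def executeOperation(operation: () => Future[T]): Future[T] = {
operation().andThen {
// Callbacks can run concurrently on different threads, and @volatile alone does not make
// failureCount += 1 atomic, so guard the compound state updates with a lock
case Success(_) => this.synchronized {
failureCount = 0
state = State.Closed
}
case Failure(_) => this.synchronized {
failureCount += 1
lastFailureTime = System.currentTimeMillis()
if (failureCount >= failureThreshold) {
state = State.Open
}
}
}
}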
}
// Bulkhead pattern - isolate resources
class BulkheadExecutor {
private val criticalPool = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(2)
)
private val normalPool = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(8)
)
def executeCritical[T](task: => T): Future[T] =
Future(task)(criticalPool)
def executeNormal[T](task: => T): Future[T] =
Future(task)(normalPool)
}
private def fetchData(url: String): Future[String] = {
Future {
Thread.sleep(Random.nextInt(1000))
if (Random.nextDouble() < 0.1) throw new Exception(s"Failed to fetch $url")
s"Data from $url"
}
}
}
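The `withTimeout` helper above parks a pool thread in `Thread.sleep` for the full timeout of every call. A possible alternative, sketched here under the assumption that a single shared scheduler thread is acceptable, uses a `ScheduledExecutorService` so that no worker thread blocks while waiting. The object name `NonBlockingTimeout` and the thread name `timeout-scheduler` are illustrative choices.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object NonBlockingTimeout {
  // One daemon thread fires all timeouts, so it never prevents JVM exit.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "timeout-scheduler")
        t.setDaemon(true)
        t
      }
    })

  def withTimeout[T](future: Future[T], timeout: FiniteDuration)(
      implicit ec: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    // Schedule the failure instead of sleeping on a worker thread.
    val timer = scheduler.schedule(
      new Runnable {
        def run(): Unit =
          promise.tryFailure(new TimeoutException(s"Operation timed out after $timeout"))
      },
      timeout.toMillis,
      TimeUnit.MILLISECONDS
    )
    future.onComplete { result =>
      timer.cancel(false)         // the timeout task is no longer needed
      promise.tryComplete(result) // no effect if the timeout already won
    }
    promise.future
  }
}
```

As with the original helper, the underlying operation is not cancelled when the timeout fires; only the returned future fails early.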
Async/Await Pattern
// Async/await pattern for cleaner asynchronous code
import scala.async.Async.{async, await}
class AsyncAwaitExamples {
implicit val ec: ExecutionContext = ExecutionContext.global
// Traditional Future composition
def traditionalComposition(userId: String): Future[String] = {
fetchUser(userId).flatMap { user =>
fetchProfile(user.id).flatMap { profile =>
fetchPreferences(user.id).map { preferences =>
s"User: ${user.name}, Profile: ${profile.bio}, Theme: ${preferences.theme}"
}
}
}
}
// Async/await version
def asyncAwaitComposition(userId: String): Future[String] = async {
val user = await(fetchUser(userId))
val profile = await(fetchProfile(user.id))
val preferences = await(fetchPreferences(user.id))
s"User: ${user.name}, Profile: ${profile.bio}, Theme: ${preferences.theme}"
}
// Parallel execution with async/await
def parallelAsyncAwait(userId: String): Future[String] = async {
val user = await(fetchUser(userId))
// These can run in parallel
val profileFuture = fetchProfile(user.id)
val preferencesFuture = fetchPreferences(user.id)
val profile = await(profileFuture)
val preferences = await(preferencesFuture)
s"User: ${user.name}, Profile: ${profile.bio}, Theme: ${preferences.theme}"
}
// Error handling with async/await
def asyncAwaitWithErrorHandling(userId: String): Future[Either[String, String]] = async {
try {
val user = await(fetchUser(userId))
val profile = await(fetchProfile(user.id))
Right(s"User: ${user.name}, Profile: ${profile.bio}")
} catch {
case ex: Exception => Left(s"Error: ${ex.getMessage}")
}
}
// Helper classes and methods
case class User(id: String, name: String)
case class Profile(bio: String)
case class Preferences(theme: String)
private def fetchUser(id: String): Future[User] =
Future(User(id, s"User$id"))
private def fetchProfile(userId: String): Future[Profile] =
Future(Profile(s"Bio for $userId"))
private def fetchPreferences(userId: String): Future[Preferences] =
Future(Preferences("dark"))
}
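The parallel variant above works because both futures are created before either one is awaited. The same idea can be expressed with the standard library alone, without the scala-async dependency, as in this sketch. The object `ZipComposition` and its three stand-in fetch functions are hypothetical and return canned values.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ZipComposition {
  def describe(userId: String)(implicit ec: ExecutionContext): Future[String] = {
    // Stand-ins for real asynchronous lookups.
    def fetchName(id: String): Future[String] = Future(s"User$id")
    def fetchBio(id: String): Future[String] = Future(s"Bio for $id")
    def fetchTheme(id: String): Future[String] = Future("dark")

    fetchName(userId).flatMap { name =>
      // Both futures are created before zipWith combines them, so they run
      // concurrently instead of one after the other.
      fetchBio(userId).zipWith(fetchTheme(userId)) { (bio, theme) =>
        s"User: $name, Profile: $bio, Theme: $theme"
      }
    }
  }
}
```

By contrast, writing `fetchBio(...)` and `fetchTheme(...)` as consecutive generators in a `for` comprehension would start the second lookup only after the first has completed.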
Parallel Collections
Built-in Parallel Collections
// Parallel collections for data parallelism
class ParallelCollectionExamples {
// CPU-intensive operations benefit from parallelization
def demonstrateParallelCollections(): Unit = {
val largeList = (1 to 10000000).toList
// Sequential processing
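val start1 = System.currentTimeMillis()
// Convert to Double before squaring: x * x overflows Int for x > 46340
val sequentialResult = largeList.map(x => math.sqrt(x.toDouble * x + 1)).sum
val sequentialTime = System.currentTimeMillis() - start1
// Parallel processing (on Scala 2.13+, .par requires the scala-parallel-collections
// module and import scala.collection.parallel.CollectionConverters._)
val start2 = System.currentTimeMillis()
val parallelResult = largeList.par.map(x => math.sqrt(x.toDouble * x + 1)).sum
val parallelTime = System.currentTimeMillis() - start2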
println(s"Sequential: ${sequentialTime}ms, Result: $sequentialResult")
println(s"Parallel: ${parallelTime}ms, Result: $parallelResult")
println(s"Speedup: ${sequentialTime.toDouble / parallelTime}x")
}
// Custom parallel operations
def parallelGroupBy[T, K](collection: List[T])(keyFunc: T => K): Map[K, List[T]] = {
collection.par
.groupBy(keyFunc)
.map { case (k, v) => k -> v.toList }
.seq
.toMap
}
def parallelFilter[T](collection: List[T])(predicate: T => Boolean): List[T] = {
collection.par.filter(predicate).toList
}
// Parallel fold with custom operations
def parallelWordCount(texts: List[String]): Map[String, Int] = {
texts.par
.flatMap(_.toLowerCase.split("\\s+"))
.groupBy(identity)
.mapValues(_.size)
.seq
.toMap
}
// Monte Carlo simulation using parallel collections
def estimatePi(samples: Int): Double = {
val hits = (1 to samples).par.count { _ =>
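// ThreadLocalRandom avoids contention on a single shared generator
val rng = java.util.concurrent.ThreadLocalRandom.current()
val x = rng.nextDouble()
val y = rng.nextDouble()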
x * x + y * y <= 1.0
}
4.0 * hits / samples
}
// Custom parallel aggregation
def parallelHistogram[T](data: List[T], buckets: Int)(valueFunc: T => Double): Array[Int] = {
val minValue = data.par.map(valueFunc).min
val maxValue = data.par.map(valueFunc).max
val range = maxValue - minValue
val bucketSize = range / buckets
val histogram = Array.fill(buckets)(0)
data.par.foreach { item =>
val value = valueFunc(item)
val bucketIndex = math.min(((value - minValue) / bucketSize).toInt, buckets - 1)
histogram.synchronized {
histogram(bucketIndex) += 1
}
}
histogram
}
}
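The histogram above takes a lock on the shared array for every element, which serializes most of the work. A common alternative is to let each task fill a private array and merge the partial results at the end. The sketch below shows that approach using plain futures rather than parallel collections; `ChunkedHistogram` and its `chunks` parameter are assumptions made for this example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ChunkedHistogram {
  def histogram(values: Vector[Double], buckets: Int, chunks: Int)(
      implicit ec: ExecutionContext): Array[Int] = {
    require(values.nonEmpty && buckets > 0 && chunks > 0)
    val min = values.min
    val max = values.max
    // Guard against a zero-width range when all values are equal.
    val width = if (max > min) (max - min) / buckets else 1.0
    def bucketOf(v: Double): Int = math.min(((v - min) / width).toInt, buckets - 1)

    val chunkSize = math.max(1, math.ceil(values.size.toDouble / chunks).toInt)
    // Each task counts into its own array, so no locking is needed.
    val partials = values.grouped(chunkSize).map { chunk =>
      Future {
        val local = new Array[Int](buckets)
        chunk.foreach(v => local(bucketOf(v)) += 1)
        local
      }
    }.toList

    // Merge the per-chunk counts once all tasks have finished.
    val merged = Future.sequence(partials).map { arrays =>
      arrays.foldLeft(new Array[Int](buckets)) { (acc, a) =>
        var i = 0
        while (i < buckets) { acc(i) += a(i); i += 1 }
        acc
      }
    }
    Await.result(merged, 30.seconds)
  }
}
```

For the values 0.0 to 4.0 in steps of 1.0 and two buckets, the bucket width is 2.0, so the counts are 2 and 3 (the maximum value is clamped into the last bucket).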
Fork/Join Framework
import java.util.concurrent.{ForkJoinPool, RecursiveTask, RecursiveAction}
// Custom Fork/Join tasks for fine-grained control
class ForkJoinExamples {
// Parallel merge sort using Fork/Join
class ParallelMergeSort(
private val array: Array[Int],
private val low: Int,
private val high: Int
) extends RecursiveAction {
private val THRESHOLD = 1000
override def compute(): Unit = {
if (high - low <= THRESHOLD) {
// Sequential sort for small arrays
java.util.Arrays.sort(array, low, high + 1)
} else {
val mid = (low + high) / 2
val leftTask = new ParallelMergeSort(array, low, mid)
val rightTask = new ParallelMergeSort(array, mid + 1, high)
// Fork the left half; compute the right half on the current thread, then join
leftTask.fork()
rightTask.compute()
leftTask.join()
// Merge the sorted halves
merge(array, low, mid, high)
}
}
private def merge(arr: Array[Int], low: Int, mid: Int, high: Int): Unit = {
val temp = new Array[Int](high - low + 1)
var i = low
var j = mid + 1
var k = 0
while (i <= mid && j <= high) {
if (arr(i) <= arr(j)) {
temp(k) = arr(i)
i += 1
} else {
temp(k) = arr(j)
j += 1
}
k += 1
}
while (i <= mid) {
temp(k) = arr(i)
i += 1
k += 1
}
while (j <= high) {
temp(k) = arr(j)
j += 1
k += 1
}
System.arraycopy(temp, 0, arr, low, temp.length)
}
}
// Parallel matrix multiplication
class ParallelMatrixMultiply(
private val a: Array[Array[Double]],
private val b: Array[Array[Double]],
private val result: Array[Array[Double]],
private val startRow: Int,
private val endRow: Int
) extends RecursiveAction {
private val THRESHOLD = 50
override def compute(): Unit = {
if (endRow - startRow <= THRESHOLD) {
// Sequential computation for small ranges
for (i <- startRow until endRow) {
for (j <- b(0).indices) {
var sum = 0.0
for (k <- a(0).indices) {
sum += a(i)(k) * b(k)(j)
}
result(i)(j) = sum
}
}
} else {
val mid = (startRow + endRow) / 2
val task1 = new ParallelMatrixMultiply(a, b, result, startRow, mid)
val task2 = new ParallelMatrixMultiply(a, b, result, mid, endRow)
invokeAll(task1, task2)
}
}
}
// Parallel tree traversal
sealed trait Tree[T] {
def value: T
}
case class Leaf[T](value: T) extends Tree[T]
case class Branch[T](value: T, left: Tree[T], right: Tree[T]) extends Tree[T]
class ParallelTreeSum(private val tree: Tree[Int]) extends RecursiveTask[Int] {
override def compute(): Int = tree match {
case Leaf(value) => value
case Branch(value, left, right) =>
val leftTask = new ParallelTreeSum(left)
val rightTask = new ParallelTreeSum(right)
leftTask.fork()
val rightResult = rightTask.compute()
val leftResult = leftTask.join()
value + leftResult + rightResult
}
}
// Usage examples
def demonstrateForkJoin(): Unit = {
val forkJoinPool = new ForkJoinPool()
// Parallel sort
val array = Array.fill(100000)(Random.nextInt(1000))
val sortTask = new ParallelMergeSort(array, 0, array.length - 1)
forkJoinPool.invoke(sortTask)
// Parallel matrix multiplication
val size = 500
val matrixA = Array.fill(size, size)(Random.nextDouble())
val matrixB = Array.fill(size, size)(Random.nextDouble())
val result = Array.fill(size, size)(0.0)
val multiplyTask = new ParallelMatrixMultiply(matrixA, matrixB, result, 0, size)
forkJoinPool.invoke(multiplyTask)
forkJoinPool.shutdown()
}
}
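The fork, compute, join sequence used by the tasks above is easiest to see in a smaller example. The following sketch sums a range of an array with a `RecursiveTask`; the class name `RangeSum` and its default threshold of 1000 elements are illustrative choices, not tuned values.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

// Sums data(from) until data(until - 1), splitting ranges larger than `threshold`.
class RangeSum(data: Array[Long], from: Int, until: Int, threshold: Int = 1000)
    extends RecursiveTask[Long] {
  override def compute(): Long =
    if (until - from <= threshold) {
      // Small enough: sum sequentially.
      var sum = 0L
      var i = from
      while (i < until) { sum += data(i); i += 1 }
      sum
    } else {
      val mid = from + (until - from) / 2
      val left = new RangeSum(data, from, mid, threshold)
      val right = new RangeSum(data, mid, until, threshold)
      left.fork()                       // schedule the left half asynchronously
      val rightSum = right.compute()    // reuse the current thread for the right half
      left.join() + rightSum            // wait for the left half and combine
    }
}
```

A task is started with `ForkJoinPool.commonPool().invoke(new RangeSum(data, 0, data.length))`. Computing one half directly instead of forking both keeps the current worker thread busy and avoids one scheduling round trip per split.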
Software Transactional Memory (STM)
STM Implementation
// Software Transactional Memory for lock-free concurrency
import java.util.concurrent.atomic.AtomicReference
// Simple STM implementation
class STMRef[T](initialValue: T) {
private val ref = new AtomicReference[T](initialValue)
def get: T = ref.get()
def set(newValue: T): Unit = ref.set(newValue)
def compareAndSet(expected: T, update: T): Boolean =
ref.compareAndSet(expected, update)
def transform[R](f: T => (T, R)): R = {
var result: R = null.asInstanceOf[R]
var success = false
while (!success) {
val current = ref.get()
val (newValue, returnValue) = f(current)
success = ref.compareAndSet(current, newValue)
if (success) result = returnValue
}
result
}
def update(f: T => T): Unit = transform(current => (f(current), ()))
}
// Transaction monad for composing STM operations
case class STM[A](run: () => A) {
def map[B](f: A => B): STM[B] = STM(() => f(run()))
def flatMap[B](f: A => STM[B]): STM[B] = STM(() => f(run()).run())
def retry: STM[A] = this // Simplified - real STM would implement proper retry
}
object STM {
def pure[A](a: A): STM[A] = STM(() => a)
def read[T](ref: STMRef[T]): STM[T] = STM(() => ref.get)
def write[T](ref: STMRef[T], value: T): STM[Unit] = STM(() => ref.set(value))
def modify[T](ref: STMRef[T])(f: T => T): STM[Unit] =
STM(() => ref.update(f))
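// Execute transaction atomically
private val globalLock = new Object
def atomically[A](transaction: STM[A]): A = globalLock.synchronized {
// Simplified implementation: one global lock serializes all transactions run through
// atomically, so composed read-then-write steps cannot interleave. A real STM would
// track read/write sets and retry on conflict instead of locking.
transaction.run()
}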
}
// Bank account example using STM
class Account(initialBalance: Int) {
private val balance = new STMRef(initialBalance)
def getBalance: Int = balance.get
def deposit(amount: Int): STM[Unit] =
STM.modify(balance)(_ + amount)
def withdraw(amount: Int): STM[Boolean] = for {
currentBalance <- STM.read(balance)
result <- if (currentBalance >= amount) {
STM.modify(balance)(_ - amount).map(_ => true)
} else {
STM.pure(false)
}
} yield result
def transfer(to: Account, amount: Int): STM[Boolean] = for {
withdrawn <- withdraw(amount)
result <- if (withdrawn) {
to.deposit(amount).map(_ => true)
} else {
STM.pure(false)
}
} yield result
}
// Dining philosophers problem using STM
class DiningPhilosophers {
case class Fork(id: Int) {
private val inUse = new STMRef(false)
def tryTake(): STM[Boolean] = for {
alreadyInUse <- STM.read(inUse)
result <- if (!alreadyInUse) {
STM.write(inUse, true).map(_ => true)
} else {
STM.pure(false)
}
} yield result
def release(): STM[Unit] = STM.write(inUse, false)
}
class Philosopher(id: Int, leftFork: Fork, rightFork: Fork) {
def dine(): STM[Unit] = for {
leftTaken <- leftFork.tryTake()
rightTaken <- if (leftTaken) rightFork.tryTake() else STM.pure(false)
_ <- if (leftTaken && rightTaken) {
for {
_ <- STM.pure(println(s"Philosopher $id is eating"))
_ <- leftFork.release()
_ <- rightFork.release()
} yield ()
} else {
if (leftTaken) leftFork.release() else STM.pure(())
}
} yield ()
}
}
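In the simplified model above, a transfer reads a balance and later writes it as separate steps. One way to make such a multi-value update atomic with only compare-and-set is to keep all related values in a single immutable snapshot and swap the whole snapshot at once. The sketch below assumes two accounts and uses the hypothetical names `Balances` and `TwoAccountBank`; it illustrates optimistic concurrency and is not a general STM.

```scala
import java.util.concurrent.atomic.AtomicReference

// Immutable snapshot of both balances; replaced as a whole on every update.
final case class Balances(a: Int, b: Int)

class TwoAccountBank(initialA: Int, initialB: Int) {
  private val state = new AtomicReference(Balances(initialA, initialB))

  def snapshot: Balances = state.get()

  // Moves `amount` from account a to account b, or returns false if a lacks funds.
  @annotation.tailrec
  final def transferAToB(amount: Int): Boolean = {
    val current = state.get()
    if (current.a < amount) false
    else {
      val next = Balances(current.a - amount, current.b + amount)
      // Succeeds only if no other thread replaced the snapshot in the meantime.
      if (state.compareAndSet(current, next)) true
      else transferAToB(amount) // lost the race: re-read and retry
    }
  }
}
```

Because the funds check and the update are validated against the same snapshot, the balance of account a can never go negative and the total is conserved, regardless of how many threads call `transferAToB` concurrently.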
Lock-Free Data Structures
import java.util.concurrent.atomic.{AtomicReference, AtomicInteger}
// Lock-free stack implementation
class LockFreeStack[T] {
private case class Node(data: T, next: Option[Node])
private val head = new AtomicReference[Option[Node]](None)
@annotation.tailrec
final def push(item: T): Unit = {
val currentHead = head.get()
val newNode = Node(item, currentHead)
if (!head.compareAndSet(currentHead, Some(newNode))) {
push(item) // Retry on conflict
}
}
@annotation.tailrec
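final def pop(): Option[T] = {
// compareAndSet compares references, so reuse the exact Option instance that was read;
// a freshly built Some(node) would never match and the loop would spin forever
val currentHead = head.get()
currentHead match {
case None => None
case Some(node) =>
if (head.compareAndSet(currentHead, node.next)) {
Some(node.data)
} else {
pop() // Retry on conflict
}
}
}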
def isEmpty: Boolean = head.get().isEmpty
def size: Int = {
@annotation.tailrec
def count(node: Option[Node], acc: Int): Int = node match {
case None => acc
case Some(n) => count(n.next, acc + 1)
}
count(head.get(), 0)
}
}
// Lock-free queue using Michael & Scott algorithm
class LockFreeQueue[T] {
private case class Node(data: Option[T], next: AtomicReference[Option[Node]]) {
def this() = this(None, new AtomicReference(None))
def this(data: T) = this(Some(data), new AtomicReference(None))
}
private val dummy = new Node()
private val head = new AtomicReference(dummy)
private val tail = new AtomicReference(dummy)
def enqueue(item: T): Unit = {
val newNode = new Node(item)
while (true) {
val currentTail = tail.get()
val tailNext = currentTail.next.get()
if (currentTail == tail.get()) { // Check for consistency
if (tailNext == None) {
// Try to link new node at the end of list
if (currentTail.next.compareAndSet(None, Some(newNode))) {
tail.compareAndSet(currentTail, newNode) // Try to swing tail to new node
return
}
} else {
// Try to swing tail to next node
tail.compareAndSet(currentTail, tailNext.get)
}
}
}
}
def dequeue(): Option[T] = {
while (true) {
val currentHead = head.get()
val currentTail = tail.get()
val headNext = currentHead.next.get()
if (currentHead == head.get()) { // Check for consistency
if (currentHead == currentTail) {
if (headNext == None) {
return None // Queue is empty
}
// Try to advance tail
tail.compareAndSet(currentTail, headNext.get)
} else {
// Try to read data
headNext match {
case Some(next) =>
if (head.compareAndSet(currentHead, next)) {
return next.data
}
case None => // Continue retry
}
}
}
}
None // Should never reach here
}
}
// Lock-free counter with backoff
class LockFreeCounter {
private val count = new AtomicInteger(0)
private val maxBackoff = 1000 // microseconds
def increment(): Int = {
var backoff = 1
while (true) {
val current = count.get()
val next = current + 1
if (count.compareAndSet(current, next)) {
return next
}
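// Exponential backoff to reduce contention; keep sleeping at the cap once it is reached
Thread.sleep(0, backoff * 1000) // backoff is in microseconds, the argument is nanoseconds
backoff = math.min(backoff * 2, maxBackoff - 1) // stay below 1000 microseconds: nanos must be <= 999999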
}
-1 // Should never reach here
}
def get: Int = count.get()
def compareAndIncrement(expected: Int): Boolean =
count.compareAndSet(expected, expected + 1)
}
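`AtomicReference.compareAndSet` compares object references, not values, so the expected argument has to be the very object that was read. A compact way to guarantee this is to store an immutable `List` directly in the reference, as in the following sketch of a Treiber-style stack. The class name `TreiberStack` is chosen for this example.

```scala
import java.util.concurrent.atomic.AtomicReference

class TreiberStack[T] {
  // The whole stack is one immutable list; every update swaps the reference.
  private val head = new AtomicReference[List[T]](Nil)

  @annotation.tailrec
  final def push(item: T): Unit = {
    val current = head.get()
    if (!head.compareAndSet(current, item :: current)) push(item) // retry on conflict
  }

  @annotation.tailrec
  final def pop(): Option[T] = head.get() match {
    case Nil => None
    case current @ (top :: rest) =>
      // `current` is the exact reference that was read, so the identity check is valid.
      if (head.compareAndSet(current, rest)) Some(top) else pop()
  }

  def isEmpty: Boolean = head.get().isEmpty
}
```

Because list cells are immutable and never reused, this design is not exposed to the ABA problem on the JVM: a reference that compares equal by identity still denotes the same list contents.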
Distributed Computing Patterns
Message Passing and Remote Actors
// Remote actor communication patterns
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
// Distributed work distribution pattern
object WorkDistribution {
// Work item definition
case class WorkItem(id: String, payload: Array[Byte])
case class WorkResult(id: String, result: String)
// Master actor that distributes work
class MasterActor extends Actor {
import context.dispatcher
private var workers = List.empty[ActorRef]
private var pendingWork = List.empty[WorkItem]
private var workInProgress = Map.empty[String, ActorRef]
private var results = Map.empty[String, WorkResult]
def receive = {
case "RegisterWorker" =>
workers = sender() :: workers
println(s"Worker registered: ${sender()}")
processNextWork()
case work: WorkItem =>
pendingWork = work :: pendingWork
processNextWork()
case result: WorkResult =>
workInProgress.get(result.id) match {
case Some(worker) =>
workInProgress -= result.id
results += result.id -> result
println(s"Work ${result.id} completed by $worker")
processNextWork()
case None =>
println(s"Received result for unknown work: ${result.id}")
}
case "GetResults" =>
sender() ! results.values.toList
}
private def processNextWork(): Unit = {
(pendingWork, workers.filterNot(workInProgress.values.toSet)) match {
case (work :: remainingWork, availableWorker :: _) =>
pendingWork = remainingWork
workInProgress += work.id -> availableWorker
availableWorker ! work
println(s"Assigned work ${work.id} to $availableWorker")
processNextWork() // Try to assign more work
case _ => // No work or no available workers
}
}
}
// Worker actor that processes work
class WorkerActor extends Actor {
import context.dispatcher
override def preStart(): Unit = {
// Register with master on startup
context.system.actorSelection("/user/master") ! "RegisterWorker"
}
def receive = {
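case work: WorkItem =>
println(s"Processing work: ${work.id}")
// Capture the sender now: sender() must not be called from a Future callback, which
// runs on another thread after the actor may have moved on to a different message
val replyTo = sender()
// Simulate work processing
Future {
Thread.sleep(Random.nextInt(2000) + 1000)
val result = s"Processed: ${work.payload.length} bytes"
WorkResult(work.id, result)
}.onComplete {
case scala.util.Success(result) => replyTo ! result
case scala.util.Failure(ex) =>
println(s"Work ${work.id} failed: ${ex.getMessage}")
}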
}
}
}
// Cluster-aware actor system
class ClusterManager {
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
class ClusterListener extends Actor {
val cluster = Cluster(context.system)
override def preStart(): Unit = {
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
classOf[MemberEvent], classOf[UnreachableMember])
}
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case MemberUp(member) =>
println(s"Member is Up: ${member.address}")
case UnreachableMember(member) =>
println(s"Member detected as unreachable: ${member}")
case MemberRemoved(member, previousStatus) =>
println(s"Member is Removed: ${member.address} after $previousStatus")
case _: MemberEvent => // ignore
}
}
// Distributed cache using cluster sharding
object DistributedCache {
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
case class Get(key: String)
case class Put(key: String, value: String)
case class CacheEntry(value: String, timestamp: Long)
class CacheActor extends Actor {
private var cache = Map.empty[String, CacheEntry]
def receive = {
case Get(key) =>
sender() ! cache.get(key).map(_.value)
case Put(key, value) =>
cache += key -> CacheEntry(value, System.currentTimeMillis())
sender() ! "OK"
}
}
def start(system: ActorSystem): ActorRef = {
val extractEntityId: ShardRegion.ExtractEntityId = {
case Get(key) => (key.hashCode.toString, Get(key))
case Put(key, value) => (key.hashCode.toString, Put(key, value))
}
val extractShardId: ShardRegion.ExtractShardId = {
// math.abs keeps shard IDs in 0..9; hashCode % 10 alone can be negative
case Get(key) => math.abs(key.hashCode % 10).toString
case Put(key, _) => math.abs(key.hashCode % 10).toString
}
ClusterSharding(system).start(
typeName = "Cache",
entityProps = Props[CacheActor],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
)
}
}
}
Event Sourcing and CQRS
// Event sourcing pattern for distributed systems
object EventSourcing {
// Domain events
sealed trait Event {
def entityId: String
def timestamp: Long
}
case class AccountCreated(entityId: String, initialBalance: BigDecimal, timestamp: Long) extends Event
case class MoneyDeposited(entityId: String, amount: BigDecimal, timestamp: Long) extends Event
case class MoneyWithdrawn(entityId: String, amount: BigDecimal, timestamp: Long) extends Event
case class AccountClosed(entityId: String, timestamp: Long) extends Event
// Aggregate state
sealed trait AccountState
case object NonExistent extends AccountState
case class Active(balance: BigDecimal) extends AccountState
case object Closed extends AccountState
// Event store
trait EventStore {
def saveEvents(entityId: String, events: List[Event], expectedVersion: Int): Future[Unit]
def getEvents(entityId: String): Future[List[Event]]
def getEventsFromVersion(entityId: String, version: Int): Future[List[Event]]
}
// In-memory event store implementation
class InMemoryEventStore extends EventStore {
private val events = scala.collection.mutable.Map[String, List[Event]]()
def saveEvents(entityId: String, newEvents: List[Event], expectedVersion: Int): Future[Unit] = {
val currentEvents = events.getOrElse(entityId, List.empty)
if (currentEvents.length != expectedVersion) {
Future.failed(new Exception("Concurrency conflict"))
} else {
events(entityId) = currentEvents ++ newEvents
Future.successful(())
}
}
def getEvents(entityId: String): Future[List[Event]] =
Future.successful(events.getOrElse(entityId, List.empty))
def getEventsFromVersion(entityId: String, version: Int): Future[List[Event]] =
Future.successful(events.getOrElse(entityId, List.empty).drop(version))
}
// Aggregate
class Account(id: String, eventStore: EventStore) {
implicit val ec: ExecutionContext = ExecutionContext.global
private var currentState: AccountState = NonExistent
private var version: Int = 0
// Record the version the store currently holds before applying the event locally;
// applyEvent increments version, so passing it afterwards would always fail the store's check
private def applyAndSave(event: Event): Future[Unit] = {
val expectedVersion = version
applyEvent(event)
eventStore.saveEvents(id, List(event), expectedVersion)
}
def create(initialBalance: BigDecimal): Future[Unit] = {
currentState match {
case NonExistent =>
applyAndSave(AccountCreated(id, initialBalance, System.currentTimeMillis()))
case _ =>
Future.failed(new Exception("Account already exists"))
}
}
def deposit(amount: BigDecimal): Future[Unit] = {
currentState match {
case Active(_) =>
applyAndSave(MoneyDeposited(id, amount, System.currentTimeMillis()))
case _ =>
Future.failed(new Exception("Account not active"))
}
}
def withdraw(amount: BigDecimal): Future[Unit] = {
currentState match {
case Active(balance) if balance >= amount =>
applyAndSave(MoneyWithdrawn(id, amount, System.currentTimeMillis()))
case Active(_) =>
Future.failed(new Exception("Insufficient funds"))
case _ =>
Future.failed(new Exception("Account not active"))
}
}
def getBalance: BigDecimal = currentState match {
case Active(balance) => balance
case _ => BigDecimal(0)
}
def loadFromHistory(): Future[Unit] = {
eventStore.getEvents(id).map { events =>
events.foreach(applyEvent)
}
}
private def applyEvent(event: Event): Unit = {
currentState = currentState match {
case NonExistent => event match {
case AccountCreated(_, initialBalance, _) => Active(initialBalance)
case _ => NonExistent
}
case Active(balance) => event match {
case MoneyDeposited(_, amount, _) => Active(balance + amount)
case MoneyWithdrawn(_, amount, _) => Active(balance - amount)
case AccountClosed(_, _) => Closed
case _ => currentState
}
case Closed => currentState
}
version += 1
}
}
// Read model for CQRS
case class AccountSummary(id: String, balance: BigDecimal, transactionCount: Int, lastActivity: Long)
class AccountProjection(eventStore: EventStore) {
implicit val ec: ExecutionContext = ExecutionContext.global
private val summaries = scala.collection.mutable.Map[String, AccountSummary]()
def getAccountSummary(accountId: String): Option[AccountSummary] =
summaries.get(accountId)
def getAllAccounts: List[AccountSummary] = summaries.values.toList
def updateProjection(accountId: String): Future[Unit] = {
eventStore.getEvents(accountId).map { events =>
val summary = events.foldLeft(AccountSummary(accountId, BigDecimal(0), 0, 0L)) { (acc, event) =>
event match {
case AccountCreated(_, initialBalance, timestamp) =>
acc.copy(balance = initialBalance, transactionCount = acc.transactionCount + 1, lastActivity = timestamp)
case MoneyDeposited(_, amount, timestamp) =>
acc.copy(balance = acc.balance + amount, transactionCount = acc.transactionCount + 1, lastActivity = timestamp)
case MoneyWithdrawn(_, amount, timestamp) =>
acc.copy(balance = acc.balance - amount, transactionCount = acc.transactionCount + 1, lastActivity = timestamp)
case AccountClosed(_, timestamp) =>
acc.copy(lastActivity = timestamp)
}
}
summaries(accountId) = summary
}
}
}
}
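Two ideas carry most of the event sourcing example: current state is a left fold over the event history, and appends are guarded by an optimistic version check. The sketch below isolates both as pure functions. The types `Evt`, `Deposited`, and `Withdrawn` and the object `Replay` are simplified, hypothetical stand-ins for the lesson's event types.

```scala
// Simplified event types for illustration only.
sealed trait Evt
final case class Deposited(amount: BigDecimal) extends Evt
final case class Withdrawn(amount: BigDecimal) extends Evt

object Replay {
  // State is derived by folding over the full history, starting from zero.
  def balance(events: List[Evt]): BigDecimal =
    events.foldLeft(BigDecimal(0)) {
      case (acc, Deposited(amount)) => acc + amount
      case (acc, Withdrawn(amount)) => acc - amount
    }

  // Optimistic append: succeeds only if the caller saw the latest version of the log.
  def append(
      log: Vector[Evt],
      expectedVersion: Int,
      newEvents: List[Evt]
  ): Either[String, Vector[Evt]] =
    if (log.length != expectedVersion)
      Left(s"Concurrency conflict: expected version $expectedVersion, found ${log.length}")
    else
      Right(log ++ newEvents)
}
```

Here the version is simply the number of events already stored, which matches the in-memory store above: a writer that read version 0 and tries to append after someone else has added an event gets a conflict and has to reload before retrying.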
Performance Monitoring and Optimization
Profiling Concurrent Applications
// Performance monitoring for concurrent applications
class ConcurrencyProfiler {
// Thread contention monitor
class ContentionMonitor {
private val threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean()
case class ThreadInfo(
name: String,
state: Thread.State,
cpuTime: Long,
blockedTime: Long,
waitTime: Long
)
def getThreadContention: Map[String, ThreadInfo] = {
threadMXBean.getAllThreadIds.toSeq.flatMap { id =>
// getThreadInfo returns null if the thread has already terminated
Option(threadMXBean.getThreadInfo(id)).map { info =>
val cpuTime = if (threadMXBean.isThreadCpuTimeSupported) {
threadMXBean.getThreadCpuTime(id)
} else 0L
info.getThreadName -> ThreadInfo(
info.getThreadName,
info.getThreadState,
cpuTime,
info.getBlockedTime, // -1 unless thread contention monitoring is enabled
info.getWaitedTime
)
}
}.toMap
}
def printContentionReport(): Unit = {
val contentions = getThreadContention
println("Thread Contention Report:")
println("=" * 50)
contentions.values.groupBy(_.state).foreach { case (state, threads) =>
println(s"$state: ${threads.size} threads")
threads.take(5).foreach { thread =>
println(f" ${thread.name}%-30s CPU: ${thread.cpuTime / 1000000}%6d ms")
}
}
}
}
// Lock contention detector
class LockContentionDetector {
import java.lang.management.ManagementFactory
def detectDeadlocks(): Option[String] = {
val threadMXBean = ManagementFactory.getThreadMXBean()
Option(threadMXBean.findDeadlockedThreads()).map { deadlockedThreads =>
val threadInfos = threadMXBean.getThreadInfo(deadlockedThreads)
val report = new StringBuilder("DEADLOCK DETECTED:\n")
threadInfos.foreach { info =>
report.append(s"Thread: ${info.getThreadName}\n")
report.append(s" State: ${info.getThreadState}\n")
report.append(s" Lock: ${info.getLockName}\n")
report.append(s" Lock Owner: ${info.getLockOwnerName}\n")
report.append("\n")
}
report.toString()
}
}
def monitorLockContention(interval: FiniteDuration): Unit = {
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(
() => {
detectDeadlocks().foreach { report =>
println(report)
// In real application, would log or alert
}
},
0,
interval.toMillis,
TimeUnit.MILLISECONDS
)
}
}
// Performance metrics collector
class PerformanceMetrics {
private val requestCounts = new java.util.concurrent.ConcurrentHashMap[String, AtomicInteger]()
private val responseTimes = new java.util.concurrent.ConcurrentHashMap[String, AtomicReference[List[Long]]]()
def recordRequest(operation: String, responseTime: Long): Unit = {
requestCounts.computeIfAbsent(operation, _ => new AtomicInteger(0)).incrementAndGet()
responseTimes.computeIfAbsent(operation, _ => new AtomicReference(List.empty[Long]))
.updateAndGet(times => (responseTime :: times).take(1000)) // Keep last 1000 measurements
}
def getMetrics: Map[String, (Int, Double, Long, Long)] = {
import scala.jdk.CollectionConverters._ // provides .asScala (Scala 2.13+)
requestCounts.asScala.map { case (operation, count) =>
val times = responseTimes.get(operation).get()
val avgTime = if (times.nonEmpty) times.sum.toDouble / times.length else 0.0
val minTime = if (times.nonEmpty) times.min else 0L
val maxTime = if (times.nonEmpty) times.max else 0L
operation -> (count.get(), avgTime, minTime, maxTime)
}.toMap
}
def printReport(): Unit = {
println("Performance Metrics Report:")
println("=" * 70)
println(f"${"Operation"}%-20s ${"Count"}%8s ${"Avg(ms)"}%10s ${"Min(ms)"}%10s ${"Max(ms)"}%10s")
println("-" * 70)
getMetrics.foreach { case (operation, (count, avg, min, max)) =>
println(f"$operation%-20s $count%8d $avg%10.2f $min%10d $max%10d")
}
}
}
// Benchmark utility for concurrent operations
def benchmarkConcurrentOperation[T](
name: String,
operation: () => T,
threads: Int,
iterations: Int
): Unit = {
val latch = new CountDownLatch(threads)
val startTime = System.nanoTime()
val results = new Array[Long](threads)
val threadPool = Executors.newFixedThreadPool(threads)
for (i <- 0 until threads) {
threadPool.submit(new Runnable {
def run(): Unit = {
val threadStart = System.nanoTime()
for (_ <- 0 until iterations) {
operation()
}
val threadEnd = System.nanoTime()
results(i) = threadEnd - threadStart
latch.countDown()
}
})
}
latch.await()
val endTime = System.nanoTime()
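val totalTime = (endTime - startTime) / 1000000 // Convert to milliseconds
val avgThreadTime = results.sum / (results.length * 1000000L)
// Derive throughput from nanoseconds as a Double: avoids Int overflow in
// threads * iterations * 1000 and division by zero when the run takes under 1 ms
val throughput = threads.toDouble * iterations * 1e9 / math.max(endTime - startTime, 1L)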
println(s"Benchmark: $name")
println(s" Total time: ${totalTime}ms")
println(s" Average thread time: ${avgThreadTime}ms")
println(s" Throughput: ${throughput.toInt} ops/sec")
println(s" Threads: $threads, Iterations per thread: $iterations")
println()
threadPool.shutdown()
}
}
Conclusion
Scala offers multiple approaches to concurrent and parallel computing for building high-performance, scalable applications. Key concepts and patterns include:
Concurrency Fundamentals:
- Understanding the difference between concurrency and parallelism
- Proper thread management and execution context selection
- Thread safety mechanisms and synchronization primitives
- Performance monitoring and bottleneck identification
Advanced Future Patterns:
- Composition and error handling strategies
- Timeout and retry mechanisms
- Circuit breaker and bulkhead patterns
- Async/await for cleaner asynchronous code
Parallel Computing:
- Parallel collections for data parallelism
- Fork/Join framework for divide-and-conquer algorithms
- Custom thread pools for different workload types
- Lock-free programming for maximum performance
Software Transactional Memory:
- STM for composable concurrent operations
- Lock-free data structures and algorithms
- Compare-and-swap operations
- Optimistic concurrency control
Distributed Patterns:
- Message passing and remote actor communication
- Event sourcing and CQRS for scalable architectures
- Cluster management and distributed caching
- Work distribution and load balancing
Performance Optimization:
- Profiling and monitoring concurrent applications
- Detecting and resolving contention issues
- Benchmarking concurrent operations
- Memory-efficient parallel processing
Best Practices:
- Choose appropriate concurrency models for specific use cases
- Minimize shared mutable state
- Use immutable data structures when possible
- Design for fault tolerance and graceful degradation
- Monitor and measure performance regularly
These advanced concurrency patterns enable the development of systems that can efficiently utilize modern multi-core hardware and scale across distributed environments while maintaining correctness and reliability.