Performance Optimization and Profiling in Scala
Performance optimization is crucial for building scalable Scala applications. This lesson covers JVM optimization, memory management, profiling tools, benchmarking techniques, and systematic approaches to identifying and resolving performance bottlenecks in enterprise Scala systems.
JVM Performance Fundamentals
JVM Memory Management and Garbage Collection
// MemoryAnalysis.scala - Memory usage patterns and optimization
package com.example.performance.memory
import java.lang.management.ManagementFactory
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.Random
// Memory monitoring utilities
object MemoryProfiler {
private val memoryBean = ManagementFactory.getMemoryMXBean
private val gcBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
case class MemorySnapshot(
heapUsed: Long,
heapMax: Long,
nonHeapUsed: Long,
nonHeapMax: Long,
timestamp: Long = System.currentTimeMillis()
) {
def heapUtilization: Double = if (heapMax > 0) heapUsed.toDouble / heapMax else 0.0
def nonHeapUtilization: Double = if (nonHeapMax > 0) nonHeapUsed.toDouble / nonHeapMax else 0.0 // getMax may return -1 (undefined), common for non-heap pools
}
def takeSnapshot(): MemorySnapshot = {
val heapMemory = memoryBean.getHeapMemoryUsage
val nonHeapMemory = memoryBean.getNonHeapMemoryUsage
MemorySnapshot(
heapUsed = heapMemory.getUsed,
heapMax = heapMemory.getMax,
nonHeapUsed = nonHeapMemory.getUsed,
nonHeapMax = nonHeapMemory.getMax
)
}
def measureMemoryUsage[T](operation: => T): (T, MemorySnapshot, MemorySnapshot) = {
System.gc() // Suggest garbage collection before measurement
Thread.sleep(100) // Allow GC to complete
val before = takeSnapshot()
val result = operation
val after = takeSnapshot()
(result, before, after)
}
def measureGCActivity[T](operation: => T): (T, Map[String, (Long, Long)]) = {
val gcCountsBefore = gcBeans.map(bean => bean.getName -> bean.getCollectionCount).toMap
val gcTimesBefore = gcBeans.map(bean => bean.getName -> bean.getCollectionTime).toMap
val result = operation
val gcCountsAfter = gcBeans.map(bean => bean.getName -> bean.getCollectionCount).toMap
val gcTimesAfter = gcBeans.map(bean => bean.getName -> bean.getCollectionTime).toMap
val gcActivity = gcBeans.map { bean =>
val name = bean.getName
val countDiff = gcCountsAfter(name) - gcCountsBefore(name)
val timeDiff = gcTimesAfter(name) - gcTimesBefore(name)
name -> (countDiff, timeDiff)
}.toMap
(result, gcActivity)
}
def printMemoryInfo(): Unit = {
val snapshot = takeSnapshot()
println(s"""
|Memory Usage:
| Heap: ${snapshot.heapUsed / 1024 / 1024} MB / ${snapshot.heapMax / 1024 / 1024} MB (${(snapshot.heapUtilization * 100).toInt}%)
| Non-Heap: ${snapshot.nonHeapUsed / 1024 / 1024} MB / ${snapshot.nonHeapMax / 1024 / 1024} MB (${(snapshot.nonHeapUtilization * 100).toInt}%)
|
|GC Information:
|${gcBeans.map(bean => s" ${bean.getName}: ${bean.getCollectionCount} collections, ${bean.getCollectionTime} ms").mkString("\n")}
""".stripMargin)
}
}
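// Usage sketch (this demo object is illustrative, not part of the lesson's
// API). Heap deltas are approximate: a GC during the operation can shrink
// or even negate them.
object MemoryProfilerDemo {
  def run(): Unit = {
    val (built, before, after) = MemoryProfiler.measureMemoryUsage {
      Vector.fill(1000000)(Random.nextInt())
    }
    println(s"Built ${built.length} elements; heap delta ~${(after.heapUsed - before.heapUsed) / 1024 / 1024} MB")

    val (_, gcActivity) = MemoryProfiler.measureGCActivity {
      (1 to 100).map(_ => new Array[Byte](1024 * 1024)).map(_.length).sum
    }
    gcActivity.foreach { case (name, (count, time)) =>
      println(s"$name: $count collections, $time ms")
    }
  }
}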
// Data structure optimization examples
object DataStructureOptimization {
// Inefficient: Creates many intermediate collections
def inefficientChainedOperations(data: List[Int]): List[Int] = {
data
.filter(_ > 0)
.map(_ * 2)
.filter(_ < 1000)
.map(_ + 1)
.distinct
.sorted
}
// Optimized: a lazy view fuses the pipeline, avoiding intermediate collections
def optimizedChainedOperations(data: List[Int]): List[Int] = {
data.view
.filter(_ > 0)
.map(_ * 2)
.filter(_ < 1000)
.map(_ + 1)
.distinct
.to(List)
.sorted
}
// Memory-efficient streaming approach
def streamingOperations(data: Iterator[Int]): List[Int] = {
val seen = mutable.Set[Int]()
val result = mutable.ListBuffer[Int]()
data.foreach { item =>
if (item > 0) {
val transformed = item * 2
if (transformed < 1000) {
val finalValue = transformed + 1 // `final` is a reserved word in Scala
if (seen.add(finalValue)) {
result += finalValue
}
}
}
}
result.sorted.toList
}
// Array-based optimization for numeric operations
def arrayBasedOperations(data: Array[Int]): Array[Int] = {
val filtered = data.filter(_ > 0)
var i = 0
while (i < filtered.length) {
filtered(i) = filtered(i) * 2 + 1
i += 1
}
filtered.filter(_ < 1000).distinct.sorted
}
// Specialized collections for specific use cases
class OptimizedIntSet private (private val bits: Array[Long]) {
private val maxValue = bits.length * 64
def contains(value: Int): Boolean = {
if (value < 0 || value >= maxValue) false
else {
val arrayIndex = value / 64
val bitIndex = value % 64
(bits(arrayIndex) & (1L << bitIndex)) != 0
}
}
def add(value: Int): Unit = {
if (value >= 0 && value < maxValue) {
val arrayIndex = value / 64
val bitIndex = value % 64
bits(arrayIndex) |= (1L << bitIndex)
}
}
def remove(value: Int): Unit = {
if (value >= 0 && value < maxValue) {
val arrayIndex = value / 64
val bitIndex = value % 64
bits(arrayIndex) &= ~(1L << bitIndex)
}
}
def size: Int = bits.map(java.lang.Long.bitCount).sum
def toArray: Array[Int] = {
val result = mutable.ArrayBuffer[Int]()
for (arrayIndex <- bits.indices) {
var word = bits(arrayIndex)
var bitIndex = 0
while (word != 0) {
if ((word & 1) == 1) {
result += arrayIndex * 64 + bitIndex
}
word >>>= 1
bitIndex += 1
}
}
result.toArray
}
}
object OptimizedIntSet {
def apply(maxValue: Int): OptimizedIntSet = {
val arraySize = (maxValue + 63) / 64
new OptimizedIntSet(new Array[Long](arraySize))
}
}
}
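// Illustrative demo (hypothetical, not part of the lesson's API): one bit per
// possible value, so membership for ints in [0, 1024) costs 16 Longs = 128 bytes,
// versus dozens of bytes per boxed element in a Set[Int].
object OptimizedIntSetDemo {
  import DataStructureOptimization.OptimizedIntSet

  def run(): Unit = {
    val set = OptimizedIntSet(1024)
    Seq(1, 42, 999).foreach(set.add)
    assert(set.contains(42) && !set.contains(43))
    set.remove(42)
    println(s"size=${set.size}, values=${set.toArray.mkString(",")}")
  }
}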
// Object pool pattern for expensive object creation
class ObjectPool[T](factory: () => T, maxSize: Int = 10) {
private val pool = new java.util.concurrent.ConcurrentLinkedQueue[T]()
private val borrowed = new java.util.concurrent.atomic.AtomicInteger(0)
def borrow(): T = {
Option(pool.poll()) match {
case Some(obj) =>
borrowed.incrementAndGet()
obj
case None =>
borrowed.incrementAndGet()
factory()
}
}
def return_(obj: T): Unit = {
borrowed.decrementAndGet()
if (pool.size() < maxSize) {
pool.offer(obj)
}
}
def withObject[R](f: T => R): R = {
val obj = borrow()
try {
f(obj)
} finally {
return_(obj)
}
}
def size: Int = pool.size()
def borrowedCount: Int = borrowed.get()
}
// Example: String builder pool for concatenation operations
object StringBuilderPool extends ObjectPool[StringBuilder](
factory = () => new StringBuilder(1024),
maxSize = 20
) {
def buildString(operations: StringBuilder => Unit): String = {
withObject { sb =>
sb.clear()
operations(sb)
sb.toString()
}
}
}
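As a usage sketch (the StringBuilderPoolDemo object is hypothetical, not part of the lesson's code), the pool hands out pre-sized builders and reclaims them when the closure finishes:
object StringBuilderPoolDemo {
  def main(args: Array[String]): Unit = {
    val csv = StringBuilderPool.buildString { sb =>
      (1 to 5).foreach(i => sb.append(i).append(','))
    }
    println(csv) // prints 1,2,3,4,5,
    println(s"builders back in pool: ${StringBuilderPool.size}")
  }
}
Pooled builders keep their grown internal buffer between uses; that is the point of the pattern, but it also means one unusually large string can pin memory for the lifetime of the pool.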
CPU Optimization Techniques
// CPUOptimization.scala - CPU-intensive optimization patterns
package com.example.performance.cpu
import scala.annotation.tailrec
import scala.collection.parallel.CollectionConverters._ // .par requires the separate scala-parallel-collections module on Scala 2.13+
import java.util.concurrent.{ForkJoinPool, RecursiveTask}
object CPUOptimization {
// Tail recursion optimization
def factorial(n: Long): Long = {
@tailrec
def factorialAcc(n: Long, acc: Long): Long = {
if (n <= 1) acc
else factorialAcc(n - 1, n * acc)
}
factorialAcc(n, 1)
}
// Fibonacci with memoization
class FibonacciMemoized {
private val cache = scala.collection.mutable.Map[Int, Long]()
def fib(n: Int): Long = {
cache.getOrElseUpdate(n, {
if (n <= 1) n
else fib(n - 1) + fib(n - 2)
})
}
}
// Parallel collection operations
def parallelSum(numbers: Vector[Int]): Long = {
numbers.par.map(_.toLong).sum
}
def parallelFilter(numbers: Vector[Int], predicate: Int => Boolean): Vector[Int] = {
numbers.par.filter(predicate).seq.to(Vector)
}
// Custom parallel processing with work stealing
class ParallelMergeSort {
private val threshold = 10000
def sort(array: Array[Int]): Array[Int] = {
if (array.length <= 1) array
else {
val task = new MergeSortTask(array, 0, array.length)
ForkJoinPool.commonPool().invoke(task)
array
}
}
private class MergeSortTask(array: Array[Int], left: Int, right: Int)
extends RecursiveTask[Unit] {
def compute(): Unit = {
if (right - left <= threshold) {
java.util.Arrays.sort(array, left, right)
} else {
val mid = left + (right - left) / 2
val leftTask = new MergeSortTask(array, left, mid)
val rightTask = new MergeSortTask(array, mid, right)
leftTask.fork()
rightTask.compute()
leftTask.join()
merge(array, left, mid, right)
}
}
private def merge(array: Array[Int], left: Int, mid: Int, right: Int): Unit = {
val temp = new Array[Int](right - left)
var i = left
var j = mid
var k = 0
while (i < mid && j < right) {
if (array(i) <= array(j)) {
temp(k) = array(i)
i += 1
} else {
temp(k) = array(j)
j += 1
}
k += 1
}
while (i < mid) {
temp(k) = array(i)
i += 1
k += 1
}
while (j < right) {
temp(k) = array(j)
j += 1
k += 1
}
System.arraycopy(temp, 0, array, left, temp.length)
}
}
}
// Vectorization-friendly algorithms
def dotProduct(a: Array[Double], b: Array[Double]): Double = {
require(a.length == b.length, "Arrays must have the same length")
var sum = 0.0
var i = 0
val length = a.length
// Process 4 elements at a time (helps with vectorization)
while (i < length - 3) {
sum += a(i) * b(i) +
a(i + 1) * b(i + 1) +
a(i + 2) * b(i + 2) +
a(i + 3) * b(i + 3)
i += 4
}
// Handle remaining elements
while (i < length) {
sum += a(i) * b(i)
i += 1
}
sum
}
// Bit manipulation optimizations
object BitOperations {
def isPowerOfTwo(n: Int): Boolean = n > 0 && (n & (n - 1)) == 0
def nextPowerOfTwo(n: Int): Int = {
var power = 1
while (power < n) power <<= 1
power
}
def countBits(n: Int): Int = Integer.bitCount(n)
def reverseBits(n: Int): Int = Integer.reverse(n)
// Fast modulo for powers of 2
def fastModulo(value: Int, modulus: Int): Int = {
require(isPowerOfTwo(modulus), "Modulus must be a power of 2")
value & (modulus - 1)
}
}
// Cache-friendly data access patterns
class CacheFriendlyMatrix(rows: Int, cols: Int) {
private val data = new Array[Double](rows * cols)
def apply(row: Int, col: Int): Double = data(row * cols + col)
def update(row: Int, col: Int, value: Double): Unit = {
data(row * cols + col) = value
}
// Row-major traversal (cache-friendly)
def sumByRows(): Double = {
var sum = 0.0
var i = 0
while (i < data.length) {
sum += data(i)
i += 1
}
sum
}
// Column-major traversal (cache-unfriendly, but sometimes necessary)
def sumByColumns(): Double = {
var sum = 0.0
for (col <- 0 until cols) {
for (row <- 0 until rows) {
sum += apply(row, col)
}
}
sum
}
// Blocked matrix multiplication for better cache locality
def multiply(other: CacheFriendlyMatrix, blockSize: Int = 64): CacheFriendlyMatrix = {
require(cols == other.rows, "Invalid matrix dimensions for multiplication")
val result = new CacheFriendlyMatrix(rows, other.cols)
for (i <- 0 until rows by blockSize) {
for (j <- 0 until other.cols by blockSize) {
for (k <- 0 until cols by blockSize) {
val maxI = math.min(i + blockSize, rows)
val maxJ = math.min(j + blockSize, other.cols)
val maxK = math.min(k + blockSize, cols)
for (ii <- i until maxI) {
for (jj <- j until maxJ) {
var sum = result(ii, jj)
for (kk <- k until maxK) {
sum += this(ii, kk) * other(kk, jj)
}
result(ii, jj) = sum
}
}
}
}
}
result
}
}
}
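A short driver (the MatrixDemo object below is hypothetical, not from the lesson) exercises the blocked multiplication and contrasts the two traversal orders. Both sums visit every element and agree on the result; what differs is the stride through memory, and on large matrices the row-major pass is typically several times faster:
object MatrixDemo {
  import CPUOptimization.CacheFriendlyMatrix

  def main(args: Array[String]): Unit = {
    val n = 256
    val a = new CacheFriendlyMatrix(n, n)
    val b = new CacheFriendlyMatrix(n, n)
    for (i <- 0 until n; j <- 0 until n) {
      a(i, j) = (i + j).toDouble
      b(i, j) = (i - j).toDouble
    }
    val c = a.multiply(b, blockSize = 64)
    println(s"c(0, 0) = ${c(0, 0)}")
    println(s"row-major sum:    ${a.sumByRows()}")
    println(s"column-major sum: ${a.sumByColumns()}")
  }
}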
Profiling Tools and Techniques
JVM Profiling with Built-in Tools
// ProfilingExample.scala - Demonstrating profiling integration
package com.example.performance.profiling
import java.lang.management.ManagementFactory
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.util.Random
object ProfilingExample {
// CPU profiling with JFR (Java Flight Recorder) annotations
import jdk.jfr.{Event, Label, Description}
@Label("Custom Operation")
@Description("A custom operation for profiling")
class CustomOperationEvent extends Event {
@Label("Operation Type")
var operationType: String = _
@Label("Data Size")
var dataSize: Int = _
}
def profiledOperation(operationType: String, dataSize: Int): Unit = {
val event = new CustomOperationEvent()
event.operationType = operationType
event.dataSize = dataSize
event.begin()
try {
// Simulate work
operationType match {
case "cpu_intensive" => cpuIntensiveWork(dataSize)
case "memory_allocation" => memoryAllocationWork(dataSize)
case "io_simulation" => ioSimulationWork(dataSize)
case other => throw new IllegalArgumentException(s"Unknown operation type: $other")
}
} finally {
event.end()
event.commit()
}
}
private def cpuIntensiveWork(size: Int): Unit = {
var sum = 0L
for (i <- 0 until size) {
sum += fibonacci(i % 35)
}
}
private def fibonacci(n: Int): Long = {
if (n <= 1) n
else fibonacci(n - 1) + fibonacci(n - 2)
}
private def memoryAllocationWork(size: Int): Unit = {
val arrays = for (i <- 0 until size) yield {
new Array[Byte](1024)
}
// Force usage to prevent optimization
arrays.foreach(_(0) = 1)
}
private def ioSimulationWork(size: Int): Unit = {
for (_ <- 0 until size) {
Thread.sleep(1) // Simulate I/O wait
}
}
// Microbenchmarking utilities
class Benchmark {
def warmup[T](operation: => T, iterations: Int = 1000): Unit = {
for (_ <- 0 until iterations) {
operation
}
}
def measure[T](operation: => T, iterations: Int = 10000): BenchmarkResult[T] = {
val times = new Array[Long](iterations)
var result: T = null.asInstanceOf[T]
for (i <- 0 until iterations) {
val start = System.nanoTime()
result = operation
val end = System.nanoTime()
times(i) = end - start
}
java.util.Arrays.sort(times)
BenchmarkResult(
result = result,
iterations = iterations,
min = times(0),
max = times(iterations - 1),
median = times(iterations / 2),
mean = times.sum / iterations,
p95 = times((iterations * 0.95).toInt),
p99 = times((iterations * 0.99).toInt)
)
}
}
case class BenchmarkResult[T](
result: T,
iterations: Int,
min: Long,
max: Long,
median: Long,
mean: Long,
p95: Long,
p99: Long
) {
def printReport(): Unit = {
println(s"""
|Benchmark Results ($iterations iterations):
| Min: ${formatNanos(min)}
| Median: ${formatNanos(median)}
| Mean: ${formatNanos(mean)}
| P95: ${formatNanos(p95)}
| P99: ${formatNanos(p99)}
| Max: ${formatNanos(max)}
""".stripMargin)
}
private def formatNanos(nanos: Long): String = {
if (nanos < 1000) s"${nanos}ns"
else if (nanos < 1000000) f"${nanos / 1000.0}%.2fμs"
else if (nanos < 1000000000) f"${nanos / 1000000.0}%.2fms"
else f"${nanos / 1000000000.0}%.2fs"
}
}
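// Usage sketch for the ad-hoc harness above (this demo object is hypothetical).
// Prefer JMH (next section) for publishable numbers: this loop does not fork
// the JVM or guard against dead-code elimination the way JMH does.
object QuickBenchmarkDemo {
  def run(): Unit = {
    val bench = new Benchmark
    val data = List.fill(1000)(Random.nextInt(100))
    bench.warmup(data.sorted)
    val report = bench.measure(data.sorted, iterations = 5000)
    report.printReport()
  }
}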
// Thread contention analysis
object ThreadContentionAnalyzer {
private val threadBean = ManagementFactory.getThreadMXBean
def analyzeContention[T](operation: => T): (T, ContentionReport) = {
threadBean.setThreadContentionMonitoringEnabled(true)
val currentThreadId = Thread.currentThread().getId
val blockedBefore = Option(threadBean.getThreadInfo(currentThreadId)).map(_.getBlockedTime).getOrElse(0L)
val result = operation
val blockedAfter = Option(threadBean.getThreadInfo(currentThreadId)).map(_.getBlockedTime).getOrElse(0L)
// ThreadMXBean has no getAllThreadInfo; fetch ThreadInfo for all live thread ids
val allThreads = threadBean.getThreadInfo(threadBean.getAllThreadIds).filter(_ != null)
val blockedThreads = allThreads.count(_.getThreadState == Thread.State.BLOCKED)
val report = ContentionReport(
contentionTime = blockedAfter - blockedBefore, // ms this thread spent blocked on monitors
blockedThreadCount = blockedThreads,
totalThreads = allThreads.length
)
(result, report)
}
}
case class ContentionReport(
contentionTime: Long,
blockedThreadCount: Int,
totalThreads: Int
) {
def contentionRatio: Double = blockedThreadCount.toDouble / totalThreads
}
// Allocation profiling
class AllocationProfiler {
def track[T](operation: => T): (T, AllocationStats) = {
val beforeMemory = getUsedMemory
val beforeGC = getTotalGCTime
val result = operation
val afterMemory = getUsedMemory
val afterGC = getTotalGCTime
val memoryDiff = afterMemory - beforeMemory // may be negative if a GC ran during the operation
val gcTimeDiff = afterGC - beforeGC
val stats = AllocationStats(
allocatedBytes = memoryDiff,
gcTime = gcTimeDiff
)
(result, stats)
}
private def getUsedMemory: Long = {
val memoryBean = ManagementFactory.getMemoryMXBean
memoryBean.getHeapMemoryUsage.getUsed
}
private def getTotalGCTime: Long = {
val gcBeans = ManagementFactory.getGarbageCollectorMXBeans
gcBeans.map(_.getCollectionTime).sum
}
}
case class AllocationStats(
allocatedBytes: Long,
gcTime: Long
) {
def allocatedMB: Double = allocatedBytes / (1024.0 * 1024.0)
def gcTimeMs: Double = gcTime.toDouble
}
}
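To capture the custom events above, start the JVM with Flight Recorder enabled, for example with -XX:StartFlightRecording=duration=60s,filename=recording.jfr (JDK 11+), then open the recording in JDK Mission Control or dump it with the JDK's jfr print recording.jfr tool. A minimal driver (the ProfilingDemo object is illustrative, not part of the lesson's code) might look like:
object ProfilingDemo {
  def main(args: Array[String]): Unit = {
    // Each call emits one CustomOperationEvent into the active recording
    ProfilingExample.profiledOperation("cpu_intensive", dataSize = 100)
    ProfilingExample.profiledOperation("memory_allocation", dataSize = 1000)
    ProfilingExample.profiledOperation("io_simulation", dataSize = 50)
  }
}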
Advanced Profiling with JMH (Java Microbenchmark Harness)
// JMHBenchmarks.scala - JMH benchmark examples
package com.example.performance.jmh
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.Random
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
class CollectionBenchmarks {
@Param(Array("100", "1000", "10000"))
var size: Int = _
var data: Array[Int] = _
var list: List[Int] = _
var vector: Vector[Int] = _
var arrayBuffer: mutable.ArrayBuffer[Int] = _
@Setup
def setup(): Unit = {
val random = new Random(42)
data = Array.fill(size)(random.nextInt(1000))
list = data.toList
vector = data.toVector
arrayBuffer = mutable.ArrayBuffer(data: _*)
}
@Benchmark
def arrayIteration(bh: Blackhole): Unit = {
var i = 0
while (i < data.length) {
bh.consume(data(i))
i += 1
}
}
@Benchmark
def listIteration(bh: Blackhole): Unit = {
var current = list
while (current.nonEmpty) {
bh.consume(current.head)
current = current.tail
}
}
@Benchmark
def vectorIteration(bh: Blackhole): Unit = {
var i = 0
while (i < vector.length) {
bh.consume(vector(i))
i += 1
}
}
@Benchmark
def arrayBufferIteration(bh: Blackhole): Unit = {
var i = 0
while (i < arrayBuffer.length) {
bh.consume(arrayBuffer(i))
i += 1
}
}
@Benchmark
def arrayRandomAccess(bh: Blackhole): Unit = {
val random = new Random(42)
for (_ <- 0 until 1000) {
val index = random.nextInt(data.length)
bh.consume(data(index))
}
}
@Benchmark
def vectorRandomAccess(bh: Blackhole): Unit = {
val random = new Random(42)
for (_ <- 0 until 1000) {
val index = random.nextInt(vector.length)
bh.consume(vector(index))
}
}
@Benchmark
def listPrepend(bh: Blackhole): Unit = {
var result = List.empty[Int]
for (i <- 0 until 1000) {
result = i :: result
}
bh.consume(result)
}
@Benchmark
def vectorAppend(bh: Blackhole): Unit = {
var result = Vector.empty[Int]
for (i <- 0 until 1000) {
result = result :+ i
}
bh.consume(result)
}
@Benchmark
def arrayBufferAppend(bh: Blackhole): Unit = {
val result = mutable.ArrayBuffer.empty[Int]
for (i <- 0 until 1000) {
result += i
}
bh.consume(result)
}
}
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
class StringProcessingBenchmarks {
@Param(Array("small", "medium", "large"))
var dataSize: String = _
var strings: Array[String] = _
@Setup
def setup(): Unit = {
val count = dataSize match {
case "small" => 100
case "medium" => 1000
case "large" => 10000
}
val random = new Random(42)
strings = Array.fill(count) {
random.alphanumeric.take(50).mkString
}
}
@Benchmark
def stringConcatenation(bh: Blackhole): Unit = {
var result = ""
for (str <- strings) {
result += str
}
bh.consume(result)
}
@Benchmark
def stringBuilderConcatenation(bh: Blackhole): Unit = {
val sb = new StringBuilder()
for (str <- strings) {
sb.append(str)
}
bh.consume(sb.toString())
}
@Benchmark
def stringInterpolation(bh: Blackhole): Unit = {
val results = for (str <- strings) yield {
s"Processed: $str"
}
bh.consume(results)
}
@Benchmark
def regexMatching(bh: Blackhole): Unit = {
val pattern = ".*[0-9]+.*".r
val matches = strings.count(pattern.matches)
bh.consume(matches)
}
@Benchmark
def stringOperations(bh: Blackhole): Unit = {
val results = strings.map { str =>
str.toLowerCase.trim.replace(" ", "_")
}
bh.consume(results)
}
}
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
class AlgorithmBenchmarks {
@Param(Array("1000", "10000", "100000"))
var size: Int = _
var data: Array[Int] = _
@Setup
def setup(): Unit = {
val random = new Random(42)
data = Array.fill(size)(random.nextInt(size))
}
@Benchmark
def quickSort(bh: Blackhole): Unit = {
val copy = data.clone()
quickSortInPlace(copy, 0, copy.length - 1)
bh.consume(copy)
}
@Benchmark
def mergeSort(bh: Blackhole): Unit = {
val result = mergeSort(data.toList)
bh.consume(result)
}
@Benchmark
def javaSort(bh: Blackhole): Unit = {
val copy = data.clone()
java.util.Arrays.sort(copy)
bh.consume(copy)
}
@Benchmark
def scalaSort(bh: Blackhole): Unit = {
val result = data.sorted
bh.consume(result)
}
private def quickSortInPlace(arr: Array[Int], low: Int, high: Int): Unit = {
if (low < high) {
val pi = partition(arr, low, high)
quickSortInPlace(arr, low, pi - 1)
quickSortInPlace(arr, pi + 1, high)
}
}
private def partition(arr: Array[Int], low: Int, high: Int): Int = {
val pivot = arr(high)
var i = low - 1
for (j <- low until high) {
if (arr(j) <= pivot) {
i += 1
val temp = arr(i)
arr(i) = arr(j)
arr(j) = temp
}
}
val temp = arr(i + 1)
arr(i + 1) = arr(high)
arr(high) = temp
i + 1
}
private def mergeSort(list: List[Int]): List[Int] = {
if (list.length <= 1) list
else {
val (left, right) = list.splitAt(list.length / 2)
merge(mergeSort(left), mergeSort(right))
}
}
private def merge(left: List[Int], right: List[Int]): List[Int] = {
// Tail-recursive merge: a head-recursive merge recurses once per element and
// would overflow the stack at the 100000-element @Param value.
@tailrec
def loop(l: List[Int], r: List[Int], acc: List[Int]): List[Int] = (l, r) match {
case (Nil, rest) => acc.reverse ::: rest
case (rest, Nil) => acc.reverse ::: rest
case (lHead :: lTail, rHead :: rTail) =>
if (lHead <= rHead) loop(lTail, r, lHead :: acc)
else loop(l, rTail, rHead :: acc)
}
loop(left, right, Nil)
}
}
// Functional programming performance benchmarks
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
class FunctionalBenchmarks {
@Param(Array("1000", "10000"))
var size: Int = _
var data: List[Int] = _
@Setup
def setup(): Unit = {
data = (1 to size).toList
}
@Benchmark
def recursiveSum(bh: Blackhole): Unit = {
def sum(list: List[Int]): Long = list match {
case Nil => 0
case head :: tail => head + sum(tail)
}
bh.consume(sum(data))
}
@Benchmark
def tailRecursiveSum(bh: Blackhole): Unit = {
import scala.annotation.tailrec
@tailrec
def sum(list: List[Int], acc: Long = 0): Long = list match {
case Nil => acc
case head :: tail => sum(tail, acc + head)
}
bh.consume(sum(data))
}
@Benchmark
def foldLeftSum(bh: Blackhole): Unit = {
val result = data.foldLeft(0L)(_ + _)
bh.consume(result)
}
@Benchmark
def imperativeSum(bh: Blackhole): Unit = {
var sum = 0L
var current = data
while (current.nonEmpty) {
sum += current.head
current = current.tail
}
bh.consume(sum)
}
@Benchmark
def chainedOperations(bh: Blackhole): Unit = {
val result = data
.filter(_ % 2 == 0)
.map(_ * 2)
.filter(_ < 1000)
.sum
bh.consume(result)
}
@Benchmark
def optimizedChainedOperations(bh: Blackhole): Unit = {
val result = data.view
.filter(_ % 2 == 0)
.map(_ * 2)
.filter(_ < 1000)
.sum
bh.consume(result)
}
@Benchmark
def singlePassOperation(bh: Blackhole): Unit = {
var sum = 0
for (x <- data) {
if (x % 2 == 0) {
val doubled = x * 2
if (doubled < 1000) {
sum += doubled
}
}
}
bh.consume(sum)
}
}
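These classes follow standard JMH conventions, so any JMH runner can execute them. With sbt, one common setup (a build assumption, not something the lesson's code depends on) is the sbt-jmh plugin: add addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "<version>") to project/plugins.sbt, enable JmhPlugin on the benchmark module, and run, for example, sbt "Jmh/run .*CollectionBenchmarks.*". JMH's forked JVMs, warmup phases, and Blackhole sinks are precisely what keep the JIT from optimizing the measured code away, which is what naive timing loops get wrong.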
Performance Monitoring and Observability
Application Performance Monitoring (APM)
// PerformanceMonitoring.scala - APM integration and metrics
package com.example.performance.monitoring
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success, Failure}
// Metrics collection system
trait MetricsCollector {
def incrementCounter(name: String, tags: Map[String, String] = Map.empty): Unit
def recordTimer(name: String, duration: Duration, tags: Map[String, String] = Map.empty): Unit
def updateGauge(name: String, value: Double, tags: Map[String, String] = Map.empty): Unit
def recordHistogram(name: String, value: Double, tags: Map[String, String] = Map.empty): Unit
}
class InMemoryMetricsCollector extends MetricsCollector {
private val counters = new ConcurrentHashMap[String, AtomicLong]()
private val timers = new ConcurrentHashMap[String, TimerStats]()
private val gauges = new ConcurrentHashMap[String, AtomicReference[Double]]()
private val histograms = new ConcurrentHashMap[String, HistogramStats]()
def incrementCounter(name: String, tags: Map[String, String] = Map.empty): Unit = {
val key = createKey(name, tags)
counters.computeIfAbsent(key, _ => new AtomicLong(0)).incrementAndGet()
}
def recordTimer(name: String, duration: Duration, tags: Map[String, String] = Map.empty): Unit = {
val key = createKey(name, tags)
val durationMicros = duration.toMicros
timers.computeIfAbsent(key, _ => new TimerStats()).addMeasurement(durationMicros)
}
def updateGauge(name: String, value: Double, tags: Map[String, String] = Map.empty): Unit = {
val key = createKey(name, tags)
gauges.computeIfAbsent(key, _ => new AtomicReference(0.0)).set(value)
}
def recordHistogram(name: String, value: Double, tags: Map[String, String] = Map.empty): Unit = {
val key = createKey(name, tags)
histograms.computeIfAbsent(key, _ => new HistogramStats()).addValue(value)
}
private def createKey(name: String, tags: Map[String, String]): String = {
if (tags.isEmpty) name
else s"$name{${tags.map { case (k, v) => s"$k=$v" }.mkString(",")}}"
}
def getCounterValue(name: String, tags: Map[String, String] = Map.empty): Long = {
val key = createKey(name, tags)
Option(counters.get(key)).map(_.get()).getOrElse(0L)
}
def getTimerStats(name: String, tags: Map[String, String] = Map.empty): Option[TimerStats] = {
val key = createKey(name, tags)
Option(timers.get(key))
}
def getGaugeValue(name: String, tags: Map[String, String] = Map.empty): Option[Double] = {
val key = createKey(name, tags)
Option(gauges.get(key)).map(_.get())
}
def getHistogramStats(name: String, tags: Map[String, String] = Map.empty): Option[HistogramStats] = {
val key = createKey(name, tags)
Option(histograms.get(key))
}
def getAllMetrics: Map[String, Any] = {
val allCounters = counters.entrySet().asScala.map(e => e.getKey -> e.getValue.get()).toMap
val allGauges = gauges.entrySet().asScala.map(e => e.getKey -> e.getValue.get()).toMap
val allTimers = timers.entrySet().asScala.map(e => e.getKey -> e.getValue).toMap
val allHistograms = histograms.entrySet().asScala.map(e => e.getKey -> e.getValue).toMap
Map(
"counters" -> allCounters,
"gauges" -> allGauges,
"timers" -> allTimers,
"histograms" -> allHistograms
)
}
}
class TimerStats {
private val count = new AtomicLong(0)
private val sum = new AtomicLong(0)
private val min = new AtomicLong(Long.MaxValue)
private val max = new AtomicLong(Long.MinValue)
def addMeasurement(durationMicros: Long): Unit = {
count.incrementAndGet()
sum.addAndGet(durationMicros)
// Update min
var currentMin = min.get()
while (durationMicros < currentMin && !min.compareAndSet(currentMin, durationMicros)) {
currentMin = min.get()
}
// Update max
var currentMax = max.get()
while (durationMicros > currentMax && !max.compareAndSet(currentMax, durationMicros)) {
currentMax = max.get()
}
}
def getCount: Long = count.get()
def getSum: Long = sum.get()
def getMin: Long = if (count.get() == 0) 0 else min.get()
def getMax: Long = if (count.get() == 0) 0 else max.get()
def getMean: Double = if (count.get() == 0) 0.0 else sum.get().toDouble / count.get()
}
class HistogramStats {
private val values = new AtomicReference(Vector.empty[Double])
def addValue(value: Double): Unit = {
values.updateAndGet(_ :+ value)
}
def getPercentile(percentile: Double): Double = {
val sortedValues = values.get().sorted
if (sortedValues.isEmpty) 0.0
else {
val index = math.min((sortedValues.length * percentile / 100.0).toInt, sortedValues.length - 1)
sortedValues(index)
}
}
def getCount: Int = values.get().length
def getMin: Double = values.get().minOption.getOrElse(0.0)
def getMax: Double = values.get().maxOption.getOrElse(0.0)
def getMean: Double = {
val vals = values.get()
if (vals.isEmpty) 0.0 else vals.sum / vals.length
}
}
// Performance instrumentation
object PerformanceInstrumentation {
private[monitoring] val metricsCollector = new InMemoryMetricsCollector() // package-visible so collaborators like CircuitBreaker can record metrics
def timed[T](name: String, tags: Map[String, String] = Map.empty)(operation: => T): T = {
val start = System.nanoTime()
try {
val result = operation
val duration = (System.nanoTime() - start).nanos
metricsCollector.recordTimer(name, duration, tags)
metricsCollector.incrementCounter(s"$name.success", tags)
result
} catch {
case ex: Exception =>
val duration = (System.nanoTime() - start).nanos
metricsCollector.recordTimer(name, duration, tags + ("status" -> "error"))
metricsCollector.incrementCounter(s"$name.error", tags)
throw ex
}
}
def counted[T](name: String, tags: Map[String, String] = Map.empty)(operation: => T): T = {
metricsCollector.incrementCounter(name, tags)
operation
}
def monitored[T](name: String, tags: Map[String, String] = Map.empty)(operation: => T): T = {
val start = System.nanoTime()
val startMemory = getUsedMemory
try {
val result = operation
val endMemory = getUsedMemory
val duration = (System.nanoTime() - start).nanos
val memoryUsed = endMemory - startMemory
metricsCollector.recordTimer(s"$name.duration", duration, tags)
metricsCollector.recordHistogram(s"$name.memory", memoryUsed.toDouble, tags)
metricsCollector.incrementCounter(s"$name.success", tags)
result
} catch {
case ex: Exception =>
val duration = (System.nanoTime() - start).nanos
metricsCollector.recordTimer(s"$name.duration", duration, tags + ("status" -> "error"))
metricsCollector.incrementCounter(s"$name.error", tags)
throw ex
}
}
private def getUsedMemory: Long = {
val memoryBean = java.lang.management.ManagementFactory.getMemoryMXBean
memoryBean.getHeapMemoryUsage.getUsed
}
def getMetrics: Map[String, Any] = metricsCollector.getAllMetrics
def printMetricsReport(): Unit = {
val metrics = getMetrics
println("=== Performance Metrics Report ===")
println("\nCounters:")
metrics("counters").asInstanceOf[Map[String, Long]].foreach { case (name, value) =>
println(f" $name%-40s $value%10d")
}
println("\nTimers:")
metrics("timers").asInstanceOf[Map[String, TimerStats]].foreach { case (name, stats) =>
println(f" $name%-40s count=${stats.getCount}%6d mean=${stats.getMean / 1000}%8.2fms min=${stats.getMin / 1000}%8.2fms max=${stats.getMax / 1000}%8.2fms")
}
println("\nGauges:")
metrics("gauges").asInstanceOf[Map[String, Double]].foreach { case (name, value) =>
println(f" $name%-40s $value%10.2f")
}
println("\nHistograms:")
metrics("histograms").asInstanceOf[Map[String, HistogramStats]].foreach { case (name, stats) =>
println(f" $name%-40s count=${stats.getCount}%6d p50=${stats.getPercentile(50)}%8.2f p95=${stats.getPercentile(95)}%8.2f p99=${stats.getPercentile(99)}%8.2f")
}
}
}
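// Illustrative usage (the demo object and its workload are hypothetical): wrap
// a hot path, then dump the collected metrics.
object InstrumentationDemo {
  def parseOrders(lines: Seq[String]): Seq[Int] =
    PerformanceInstrumentation.timed("orders.parse", Map("source" -> "csv")) {
      lines.map(_.trim.length)
    }

  def main(args: Array[String]): Unit = {
    parseOrders(Seq("a,b,c", "d,e"))
    PerformanceInstrumentation.printMetricsReport()
  }
}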
// Circuit breaker pattern for performance protection
class CircuitBreaker(
name: String,
failureThreshold: Int = 5,
resetTimeout: Duration = 60.seconds
) {
sealed trait State
case object Closed extends State
case object Open extends State
case object HalfOpen extends State
private val state = new AtomicReference[State](Closed)
private val failureCount = new AtomicLong(0)
private val lastFailureTime = new AtomicLong(0)
private val lastSuccessTime = new AtomicLong(System.currentTimeMillis())
def execute[T](operation: => T): Try[T] = {
state.get() match {
case Closed =>
executeInClosedState(operation)
case Open =>
if (shouldAttemptReset()) {
state.set(HalfOpen)
executeInHalfOpenState(operation)
} else {
Failure(new RuntimeException(s"Circuit breaker $name is OPEN"))
}
case HalfOpen =>
executeInHalfOpenState(operation)
}
}
private def executeInClosedState[T](operation: => T): Try[T] = {
Try(operation) match {
case success @ Success(_) =>
onSuccess()
success
case failure @ Failure(_) =>
onFailure()
failure
}
}
private def executeInHalfOpenState[T](operation: => T): Try[T] = {
Try(operation) match {
case success @ Success(_) =>
onSuccess()
state.set(Closed)
success
case failure @ Failure(_) =>
onFailure()
state.set(Open)
failure
}
}
private def onSuccess(): Unit = {
failureCount.set(0)
lastSuccessTime.set(System.currentTimeMillis())
PerformanceInstrumentation.metricsCollector.incrementCounter(s"circuit_breaker.$name.success")
}
private def onFailure(): Unit = {
val failures = failureCount.incrementAndGet()
lastFailureTime.set(System.currentTimeMillis())
if (failures >= failureThreshold) {
state.set(Open)
PerformanceInstrumentation.metricsCollector.incrementCounter(s"circuit_breaker.$name.opened")
}
PerformanceInstrumentation.metricsCollector.incrementCounter(s"circuit_breaker.$name.failure")
}
private def shouldAttemptReset(): Boolean = {
System.currentTimeMillis() - lastFailureTime.get() > resetTimeout.toMillis
}
def getState: State = state.get()
def getFailureCount: Long = failureCount.get()
def isOpen: Boolean = state.get() == Open
def isClosed: Boolean = state.get() == Closed
}
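// Illustrative usage (hypothetical demo): protect a flaky dependency. After
// three consecutive failures the breaker opens and calls fail fast until the
// reset timeout elapses.
object CircuitBreakerDemo {
  private val breaker = new CircuitBreaker("remote-service", failureThreshold = 3)

  private def callRemoteService(): String =
    if (scala.util.Random.nextBoolean()) "ok"
    else throw new RuntimeException("remote failure")

  def main(args: Array[String]): Unit = {
    (1 to 10).foreach { i =>
      println(s"call $i: ${breaker.execute(callRemoteService())}")
    }
    println(s"final state: ${breaker.getState}, failures: ${breaker.getFailureCount}")
  }
}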
// Performance testing utilities
object PerformanceTestUtils {
def loadTest[T](
name: String,
operation: => T,
duration: Duration = 60.seconds,
concurrency: Int = 10
): LoadTestResult = {
import scala.concurrent.{Future, ExecutionContext}
import java.util.concurrent.Executors
val executor = Executors.newFixedThreadPool(concurrency)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
val endTime = System.currentTimeMillis() + duration.toMillis
val results = new AtomicLong(0)
val errors = new AtomicLong(0)
val responseTimes = new java.util.concurrent.ConcurrentLinkedQueue[Long]()
val futures = (1 to concurrency).map { _ =>
Future {
while (System.currentTimeMillis() < endTime) {
val start = System.nanoTime()
try {
operation
val responseTime = (System.nanoTime() - start) / 1000000 // Convert to milliseconds
responseTimes.offer(responseTime)
results.incrementAndGet()
} catch {
case _: Exception =>
errors.incrementAndGet()
}
}
}
}
// Wait for all workers to finish, then release the pool's threads
import scala.concurrent.Await
Await.ready(Future.sequence(futures), duration + 10.seconds)
executor.shutdown()
val responseTimesList = responseTimes.asScala.toList.sorted
LoadTestResult(
name = name,
duration = duration,
concurrency = concurrency,
totalRequests = results.get(),
totalErrors = errors.get(),
requestsPerSecond = results.get().toDouble / duration.toSeconds,
averageResponseTime = if (responseTimesList.nonEmpty) responseTimesList.sum.toDouble / responseTimesList.length else 0.0,
medianResponseTime = if (responseTimesList.nonEmpty) responseTimesList(responseTimesList.length / 2) else 0L,
p95ResponseTime = if (responseTimesList.nonEmpty) responseTimesList((responseTimesList.length * 0.95).toInt) else 0L,
p99ResponseTime = if (responseTimesList.nonEmpty) responseTimesList((responseTimesList.length * 0.99).toInt) else 0L,
maxResponseTime = if (responseTimesList.nonEmpty) responseTimesList.last else 0L
)
}
case class LoadTestResult(
name: String,
duration: Duration,
concurrency: Int,
totalRequests: Long,
totalErrors: Long,
requestsPerSecond: Double,
averageResponseTime: Double,
medianResponseTime: Long,
p95ResponseTime: Long,
p99ResponseTime: Long,
maxResponseTime: Long
) {
def errorRate: Double = if (totalRequests == 0) 0.0 else totalErrors.toDouble / totalRequests
def printReport(): Unit = {
println(s"""
|=== Load Test Report: $name ===
|Duration: ${duration.toSeconds}s
|Concurrency: $concurrency
|Total Requests: $totalRequests
|Total Errors: $totalErrors
|Error Rate: ${(errorRate * 100).formatted("%.2f")}%
|Requests/second: ${requestsPerSecond.formatted("%.2f")}
|Response Times:
| Average: ${averageResponseTime.formatted("%.2f")}ms
| Median: ${medianResponseTime}ms
| P95: ${p95ResponseTime}ms
| P99: ${p99ResponseTime}ms
| Max: ${maxResponseTime}ms
""".stripMargin)
}
}
}
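A usage sketch (the LoadTestDemo object and its busy-loop workload are stand-ins, not part of the lesson):
object LoadTestDemo {
  def main(args: Array[String]): Unit = {
    val result = PerformanceTestUtils.loadTest(
      name = "busy-loop",
      operation = { var i = 0; while (i < 100000) i += 1; i },
      duration = 10.seconds,
      concurrency = 4
    )
    result.printReport()
  }
}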
Conclusion
Performance optimization and profiling are essential skills for building high-performance Scala applications. Key concepts include:
JVM Performance:
- Memory management and garbage collection optimization
- CPU optimization techniques and algorithms
- Data structure selection and optimization
- Object pooling and resource management
Profiling Tools:
- JVM built-in profiling capabilities
- Java Flight Recorder (JFR) integration
- Memory and allocation profiling
- Thread contention analysis
Benchmarking:
- JMH microbenchmarking framework
- Performance regression testing
- Load testing and stress testing
- Comparative performance analysis
Monitoring and Observability:
- Application Performance Monitoring (APM)
- Metrics collection and reporting
- Real-time performance monitoring
- Circuit breaker patterns for resilience
Optimization Strategies:
- Algorithmic improvements
- Data structure optimization
- Parallel and concurrent processing
- Cache-friendly programming patterns
Performance Testing:
- Load testing methodologies
- Performance baseline establishment
- Regression detection
- Capacity planning
Production Monitoring:
- Real-time metrics collection
- Performance alerting
- Trend analysis
- Capacity monitoring
Best Practices:
- Profile before optimizing
- Measure performance impact
- Focus on bottlenecks
- Consider readability vs performance trade-offs
Systematic performance optimization enables teams to build scalable, efficient Scala applications that meet enterprise performance requirements while maintaining code quality and maintainability.