Performance Optimization and Profiling in Scala

Performance optimization is crucial for building scalable Scala applications. This comprehensive lesson covers JVM optimization, memory management, profiling tools, benchmarking techniques, and systematic approaches to identifying and resolving performance bottlenecks in enterprise Scala systems.

JVM Performance Fundamentals

JVM Memory Management and Garbage Collection

// MemoryAnalysis.scala - Memory usage patterns and optimization
package com.example.performance.memory

import java.lang.management.ManagementFactory
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.util.Random

// Memory monitoring utilities
object MemoryProfiler {

  private val memoryBean = ManagementFactory.getMemoryMXBean
  private val gcBeans = ManagementFactory.getGarbageCollectorMXBeans

  case class MemorySnapshot(
    heapUsed: Long,
    heapMax: Long,
    nonHeapUsed: Long,
    nonHeapMax: Long,
    timestamp: Long = System.currentTimeMillis()
  ) {
    def heapUtilization: Double = heapUsed.toDouble / heapMax
    def nonHeapUtilization: Double = nonHeapUsed.toDouble / nonHeapMax
  }

  def takeSnapshot(): MemorySnapshot = {
    val heapMemory = memoryBean.getHeapMemoryUsage
    val nonHeapMemory = memoryBean.getNonHeapMemoryUsage

    MemorySnapshot(
      heapUsed = heapMemory.getUsed,
      heapMax = heapMemory.getMax,
      nonHeapUsed = nonHeapMemory.getUsed,
      nonHeapMax = nonHeapMemory.getMax
    )
  }

  def measureMemoryUsage[T](operation: => T): (T, MemorySnapshot, MemorySnapshot) = {
    System.gc() // Suggest garbage collection before measurement
    Thread.sleep(100) // Allow GC to complete

    val before = takeSnapshot()
    val result = operation
    val after = takeSnapshot()

    (result, before, after)
  }

  def measureGCActivity[T](operation: => T): (T, Map[String, (Long, Long)]) = {
    val gcCountsBefore = gcBeans.map(bean => bean.getName -> bean.getCollectionCount).toMap
    val gcTimesBefore = gcBeans.map(bean => bean.getName -> bean.getCollectionTime).toMap

    val result = operation

    val gcCountsAfter = gcBeans.map(bean => bean.getName -> bean.getCollectionCount).toMap
    val gcTimesAfter = gcBeans.map(bean => bean.getName -> bean.getCollectionTime).toMap

    val gcActivity = gcBeans.map { bean =>
      val name = bean.getName
      val countDiff = gcCountsAfter(name) - gcCountsBefore(name)
      val timeDiff = gcTimesAfter(name) - gcTimesBefore(name)
      name -> (countDiff, timeDiff)
    }.toMap

    (result, gcActivity)
  }

  def printMemoryInfo(): Unit = {
    val snapshot = takeSnapshot()
    println(s"""
      |Memory Usage:
      |  Heap: ${snapshot.heapUsed / 1024 / 1024} MB / ${snapshot.heapMax / 1024 / 1024} MB (${(snapshot.heapUtilization * 100).toInt}%)
      |  Non-Heap: ${snapshot.nonHeapUsed / 1024 / 1024} MB / ${snapshot.nonHeapMax / 1024 / 1024} MB (${(snapshot.nonHeapUtilization * 100).toInt}%)
      |
      |GC Information:
      |${gcBeans.map(bean => s"  ${bean.getName}: ${bean.getCollectionCount} collections, ${bean.getCollectionTime} ms").mkString("\n")}
      """.stripMargin)
  }
}

// Data structure optimization examples
object DataStructureOptimization {

  // Inefficient: Creates many intermediate collections
  def inefficientChainedOperations(data: List[Int]): List[Int] = {
    data
      .filter(_ > 0)
      .map(_ * 2)
      .filter(_ < 1000)
      .map(_ + 1)
      .distinct
      .sorted
  }

  // Optimized: Single pass with view
  def optimizedChainedOperations(data: List[Int]): List[Int] = {
    data.view
      .filter(_ > 0)
      .map(_ * 2)
      .filter(_ < 1000)
      .map(_ + 1)
      .distinct
      .to(List)
      .sorted
  }

  // Memory-efficient streaming approach
  def streamingOperations(data: Iterator[Int]): List[Int] = {
    val seen = mutable.Set[Int]()
    val result = mutable.ListBuffer[Int]()

    data.foreach { item =>
      if (item > 0) {
        val transformed = item * 2
        if (transformed < 1000) {
          val final = transformed + 1
          if (seen.add(final)) {
            result += final
          }
        }
      }
    }

    result.sorted.toList
  }

  // Array-based optimization for numeric operations
  def arrayBasedOperations(data: Array[Int]): Array[Int] = {
    val filtered = data.filter(_ > 0)
    var i = 0
    while (i < filtered.length) {
      filtered(i) = filtered(i) * 2 + 1
      i += 1
    }

    filtered.filter(_ < 1000).distinct.sorted
  }

  // Specialized collections for specific use cases
  class OptimizedIntSet private (private val bits: Array[Long]) {
    private val maxValue = bits.length * 64

    def contains(value: Int): Boolean = {
      if (value < 0 || value >= maxValue) false
      else {
        val arrayIndex = value / 64
        val bitIndex = value % 64
        (bits(arrayIndex) & (1L << bitIndex)) != 0
      }
    }

    def add(value: Int): Unit = {
      if (value >= 0 && value < maxValue) {
        val arrayIndex = value / 64
        val bitIndex = value % 64
        bits(arrayIndex) |= (1L << bitIndex)
      }
    }

    def remove(value: Int): Unit = {
      if (value >= 0 && value < maxValue) {
        val arrayIndex = value / 64
        val bitIndex = value % 64
        bits(arrayIndex) &= ~(1L << bitIndex)
      }
    }

    def size: Int = bits.map(java.lang.Long.bitCount).sum

    def toArray: Array[Int] = {
      val result = mutable.ArrayBuffer[Int]()
      for (arrayIndex <- bits.indices) {
        var bits_value = bits(arrayIndex)
        var bitIndex = 0
        while (bits_value != 0) {
          if ((bits_value & 1) == 1) {
            result += arrayIndex * 64 + bitIndex
          }
          bits_value >>>= 1
          bitIndex += 1
        }
      }
      result.toArray
    }
  }

  object OptimizedIntSet {
    def apply(maxValue: Int): OptimizedIntSet = {
      val arraySize = (maxValue + 63) / 64
      new OptimizedIntSet(new Array[Long](arraySize))
    }
  }
}

// Object pool pattern for expensive object creation
class ObjectPool[T](factory: () => T, maxSize: Int = 10) {
  private val pool = new java.util.concurrent.ConcurrentLinkedQueue[T]()
  private val borrowed = new java.util.concurrent.atomic.AtomicInteger(0)

  def borrow(): T = {
    Option(pool.poll()) match {
      case Some(obj) =>
        borrowed.incrementAndGet()
        obj
      case None =>
        borrowed.incrementAndGet()
        factory()
    }
  }

  def return_(obj: T): Unit = {
    borrowed.decrementAndGet()
    if (pool.size() < maxSize) {
      pool.offer(obj)
    }
  }

  def withObject[R](f: T => R): R = {
    val obj = borrow()
    try {
      f(obj)
    } finally {
      return_(obj)
    }
  }

  def size: Int = pool.size()
  def borrowed_count: Int = borrowed.get()
}

// Example: String builder pool for concatenation operations
object StringBuilderPool extends ObjectPool[StringBuilder](
  factory = () => new StringBuilder(1024),
  maxSize = 20
) {

  def buildString(operations: StringBuilder => Unit): String = {
    withObject { sb =>
      sb.clear()
      operations(sb)
      sb.toString()
    }
  }
}

CPU Optimization Techniques

// CPUOptimization.scala - CPU-intensive optimization patterns
package com.example.performance.cpu

import scala.annotation.tailrec
import scala.collection.parallel.CollectionConverters._
import java.util.concurrent.{ForkJoinPool, RecursiveTask}
import scala.concurrent.{Future, ExecutionContext}

object CPUOptimization {

  // Tail recursion optimization
  def factorial(n: Long): Long = {
    @tailrec
    def factorialAcc(n: Long, acc: Long): Long = {
      if (n <= 1) acc
      else factorialAcc(n - 1, n * acc)
    }
    factorialAcc(n, 1)
  }

  // Fibonacci with memoization
  class FibonacciMemoized {
    private val cache = scala.collection.mutable.Map[Int, Long]()

    def fib(n: Int): Long = {
      cache.getOrElseUpdate(n, {
        if (n <= 1) n
        else fib(n - 1) + fib(n - 2)
      })
    }
  }

  // Parallel collection operations
  def parallelSum(numbers: Vector[Int]): Long = {
    numbers.par.map(_.toLong).sum
  }

  def parallelFilter(numbers: Vector[Int], predicate: Int => Boolean): Vector[Int] = {
    numbers.par.filter(predicate).seq.to(Vector)
  }

  // Custom parallel processing with work stealing
  class ParallelMergeSort {
    private val threshold = 10000

    def sort(array: Array[Int]): Array[Int] = {
      if (array.length <= 1) array
      else {
        val task = new MergeSortTask(array, 0, array.length)
        ForkJoinPool.commonPool().invoke(task)
        array
      }
    }

    private class MergeSortTask(array: Array[Int], left: Int, right: Int) 
        extends RecursiveTask[Unit] {

      def compute(): Unit = {
        if (right - left <= threshold) {
          java.util.Arrays.sort(array, left, right)
        } else {
          val mid = left + (right - left) / 2
          val leftTask = new MergeSortTask(array, left, mid)
          val rightTask = new MergeSortTask(array, mid, right)

          leftTask.fork()
          rightTask.compute()
          leftTask.join()

          merge(array, left, mid, right)
        }
      }

      private def merge(array: Array[Int], left: Int, mid: Int, right: Int): Unit = {
        val temp = new Array[Int](right - left)
        var i = left
        var j = mid
        var k = 0

        while (i < mid && j < right) {
          if (array(i) <= array(j)) {
            temp(k) = array(i)
            i += 1
          } else {
            temp(k) = array(j)
            j += 1
          }
          k += 1
        }

        while (i < mid) {
          temp(k) = array(i)
          i += 1
          k += 1
        }

        while (j < right) {
          temp(k) = array(j)
          j += 1
          k += 1
        }

        System.arraycopy(temp, 0, array, left, temp.length)
      }
    }
  }

  // Vectorization-friendly algorithms
  def dotProduct(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "Arrays must have the same length")

    var sum = 0.0
    var i = 0
    val length = a.length

    // Process 4 elements at a time (helps with vectorization)
    while (i < length - 3) {
      sum += a(i) * b(i) +
             a(i + 1) * b(i + 1) +
             a(i + 2) * b(i + 2) +
             a(i + 3) * b(i + 3)
      i += 4
    }

    // Handle remaining elements
    while (i < length) {
      sum += a(i) * b(i)
      i += 1
    }

    sum
  }

  // Bit manipulation optimizations
  object BitOperations {

    def isPowerOfTwo(n: Int): Boolean = n > 0 && (n & (n - 1)) == 0

    def nextPowerOfTwo(n: Int): Int = {
      var power = 1
      while (power < n) power <<= 1
      power
    }

    def countBits(n: Int): Int = Integer.bitCount(n)

    def reverseBits(n: Int): Int = Integer.reverse(n)

    // Fast modulo for powers of 2
    def fastModulo(value: Int, modulus: Int): Int = {
      require(isPowerOfTwo(modulus), "Modulus must be a power of 2")
      value & (modulus - 1)
    }
  }

  // Cache-friendly data access patterns
  class CacheFriendlyMatrix(rows: Int, cols: Int) {
    private val data = new Array[Double](rows * cols)

    def apply(row: Int, col: Int): Double = data(row * cols + col)

    def update(row: Int, col: Int, value: Double): Unit = {
      data(row * cols + col) = value
    }

    // Row-major traversal (cache-friendly)
    def sumByRows(): Double = {
      var sum = 0.0
      var i = 0
      while (i < data.length) {
        sum += data(i)
        i += 1
      }
      sum
    }

    // Column-major traversal (cache-unfriendly, but sometimes necessary)
    def sumByColumns(): Double = {
      var sum = 0.0
      for (col <- 0 until cols) {
        for (row <- 0 until rows) {
          sum += apply(row, col)
        }
      }
      sum
    }

    // Blocked matrix multiplication for better cache locality
    def multiply(other: CacheFriendlyMatrix, blockSize: Int = 64): CacheFriendlyMatrix = {
      require(cols == other.rows, "Invalid matrix dimensions for multiplication")

      val result = new CacheFriendlyMatrix(rows, other.cols)

      for (i <- 0 until rows by blockSize) {
        for (j <- 0 until other.cols by blockSize) {
          for (k <- 0 until cols by blockSize) {

            val maxI = math.min(i + blockSize, rows)
            val maxJ = math.min(j + blockSize, other.cols)
            val maxK = math.min(k + blockSize, cols)

            for (ii <- i until maxI) {
              for (jj <- j until maxJ) {
                var sum = result(ii, jj)
                for (kk <- k until maxK) {
                  sum += this(ii, kk) * other(kk, jj)
                }
                result(ii, jj) = sum
              }
            }
          }
        }
      }

      result
    }
  }
}

Profiling Tools and Techniques

JVM Profiling with Built-in Tools

// ProfilingExample.scala - Demonstrating profiling integration
package com.example.performance.profiling

import java.lang.management.ManagementFactory
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.util.Random

object ProfilingExample {

  // CPU profiling with JFR (Java Flight Recorder) annotations
  import jdk.jfr.{Event, Label, Description}

  @Label("Custom Operation")
  @Description("A custom operation for profiling")
  class CustomOperationEvent extends Event {
    @Label("Operation Type")
    var operationType: String = _

    @Label("Data Size")
    var dataSize: Int = _
  }

  def profiledOperation(operationType: String, dataSize: Int): Unit = {
    val event = new CustomOperationEvent()
    event.operationType = operationType
    event.dataSize = dataSize
    event.begin()

    try {
      // Simulate work
      operationType match {
        case "cpu_intensive" => cpuIntensiveWork(dataSize)
        case "memory_allocation" => memoryAllocationWork(dataSize)
        case "io_simulation" => ioSimulationWork(dataSize)
      }
    } finally {
      event.end()
      event.commit()
    }
  }

  private def cpuIntensiveWork(size: Int): Unit = {
    var sum = 0L
    for (i <- 0 until size) {
      sum += fibonacci(i % 35)
    }
  }

  private def fibonacci(n: Int): Long = {
    if (n <= 1) n
    else fibonacci(n - 1) + fibonacci(n - 2)
  }

  private def memoryAllocationWork(size: Int): Unit = {
    val arrays = for (i <- 0 until size) yield {
      new Array[Byte](1024)
    }
    // Force usage to prevent optimization
    arrays.foreach(_(0) = 1)
  }

  private def ioSimulationWork(size: Int): Unit = {
    for (_ <- 0 until size) {
      Thread.sleep(1) // Simulate I/O wait
    }
  }

  // Microbenchmarking utilities
  class Benchmark {
    def warmup[T](operation: => T, iterations: Int = 1000): Unit = {
      for (_ <- 0 until iterations) {
        operation
      }
    }

    def measure[T](operation: => T, iterations: Int = 10000): BenchmarkResult[T] = {
      val times = new Array[Long](iterations)
      var result: T = null.asInstanceOf[T]

      for (i <- 0 until iterations) {
        val start = System.nanoTime()
        result = operation
        val end = System.nanoTime()
        times(i) = end - start
      }

      java.util.Arrays.sort(times)

      BenchmarkResult(
        result = result,
        iterations = iterations,
        min = times(0),
        max = times(iterations - 1),
        median = times(iterations / 2),
        mean = times.sum / iterations,
        p95 = times((iterations * 0.95).toInt),
        p99 = times((iterations * 0.99).toInt)
      )
    }
  }

  case class BenchmarkResult[T](
    result: T,
    iterations: Int,
    min: Long,
    max: Long,
    median: Long,
    mean: Long,
    p95: Long,
    p99: Long
  ) {
    def printReport(): Unit = {
      println(s"""
        |Benchmark Results ($iterations iterations):
        |  Min:    ${formatNanos(min)}
        |  Median: ${formatNanos(median)}
        |  Mean:   ${formatNanos(mean)}
        |  P95:    ${formatNanos(p95)}
        |  P99:    ${formatNanos(p99)}
        |  Max:    ${formatNanos(max)}
        """.stripMargin)
    }

    private def formatNanos(nanos: Long): String = {
      if (nanos < 1000) s"${nanos}ns"
      else if (nanos < 1000000) f"${nanos / 1000.0}%.2fμs"
      else if (nanos < 1000000000) f"${nanos / 1000000.0}%.2fms"
      else f"${nanos / 1000000000.0}%.2fs"
    }
  }

  // Thread contention analysis
  object ThreadContentionAnalyzer {
    private val threadBean = ManagementFactory.getThreadMXBean

    def analyzeContention[T](operation: => T): (T, ContentionReport) = {
      threadBean.setThreadContentionMonitoringEnabled(true)

      val beforeInfo = threadBean.getAllThreadInfo
      val beforeContentionTime = threadBean.getCurrentThreadCpuTime

      val result = operation

      val afterInfo = threadBean.getAllThreadInfo
      val afterContentionTime = threadBean.getCurrentThreadCpuTime

      val contentionDiff = afterContentionTime - beforeContentionTime
      val blockedThreads = afterInfo.count(_.getThreadState.toString.contains("BLOCKED"))

      val report = ContentionReport(
        contentionTime = contentionDiff,
        blockedThreadCount = blockedThreads,
        totalThreads = afterInfo.length
      )

      (result, report)
    }
  }

  case class ContentionReport(
    contentionTime: Long,
    blockedThreadCount: Int,
    totalThreads: Int
  ) {
    def contentionRatio: Double = blockedThreadCount.toDouble / totalThreads
  }

  // Allocation profiling
  class AllocationProfiler {
    private var allocatedBytes = 0L
    private var allocatedObjects = 0L

    def track[T](operation: => T): (T, AllocationStats) = {
      val beforeMemory = getUsedMemory
      val beforeGC = getTotalGCTime

      val result = operation

      val afterMemory = getUsedMemory
      val afterGC = getTotalGCTime

      val memoryDiff = afterMemory - beforeMemory
      val gcTimeDiff = afterGC - beforeGC

      val stats = AllocationStats(
        allocatedBytes = memoryDiff,
        gcTime = gcTimeDiff
      )

      (result, stats)
    }

    private def getUsedMemory: Long = {
      val memoryBean = ManagementFactory.getMemoryMXBean
      memoryBean.getHeapMemoryUsage.getUsed
    }

    private def getTotalGCTime: Long = {
      val gcBeans = ManagementFactory.getGarbageCollectorMXBeans
      gcBeans.map(_.getCollectionTime).sum
    }
  }

  case class AllocationStats(
    allocatedBytes: Long,
    gcTime: Long
  ) {
    def allocatedMB: Double = allocatedBytes / (1024.0 * 1024.0)
    def gcTimeMs: Double = gcTime
  }
}

Advanced Profiling with JMH (Java Microbenchmark Harness)

// JMHBenchmarks.scala - JMH benchmark examples
package com.example.performance.jmh

import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.util.Random

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
class CollectionBenchmarks {

  @Param(Array("100", "1000", "10000"))
  var size: Int = _

  var data: Array[Int] = _
  var list: List[Int] = _
  var vector: Vector[Int] = _
  var arrayBuffer: mutable.ArrayBuffer[Int] = _

  @Setup
  def setup(): Unit = {
    val random = new Random(42)
    data = Array.fill(size)(random.nextInt(1000))
    list = data.toList
    vector = data.toVector
    arrayBuffer = mutable.ArrayBuffer(data: _*)
  }

  @Benchmark
  def arrayIteration(bh: Blackhole): Unit = {
    var i = 0
    while (i < data.length) {
      bh.consume(data(i))
      i += 1
    }
  }

  @Benchmark
  def listIteration(bh: Blackhole): Unit = {
    var current = list
    while (current.nonEmpty) {
      bh.consume(current.head)
      current = current.tail
    }
  }

  @Benchmark
  def vectorIteration(bh: Blackhole): Unit = {
    var i = 0
    while (i < vector.length) {
      bh.consume(vector(i))
      i += 1
    }
  }

  @Benchmark
  def arrayBufferIteration(bh: Blackhole): Unit = {
    var i = 0
    while (i < arrayBuffer.length) {
      bh.consume(arrayBuffer(i))
      i += 1
    }
  }

  @Benchmark
  def arrayRandomAccess(bh: Blackhole): Unit = {
    val random = new Random(42)
    for (_ <- 0 until 1000) {
      val index = random.nextInt(data.length)
      bh.consume(data(index))
    }
  }

  @Benchmark
  def vectorRandomAccess(bh: Blackhole): Unit = {
    val random = new Random(42)
    for (_ <- 0 until 1000) {
      val index = random.nextInt(vector.length)
      bh.consume(vector(index))
    }
  }

  @Benchmark
  def listPrepend(bh: Blackhole): Unit = {
    var result = List.empty[Int]
    for (i <- 0 until 1000) {
      result = i :: result
    }
    bh.consume(result)
  }

  @Benchmark
  def vectorAppend(bh: Blackhole): Unit = {
    var result = Vector.empty[Int]
    for (i <- 0 until 1000) {
      result = result :+ i
    }
    bh.consume(result)
  }

  @Benchmark
  def arrayBufferAppend(bh: Blackhole): Unit = {
    val result = mutable.ArrayBuffer.empty[Int]
    for (i <- 0 until 1000) {
      result += i
    }
    bh.consume(result)
  }
}

@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
class StringProcessingBenchmarks {

  @Param(Array("small", "medium", "large"))
  var dataSize: String = _

  var strings: Array[String] = _

  @Setup
  def setup(): Unit = {
    val count = dataSize match {
      case "small" => 100
      case "medium" => 1000
      case "large" => 10000
    }

    val random = new Random(42)
    strings = Array.fill(count) {
      random.alphanumeric.take(50).mkString
    }
  }

  @Benchmark
  def stringConcatenation(bh: Blackhole): Unit = {
    var result = ""
    for (str <- strings) {
      result += str
    }
    bh.consume(result)
  }

  @Benchmark
  def stringBuilderConcatenation(bh: Blackhole): Unit = {
    val sb = new StringBuilder()
    for (str <- strings) {
      sb.append(str)
    }
    bh.consume(sb.toString())
  }

  @Benchmark
  def stringInterpolation(bh: Blackhole): Unit = {
    val results = for (str <- strings) yield {
      s"Processed: $str"
    }
    bh.consume(results)
  }

  @Benchmark
  def regexMatching(bh: Blackhole): Unit = {
    val pattern = ".*[0-9]+.*".r
    val matches = strings.count(pattern.matches)
    bh.consume(matches)
  }

  @Benchmark
  def stringOperations(bh: Blackhole): Unit = {
    val results = strings.map { str =>
      str.toLowerCase.trim.replace(" ", "_")
    }
    bh.consume(results)
  }
}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
class AlgorithmBenchmarks {

  @Param(Array("1000", "10000", "100000"))
  var size: Int = _

  var data: Array[Int] = _

  @Setup
  def setup(): Unit = {
    val random = new Random(42)
    data = Array.fill(size)(random.nextInt(size))
  }

  @Benchmark
  def quickSort(bh: Blackhole): Unit = {
    val copy = data.clone()
    quickSortInPlace(copy, 0, copy.length - 1)
    bh.consume(copy)
  }

  @Benchmark
  def mergeSort(bh: Blackhole): Unit = {
    val result = mergeSort(data.toList)
    bh.consume(result)
  }

  @Benchmark
  def javaSort(bh: Blackhole): Unit = {
    val copy = data.clone()
    java.util.Arrays.sort(copy)
    bh.consume(copy)
  }

  @Benchmark
  def scalaSort(bh: Blackhole): Unit = {
    val result = data.sorted
    bh.consume(result)
  }

  private def quickSortInPlace(arr: Array[Int], low: Int, high: Int): Unit = {
    if (low < high) {
      val pi = partition(arr, low, high)
      quickSortInPlace(arr, low, pi - 1)
      quickSortInPlace(arr, pi + 1, high)
    }
  }

  private def partition(arr: Array[Int], low: Int, high: Int): Int = {
    val pivot = arr(high)
    var i = low - 1

    for (j <- low until high) {
      if (arr(j) <= pivot) {
        i += 1
        val temp = arr(i)
        arr(i) = arr(j)
        arr(j) = temp
      }
    }

    val temp = arr(i + 1)
    arr(i + 1) = arr(high)
    arr(high) = temp

    i + 1
  }

  private def mergeSort(list: List[Int]): List[Int] = {
    if (list.length <= 1) list
    else {
      val (left, right) = list.splitAt(list.length / 2)
      merge(mergeSort(left), mergeSort(right))
    }
  }

  private def merge(left: List[Int], right: List[Int]): List[Int] = {
    (left, right) match {
      case (Nil, r) => r
      case (l, Nil) => l
      case (lHead :: lTail, rHead :: rTail) =>
        if (lHead <= rHead) lHead :: merge(lTail, right)
        else rHead :: merge(left, rTail)
    }
  }
}

// Functional programming performance benchmarks
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
class FunctionalBenchmarks {

  @Param(Array("1000", "10000"))
  var size: Int = _

  var data: List[Int] = _

  @Setup
  def setup(): Unit = {
    data = (1 to size).toList
  }

  @Benchmark
  def recursiveSum(bh: Blackhole): Unit = {
    def sum(list: List[Int]): Long = list match {
      case Nil => 0
      case head :: tail => head + sum(tail)
    }
    bh.consume(sum(data))
  }

  @Benchmark
  def tailRecursiveSum(bh: Blackhole): Unit = {
    import scala.annotation.tailrec

    @tailrec
    def sum(list: List[Int], acc: Long = 0): Long = list match {
      case Nil => acc
      case head :: tail => sum(tail, acc + head)
    }
    bh.consume(sum(data))
  }

  @Benchmark
  def foldLeftSum(bh: Blackhole): Unit = {
    val result = data.foldLeft(0L)(_ + _)
    bh.consume(result)
  }

  @Benchmark
  def imperativeSum(bh: Blackhole): Unit = {
    var sum = 0L
    var current = data
    while (current.nonEmpty) {
      sum += current.head
      current = current.tail
    }
    bh.consume(sum)
  }

  @Benchmark
  def chainedOperations(bh: Blackhole): Unit = {
    val result = data
      .filter(_ % 2 == 0)
      .map(_ * 2)
      .filter(_ < 1000)
      .sum
    bh.consume(result)
  }

  @Benchmark
  def optimizedChainedOperations(bh: Blackhole): Unit = {
    val result = data.view
      .filter(_ % 2 == 0)
      .map(_ * 2)
      .filter(_ < 1000)
      .sum
    bh.consume(result)
  }

  @Benchmark
  def singlePassOperation(bh: Blackhole): Unit = {
    var sum = 0
    for (x <- data) {
      if (x % 2 == 0) {
        val doubled = x * 2
        if (doubled < 1000) {
          sum += doubled
        }
      }
    }
    bh.consume(sum)
  }
}

Performance Monitoring and Observability

Application Performance Monitoring (APM)

// PerformanceMonitoring.scala - APM integration and metrics
package com.example.performance.monitoring

import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.concurrent.duration._
import scala.collection.concurrent.TrieMap
import scala.util.{Try, Success, Failure}
import java.time.Instant

// Metrics collection system
trait MetricsCollector {
  def incrementCounter(name: String, tags: Map[String, String] = Map.empty): Unit
  def recordTimer(name: String, duration: Duration, tags: Map[String, String] = Map.empty): Unit
  def updateGauge(name: String, value: Double, tags: Map[String, String] = Map.empty): Unit
  def recordHistogram(name: String, value: Double, tags: Map[String, String] = Map.empty): Unit
}

class InMemoryMetricsCollector extends MetricsCollector {
  private val counters = new ConcurrentHashMap[String, AtomicLong]()
  private val timers = new ConcurrentHashMap[String, TimerStats]()
  private val gauges = new ConcurrentHashMap[String, AtomicReference[Double]]()
  private val histograms = new ConcurrentHashMap[String, HistogramStats]()

  def incrementCounter(name: String, tags: Map[String, String] = Map.empty): Unit = {
    val key = createKey(name, tags)
    counters.computeIfAbsent(key, _ => new AtomicLong(0)).incrementAndGet()
  }

  def recordTimer(name: String, duration: Duration, tags: Map[String, String] = Map.empty): Unit = {
    val key = createKey(name, tags)
    val durationMicros = duration.toMicros
    timers.computeIfAbsent(key, _ => new TimerStats()).addMeasurement(durationMicros)
  }

  def updateGauge(name: String, value: Double, tags: Map[String, String] = Map.empty): Unit = {
    val key = createKey(name, tags)
    gauges.computeIfAbsent(key, _ => new AtomicReference(0.0)).set(value)
  }

  def recordHistogram(name: String, value: Double, tags: Map[String, String] = Map.empty): Unit = {
    val key = createKey(name, tags)
    histograms.computeIfAbsent(key, _ => new HistogramStats()).addValue(value)
  }

  private def createKey(name: String, tags: Map[String, String]): String = {
    if (tags.isEmpty) name
    else s"$name{${tags.map { case (k, v) => s"$k=$v" }.mkString(",")}}"
  }

  def getCounterValue(name: String, tags: Map[String, String] = Map.empty): Long = {
    val key = createKey(name, tags)
    Option(counters.get(key)).map(_.get()).getOrElse(0L)
  }

  def getTimerStats(name: String, tags: Map[String, String] = Map.empty): Option[TimerStats] = {
    val key = createKey(name, tags)
    Option(timers.get(key))
  }

  def getGaugeValue(name: String, tags: Map[String, String] = Map.empty): Option[Double] = {
    val key = createKey(name, tags)
    Option(gauges.get(key)).map(_.get())
  }

  def getHistogramStats(name: String, tags: Map[String, String] = Map.empty): Option[HistogramStats] = {
    val key = createKey(name, tags)
    Option(histograms.get(key))
  }

  def getAllMetrics: Map[String, Any] = {
    val allCounters = counters.entrySet().asScala.map(e => e.getKey -> e.getValue.get()).toMap
    val allGauges = gauges.entrySet().asScala.map(e => e.getKey -> e.getValue.get()).toMap
    val allTimers = timers.entrySet().asScala.map(e => e.getKey -> e.getValue).toMap
    val allHistograms = histograms.entrySet().asScala.map(e => e.getKey -> e.getValue).toMap

    Map(
      "counters" -> allCounters,
      "gauges" -> allGauges,
      "timers" -> allTimers,
      "histograms" -> allHistograms
    )
  }
}

class TimerStats {
  private val count = new AtomicLong(0)
  private val sum = new AtomicLong(0)
  private val min = new AtomicLong(Long.MaxValue)
  private val max = new AtomicLong(Long.MinValue)

  def addMeasurement(durationMicros: Long): Unit = {
    count.incrementAndGet()
    sum.addAndGet(durationMicros)

    // Update min
    var currentMin = min.get()
    while (durationMicros < currentMin && !min.compareAndSet(currentMin, durationMicros)) {
      currentMin = min.get()
    }

    // Update max
    var currentMax = max.get()
    while (durationMicros > currentMax && !max.compareAndSet(currentMax, durationMicros)) {
      currentMax = max.get()
    }
  }

  def getCount: Long = count.get()
  def getSum: Long = sum.get()
  def getMin: Long = if (count.get() == 0) 0 else min.get()
  def getMax: Long = if (count.get() == 0) 0 else max.get()
  def getMean: Double = if (count.get() == 0) 0.0 else sum.get().toDouble / count.get()
}

class HistogramStats {
  private val values = new AtomicReference(Vector.empty[Double])

  def addValue(value: Double): Unit = {
    values.updateAndGet(_ :+ value)
  }

  def getPercentile(percentile: Double): Double = {
    val sortedValues = values.get().sorted
    if (sortedValues.isEmpty) 0.0
    else {
      val index = math.min((sortedValues.length * percentile / 100.0).toInt, sortedValues.length - 1)
      sortedValues(index)
    }
  }

  def getCount: Int = values.get().length
  def getMin: Double = values.get().minOption.getOrElse(0.0)
  def getMax: Double = values.get().maxOption.getOrElse(0.0)
  def getMean: Double = {
    val vals = values.get()
    if (vals.isEmpty) 0.0 else vals.sum / vals.length
  }
}

// Performance instrumentation
object PerformanceInstrumentation {
  private val metricsCollector = new InMemoryMetricsCollector()

  def timed[T](name: String, tags: Map[String, String] = Map.empty)(operation: => T): T = {
    val start = System.nanoTime()
    try {
      val result = operation
      val duration = (System.nanoTime() - start).nanos
      metricsCollector.recordTimer(name, duration, tags)
      metricsCollector.incrementCounter(s"$name.success", tags)
      result
    } catch {
      case ex: Exception =>
        val duration = (System.nanoTime() - start).nanos
        metricsCollector.recordTimer(name, duration, tags + ("status" -> "error"))
        metricsCollector.incrementCounter(s"$name.error", tags)
        throw ex
    }
  }

  def counted[T](name: String, tags: Map[String, String] = Map.empty)(operation: => T): T = {
    metricsCollector.incrementCounter(name, tags)
    operation
  }

  def monitored[T](name: String, tags: Map[String, String] = Map.empty)(operation: => T): T = {
    val start = System.nanoTime()
    val startMemory = getUsedMemory

    try {
      val result = operation
      val endMemory = getUsedMemory
      val duration = (System.nanoTime() - start).nanos
      val memoryUsed = endMemory - startMemory

      metricsCollector.recordTimer(s"$name.duration", duration, tags)
      metricsCollector.recordHistogram(s"$name.memory", memoryUsed.toDouble, tags)
      metricsCollector.incrementCounter(s"$name.success", tags)

      result
    } catch {
      case ex: Exception =>
        val duration = (System.nanoTime() - start).nanos
        metricsCollector.recordTimer(s"$name.duration", duration, tags + ("status" -> "error"))
        metricsCollector.incrementCounter(s"$name.error", tags)
        throw ex
    }
  }

  private def getUsedMemory: Long = {
    val memoryBean = java.lang.management.ManagementFactory.getMemoryMXBean
    memoryBean.getHeapMemoryUsage.getUsed
  }

  def getMetrics: Map[String, Any] = metricsCollector.getAllMetrics

  def printMetricsReport(): Unit = {
    val metrics = getMetrics

    println("=== Performance Metrics Report ===")

    println("\nCounters:")
    metrics("counters").asInstanceOf[Map[String, Long]].foreach { case (name, value) =>
      println(f"  $name%-40s $value%10d")
    }

    println("\nTimers:")
    metrics("timers").asInstanceOf[Map[String, TimerStats]].foreach { case (name, stats) =>
      println(f"  $name%-40s count=${stats.getCount}%6d mean=${stats.getMean / 1000}%8.2fms min=${stats.getMin / 1000}%8.2fms max=${stats.getMax / 1000}%8.2fms")
    }

    println("\nGauges:")
    metrics("gauges").asInstanceOf[Map[String, Double]].foreach { case (name, value) =>
      println(f"  $name%-40s $value%10.2f")
    }

    println("\nHistograms:")
    metrics("histograms").asInstanceOf[Map[String, HistogramStats]].foreach { case (name, stats) =>
      println(f"  $name%-40s count=${stats.getCount}%6d p50=${stats.getPercentile(50)}%8.2f p95=${stats.getPercentile(95)}%8.2f p99=${stats.getPercentile(99)}%8.2f")
    }
  }
}

// Circuit breaker pattern for performance protection
class CircuitBreaker(
  name: String,
  failureThreshold: Int = 5,
  timeout: Duration = 60.seconds,
  resetTimeout: Duration = 60.seconds
) {

  sealed trait State
  case object Closed extends State
  case object Open extends State
  case object HalfOpen extends State

  private val state = new AtomicReference[State](Closed)
  private val failureCount = new AtomicLong(0)
  private val lastFailureTime = new AtomicLong(0)
  private val lastSuccessTime = new AtomicLong(System.currentTimeMillis())

  def execute[T](operation: => T): Try[T] = {
    state.get() match {
      case Closed =>
        executeInClosedState(operation)
      case Open =>
        if (shouldAttemptReset()) {
          state.set(HalfOpen)
          executeInHalfOpenState(operation)
        } else {
          Failure(new RuntimeException(s"Circuit breaker $name is OPEN"))
        }
      case HalfOpen =>
        executeInHalfOpenState(operation)
    }
  }

  private def executeInClosedState[T](operation: => T): Try[T] = {
    Try(operation) match {
      case success @ Success(_) =>
        onSuccess()
        success
      case failure @ Failure(_) =>
        onFailure()
        failure
    }
  }

  private def executeInHalfOpenState[T](operation: => T): Try[T] = {
    Try(operation) match {
      case success @ Success(_) =>
        onSuccess()
        state.set(Closed)
        success
      case failure @ Failure(_) =>
        onFailure()
        state.set(Open)
        failure
    }
  }

  private def onSuccess(): Unit = {
    failureCount.set(0)
    lastSuccessTime.set(System.currentTimeMillis())
    PerformanceInstrumentation.metricsCollector.incrementCounter(s"circuit_breaker.$name.success")
  }

  private def onFailure(): Unit = {
    val failures = failureCount.incrementAndGet()
    lastFailureTime.set(System.currentTimeMillis())

    if (failures >= failureThreshold) {
      state.set(Open)
      PerformanceInstrumentation.metricsCollector.incrementCounter(s"circuit_breaker.$name.opened")
    }

    PerformanceInstrumentation.metricsCollector.incrementCounter(s"circuit_breaker.$name.failure")
  }

  private def shouldAttemptReset(): Boolean = {
    System.currentTimeMillis() - lastFailureTime.get() > resetTimeout.toMillis
  }

  def getState: State = state.get()
  def getFailureCount: Long = failureCount.get()
  def isOpen: Boolean = state.get() == Open
  def isClosed: Boolean = state.get() == Closed
}

// Performance testing utilities
object PerformanceTestUtils {

  def loadTest[T](
    name: String,
    operation: => T,
    duration: Duration = 60.seconds,
    concurrency: Int = 10
  ): LoadTestResult = {

    import scala.concurrent.{Future, ExecutionContext}
    import java.util.concurrent.Executors

    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(
      Executors.newFixedThreadPool(concurrency)
    )

    val endTime = System.currentTimeMillis() + duration.toMillis
    val results = new AtomicLong(0)
    val errors = new AtomicLong(0)
    val responseTimes = new java.util.concurrent.ConcurrentLinkedQueue[Long]()

    val futures = (1 to concurrency).map { _ =>
      Future {
        while (System.currentTimeMillis() < endTime) {
          val start = System.nanoTime()
          try {
            operation
            val responseTime = (System.nanoTime() - start) / 1000000 // Convert to milliseconds
            responseTimes.offer(responseTime)
            results.incrementAndGet()
          } catch {
            case _: Exception =>
              errors.incrementAndGet()
          }
        }
      }
    }

    // Wait for all futures to complete
    import scala.concurrent.Await
    Await.ready(Future.sequence(futures), duration + 10.seconds)

    val responseTimesList = responseTimes.asScala.toList.sorted

    LoadTestResult(
      name = name,
      duration = duration,
      concurrency = concurrency,
      totalRequests = results.get(),
      totalErrors = errors.get(),
      requestsPerSecond = results.get().toDouble / duration.toSeconds,
      averageResponseTime = if (responseTimesList.nonEmpty) responseTimesList.sum.toDouble / responseTimesList.length else 0.0,
      medianResponseTime = if (responseTimesList.nonEmpty) responseTimesList(responseTimesList.length / 2) else 0L,
      p95ResponseTime = if (responseTimesList.nonEmpty) responseTimesList((responseTimesList.length * 0.95).toInt) else 0L,
      p99ResponseTime = if (responseTimesList.nonEmpty) responseTimesList((responseTimesList.length * 0.99).toInt) else 0L,
      maxResponseTime = if (responseTimesList.nonEmpty) responseTimesList.last else 0L
    )
  }

  case class LoadTestResult(
    name: String,
    duration: Duration,
    concurrency: Int,
    totalRequests: Long,
    totalErrors: Long,
    requestsPerSecond: Double,
    averageResponseTime: Double,
    medianResponseTime: Long,
    p95ResponseTime: Long,
    p99ResponseTime: Long,
    maxResponseTime: Long
  ) {
    def errorRate: Double = totalErrors.toDouble / totalRequests

    def printReport(): Unit = {
      println(s"""
        |=== Load Test Report: $name ===
        |Duration: ${duration.toSeconds}s
        |Concurrency: $concurrency
        |Total Requests: $totalRequests
        |Total Errors: $totalErrors
        |Error Rate: ${(errorRate * 100).formatted("%.2f")}%
        |Requests/second: ${requestsPerSecond.formatted("%.2f")}
        |Response Times:
        |  Average: ${averageResponseTime.formatted("%.2f")}ms
        |  Median:  ${medianResponseTime}ms
        |  P95:     ${p95ResponseTime}ms
        |  P99:     ${p99ResponseTime}ms
        |  Max:     ${maxResponseTime}ms
        """.stripMargin)
    }
  }
}

Conclusion

Performance optimization and profiling are essential skills for building high-performance Scala applications. Key concepts include:

JVM Performance:

  • Memory management and garbage collection optimization
  • CPU optimization techniques and algorithms
  • Data structure selection and optimization
  • Object pooling and resource management

Profiling Tools:

  • JVM built-in profiling capabilities
  • Java Flight Recorder (JFR) integration
  • Memory and allocation profiling
  • Thread contention analysis

Benchmarking:

  • JMH microbenchmarking framework
  • Performance regression testing
  • Load testing and stress testing
  • Comparative performance analysis

Monitoring and Observability:

  • Application Performance Monitoring (APM)
  • Metrics collection and reporting
  • Real-time performance monitoring
  • Circuit breaker patterns for resilience

Optimization Strategies:

  • Algorithmic improvements
  • Data structure optimization
  • Parallel and concurrent processing
  • Cache-friendly programming patterns

Performance Testing:

  • Load testing methodologies
  • Performance baseline establishment
  • Regression detection
  • Capacity planning

Production Monitoring:

  • Real-time metrics collection
  • Performance alerting
  • Trend analysis
  • Capacity monitoring

Best Practices:

  • Profile before optimizing
  • Measure performance impact
  • Focus on bottlenecks
  • Consider readability vs performance trade-offs

Systematic performance optimization enables teams to build scalable, efficient Scala applications that meet enterprise performance requirements while maintaining code quality and maintainability.