Parallel Collections: Data Parallelism

Introduction

Parallel collections in Scala provide an elegant way to leverage multi-core processors for data-parallel computations. By calling .par on a collection, you can distribute operations such as map, filter, and reduce across multiple cores, significantly improving performance for CPU-intensive tasks.

This lesson will teach you how to use parallel collections effectively, understand when parallelization helps, and avoid common pitfalls while building high-performance applications.
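
Note: since Scala 2.13, parallel collections ship as a separate module rather than as part of the standard library, so the examples below assume the scala-parallel-collections dependency is on your classpath (1.0.4 is one published version; use whatever is current):

// build.sbt
libraryDependencies += "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4"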

Understanding Parallel Collections

Basic Parallel Operations

import scala.collection.parallel.CollectionConverters._

// Converting collections to parallel
// (.par copies a List element by element; Vector and Array convert much more cheaply)
val numbers = (1 to 1000000).toList
val parallelNumbers = numbers.par

println(s"Original collection type: ${numbers.getClass.getSimpleName}")
println(s"Parallel collection type: ${parallelNumbers.getClass.getSimpleName}")

// Timing helper function
def timeOperation[T](name: String)(operation: => T): T = {
  val start = System.nanoTime()
  val result = operation
  val end = System.nanoTime()
  println(f"$name: ${(end - start) / 1e6}%.2f ms")
  result
}
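
// Single-shot JVM timings are noisy (JIT compilation, GC pauses). A small warm-up
// wrapper (a sketch, not used below) makes one-off comparisons slightly fairer
def timeWarm[T](name: String, warmups: Int = 3)(operation: => T): T = {
  (1 to warmups).foreach(_ => operation)  // let the JIT compile the hot path first
  timeOperation(name)(operation)
}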

// CPU-intensive operation for demonstration
def expensiveFunction(n: Int): Int = {
  var result = n
  for (_ <- 1 to 1000) {
    result = (result * 1.1).toInt
  }
  result
}

// Compare sequential vs parallel performance
val data = (1 to 100000).toList

timeOperation("Sequential map") {
  val result = data.map(expensiveFunction)
  result.length
}

timeOperation("Parallel map") {
  val result = data.par.map(expensiveFunction)
  result.length
}

// Parallel filter operations
timeOperation("Sequential filter") {
  val result = numbers.filter(_ % 17 == 0)
  result.length
}

timeOperation("Parallel filter") {
  val result = numbers.par.filter(_ % 17 == 0)
  result.length
}

// Parallel reduce operations
timeOperation("Sequential sum") {
  numbers.sum
}

timeOperation("Parallel sum") {
  numbers.par.sum
}

// Custom parallel reduce
timeOperation("Sequential reduce") {
  numbers.reduce(_ + _)
}

timeOperation("Parallel reduce") {
  numbers.par.reduce(_ + _)
}

// Parallel foreach (for side effects)
import java.util.concurrent.atomic.AtomicInteger

val counter = new AtomicInteger(0)

timeOperation("Sequential foreach") {
  data.foreach(_ => counter.incrementAndGet())
}

counter.set(0)

timeOperation("Parallel foreach") {
  data.par.foreach(_ => counter.incrementAndGet())
}

println(s"Final counter value: ${counter.get()}")

// Parallel find operations
val target = 567890

timeOperation("Sequential find") {
  numbers.find(_ == target)
}

timeOperation("Parallel find") {
  numbers.par.find(_ == target)
}
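
// Caveat: on a parallel collection, find returns *a* matching element, not
// necessarily the first; whichever chunk finds a match first can win
val mixed = Vector.fill(1000)(1) ++ Vector.fill(1000)(2)
println(mixed.par.find(_ > 0))  // Some(1) or Some(2), depending on scheduling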

// Parallel exists and forall
timeOperation("Sequential exists") {
  numbers.exists(_ > 999000)
}

timeOperation("Parallel exists") {
  numbers.par.exists(_ > 999000)
}

timeOperation("Sequential forall") {
  numbers.forall(_ > 0)
}

timeOperation("Parallel forall") {
  numbers.par.forall(_ > 0)
}
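
// exists and forall can return as soon as any worker finds a deciding element;
// a predicate with no match is the worst case, since every element must be checked
timeOperation("Parallel exists (no match, worst case)") {
  numbers.par.exists(_ > 2000000)  // numbers only reach 1000000, so everything is scanned
}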

Parallel Collection Types

// Different parallel collection types
val list = List(1, 2, 3, 4, 5)
val vector = Vector(1, 2, 3, 4, 5)
val array = Array(1, 2, 3, 4, 5)
val range = (1 to 5)
val set = Set(1, 2, 3, 4, 5)
val map = Map("a" -> 1, "b" -> 2, "c" -> 3)

// Converting to parallel collections
val parList = list.par
val parVector = vector.par
val parArray = array.par
val parRange = range.par
val parSet = set.par
val parMap = map.par

println("Parallel collection types:")
println(s"Parallel List: ${parList.getClass.getSimpleName}")
println(s"Parallel Vector: ${parVector.getClass.getSimpleName}")
println(s"Parallel Array: ${parArray.getClass.getSimpleName}")
println(s"Parallel Range: ${parRange.getClass.getSimpleName}")
println(s"Parallel Set: ${parSet.getClass.getSimpleName}")
println(s"Parallel Map: ${parMap.getClass.getSimpleName}")

// Converting back to sequential
val backToSeq = parList.seq
println(s"Back to sequential: ${backToSeq.getClass.getSimpleName}")

// Parallel string operations
val text = "The quick brown fox jumps over the lazy dog " * 10000
val words = text.split("\\s+").toVector

timeOperation("Sequential word processing") {
  words
    .filter(_.length > 3)
    .map(_.toUpperCase)
    .groupBy(_.head)
    .view.mapValues(_.length)
    .toMap
    .size
}

timeOperation("Parallel word processing") {
  words.par
    .filter(_.length > 3)
    .map(_.toUpperCase)
    .groupBy(_.head)
    .view.mapValues(_.length)
    .toMap
    .size
}

// Parallel matrix operations
case class Matrix(data: Vector[Vector[Double]]) {
  def +(other: Matrix): Matrix = {
    Matrix(
      data.zip(other.data).par.map { case (row1, row2) =>
        row1.zip(row2).map { case (a, b) => a + b }
      }.seq
    )
  }

  def *(scalar: Double): Matrix = {
    Matrix(
      data.par.map(row => row.map(_ * scalar)).seq
    )
  }

  def transpose: Matrix = {
    Matrix(
      data.head.indices.par.map { col =>
        data.map(_(col))
      }.seq.toVector
    )
  }
}

// Create test matrices
val size = 500
val matrix1 = Matrix(
  Vector.fill(size)(Vector.fill(size)(scala.util.Random.nextDouble()))
)
val matrix2 = Matrix(
  Vector.fill(size)(Vector.fill(size)(scala.util.Random.nextDouble()))
)

timeOperation("Matrix addition") {
  val result = matrix1 + matrix2
  result.data.length
}

timeOperation("Matrix scalar multiplication") {
  val result = matrix1 * 2.5
  result.data.length
}

timeOperation("Matrix transpose") {
  val result = matrix1.transpose
  result.data.length
}

// Parallel groupBy operations
case class Student(name: String, grade: Int, subject: String, score: Double)

val students = (1 to 10000).map { i =>
  Student(
    s"Student$i",
    (i % 4) + 9,  // Grades 9-12
    List("Math", "Science", "English", "History")(i % 4),
    50 + scala.util.Random.nextDouble() * 50  // Scores 50-100
  )
}.toVector

timeOperation("Sequential groupBy grade") {
  students.groupBy(_.grade).view.mapValues(_.length).toMap
}

timeOperation("Parallel groupBy grade") {
  students.par.groupBy(_.grade).view.mapValues(_.length).toMap
}

// Advanced parallel aggregations
timeOperation("Sequential grade analysis") {
  students
    .groupBy(_.subject)
    .view.mapValues { subjectStudents =>
      val scores = subjectStudents.map(_.score)
      Map(
        "average" -> scores.sum / scores.length,
        "max" -> scores.max,
        "min" -> scores.min,
        "count" -> scores.length
      )
    }.toMap
}

timeOperation("Parallel grade analysis") {
  students.par
    .groupBy(_.subject)
    .view.mapValues { subjectStudents =>
      val scores = subjectStudents.map(_.score)
      Map(
        "average" -> scores.sum / scores.length,
        "max" -> scores.max,
        "min" -> scores.min,
        "count" -> scores.length
      )
    }.toMap
}

Configuring Parallelism

Task Support and Thread Pools

import scala.collection.parallel.{ForkJoinTaskSupport, ExecutionContextTaskSupport}
import scala.concurrent.ExecutionContext
import java.util.concurrent.{ForkJoinPool, Executors}

// Default parallelism level
val defaultParCollection = (1 to 1000).par
println(s"Default parallelism level: ${defaultParCollection.tasksupport.parallelismLevel}")

// Custom ForkJoinPool
val customPool = new ForkJoinPool(8)  // 8 threads
val customParCollection = (1 to 1000).par
customParCollection.tasksupport = new ForkJoinTaskSupport(customPool)

println(s"Custom parallelism level: ${customParCollection.tasksupport.parallelismLevel}")

// Timing with different parallelism levels
def testParallelismLevel(level: Int, data: Vector[Int]): Unit = {
  val pool = new ForkJoinPool(level)
  val parCollection = data.par
  parCollection.tasksupport = new ForkJoinTaskSupport(pool)

  timeOperation(s"Parallelism level $level") {
    parCollection.map(expensiveFunction).length
  }

  pool.shutdown()
}

val testData = (1 to 10000).toVector

// Test different parallelism levels
List(1, 2, 4, 8, 16).foreach(level => testParallelismLevel(level, testData))

// Using an ExecutorService via ExecutionContextTaskSupport
val executorService = Executors.newFixedThreadPool(4)
val executorParCollection = testData.par
executorParCollection.tasksupport = new ExecutionContextTaskSupport(
  ExecutionContext.fromExecutorService(executorService)
)

timeOperation("ExecutionContext TaskSupport") {
  executorParCollection.map(x => x * x).length
}

executorService.shutdown()
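
// A reusable helper (a sketch) that pairs a dedicated pool with one operation and
// guarantees shutdown, instead of leaking pools as ad-hoc examples easily do
import scala.collection.parallel.immutable.ParVector

def withParallelism[T, R](data: Vector[T], threads: Int)(op: ParVector[T] => R): R = {
  val pool = new ForkJoinPool(threads)
  try {
    val par = data.par
    par.tasksupport = new ForkJoinTaskSupport(pool)
    op(par)
  } finally pool.shutdown()
}

// Example: run a map on exactly 4 threads, then the pool is shut down
val doubled = withParallelism(testData, threads = 4)(_.map(_ * 2).seq)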

// Configuring threshold for parallel operations
class ConfigurableParallelCollection[T](data: Vector[T]) {
  def mapWithThreshold[R](f: T => R, threshold: Int): Vector[R] = {
    if (data.length < threshold) {
      data.map(f)
    } else {
      data.par.map(f).seq
    }
  }

  def filterWithThreshold(p: T => Boolean, threshold: Int): Vector[T] = {
    if (data.length < threshold) {
      data.filter(p)
    } else {
      data.par.filter(p).seq
    }
  }
}

val configurableCollection = new ConfigurableParallelCollection((1 to 1000).toVector)

timeOperation("Small threshold (sequential)") {
  configurableCollection.mapWithThreshold(_ * 2, threshold = 10000).length
}

timeOperation("Large threshold (parallel)") {
  configurableCollection.mapWithThreshold(_ * 2, threshold = 100).length
}

// Dynamic parallelism based on system resources
object AdaptiveParallelism {
  val availableProcessors = Runtime.getRuntime.availableProcessors()

  def createOptimalParallelCollection[T](data: Vector[T]): scala.collection.parallel.immutable.ParVector[T] = {
    val optimalThreads = math.max(2, availableProcessors - 1)  // Leave one core for the OS
    val pool = new ForkJoinPool(optimalThreads)

    val parData = data.par
    parData.tasksupport = new ForkJoinTaskSupport(pool)

    parData  // Return the configured parallel collection (.seq here would discard the task support)
  }

  def processWithAdaptiveParallelism[T, R](
    data: Vector[T], 
    operation: T => R,
    minSizeForParallel: Int = 1000
  ): Vector[R] = {
    if (data.length < minSizeForParallel) {
      data.map(operation)
    } else {
      val parData = data.par
      parData.tasksupport = new ForkJoinTaskSupport(
        new ForkJoinPool(availableProcessors)  // created per call; real code should reuse or shut down pools
      )
      parData.map(operation).seq
    }
  }
}

println(s"Available processors: ${AdaptiveParallelism.availableProcessors}")

val largeDataset = (1 to 50000).toVector

timeOperation("Adaptive parallel processing") {
  AdaptiveParallelism.processWithAdaptiveParallelism(
    largeDataset, 
    (x: Int) => math.sqrt(x.toDouble).toInt
  ).length
}

// Nested parallelism considerations
def nestedParallelOperation(data: Vector[Vector[Int]]): Vector[Vector[Int]] = {
  // Outer parallelism
  data.par.map { innerVector =>
    // Inner parallelism - be careful with nested parallelism
    if (innerVector.length > 1000) {
      innerVector.par.map(_ * 2).seq
    } else {
      innerVector.map(_ * 2)
    }
  }.seq
}

val nestedData = Vector.fill(100)(Vector.fill(1000)(scala.util.Random.nextInt(1000)))

timeOperation("Nested parallel operation") {
  nestedParallelOperation(nestedData).length
}
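
// Often the simplest fix is to parallelize only the outer level: when the outer
// collection already has enough elements to keep every core busy, an inner .par
// adds overhead without adding parallelism (a sketch for comparison)
def outerOnlyParallel(data: Vector[Vector[Int]]): Vector[Vector[Int]] =
  data.par.map(_.map(_ * 2)).seq

timeOperation("Outer-only parallel operation") {
  outerOnlyParallel(nestedData).length
}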

// Memory considerations with parallel collections
def memoryEfficientParallelProcessing[T, R](
  data: Vector[T],
  operation: T => R,
  chunkSize: Int = 10000
): Vector[R] = {

  data.grouped(chunkSize).map { chunk =>
    chunk.par.map(operation).seq
  }.flatten.toVector
}

val hugeDataset = (1 to 100000).toVector

timeOperation("Memory-efficient parallel processing") {
  memoryEfficientParallelProcessing(
    hugeDataset,
    (x: Int) => expensiveFunction(x),
    chunkSize = 5000
  ).length
}

When to Use Parallel Collections

Performance Analysis and Guidelines

// Measuring scalability
def measureScalability[T](
  data: Vector[Int],
  operation: Int => T,
  maxThreads: Int = 16
): Unit = {

  println(s"Scalability analysis for ${data.length} elements:")
  println("Threads\tTime (ms)\tSpeedup\tEfficiency")

  // Sequential baseline (measure the time itself; timeOperation returns the operation's result)
  val sequentialTime = {
    val start = System.nanoTime()
    data.map(operation)
    val end = System.nanoTime()
    (end - start) / 1e6
  }
  println(f"Sequential baseline: $sequentialTime%.2f ms")

  (1 to maxThreads by 2).foreach { threads =>
    val pool = new ForkJoinPool(threads)
    val parData = data.par
    parData.tasksupport = new ForkJoinTaskSupport(pool)

    val parallelTime = {
      val start = System.nanoTime()
      parData.map(operation)
      val end = System.nanoTime()
      (end - start) / 1e6
    }

    val speedup = sequentialTime / parallelTime
    val efficiency = speedup / threads

    println(f"$threads\t$parallelTime%.2f\t\t$speedup%.2f\t$efficiency%.2f")

    pool.shutdown()
  }
}
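
// The measured speedup ceiling follows Amdahl's law:
//   speedup(n) = 1 / (s + (1 - s) / n), where s is the serial fraction of the work
def amdahlSpeedup(serialFraction: Double, threads: Int): Double =
  1.0 / (serialFraction + (1.0 - serialFraction) / threads)

// Even 5% serial work caps 16 threads at roughly 9x
println(f"Theoretical max speedup (5%% serial, 16 threads): ${amdahlSpeedup(0.05, 16)}%.1fx")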

// Test with CPU-intensive operation
println("CPU-intensive operation scalability:")
measureScalability((1 to 20000).toVector, expensiveFunction)

// Test with less CPU-intensive operation
println("\nSimple operation scalability:")
measureScalability((1 to 100000).toVector, (x: Int) => x * x + 1)

// Guidelines for when to use parallel collections
object ParallelCollectionGuidelines {

  // 1. CPU-intensive operations benefit most
  def cpuIntensiveExample(): Unit = {
    val data = (1 to 50000).toVector

    // Good candidate: complex mathematical operations
    timeOperation("Sequential: Complex math") {
      data.map(n => math.pow(math.sin(n), 2) + math.cos(n)).length
    }

    timeOperation("Parallel: Complex math") {
      data.par.map(n => math.pow(math.sin(n), 2) + math.cos(n)).length
    }
  }

  // 2. Simple operations may not benefit
  def simpleOperationExample(): Unit = {
    val data = (1 to 1000000).toVector

    // Poor candidate: simple operations with overhead
    timeOperation("Sequential: Simple increment") {
      data.map(_ + 1).length
    }

    timeOperation("Parallel: Simple increment") {
      data.par.map(_ + 1).length
    }
  }

  // 3. I/O operations are usually not suitable
  def ioOperationExample(): Unit = {
    val urls = (1 to 100).map(i => s"http://example.com/api/$i").toVector

    // Not suitable: I/O bound operations
    def simulateHttpCall(url: String): String = {
      Thread.sleep(10)  // Simulate network latency
      s"Response from $url"
    }

    timeOperation("Sequential: I/O simulation") {
      urls.map(simulateHttpCall).length
    }

    timeOperation("Parallel: I/O simulation") {
      urls.par.map(simulateHttpCall).length
    }
  }
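
  // A better fit for I/O-bound work (a sketch): asynchronous Futures overlap the
  // waits instead of blocking parallel-collection pool threads; blocking(...) hints
  // the global pool to compensate with extra threads while a task sleeps
  def futureBasedIoExample(): Unit = {
    import scala.concurrent.{Await, Future, blocking}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    val urls = (1 to 100).map(i => s"http://example.com/api/$i").toVector

    def simulateHttpCall(url: String): String = {
      Thread.sleep(10)  // Simulate network latency
      s"Response from $url"
    }

    timeOperation("Future-based I/O simulation") {
      val responses = Future.traverse(urls)(url => Future(blocking(simulateHttpCall(url))))
      Await.result(responses, 1.minute).length
    }
  }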

  // 4. Data size matters
  def dataSizeExample(): Unit = {
    val sizes = List(100, 1000, 10000, 100000)

    sizes.foreach { size =>
      val data = (1 to size).toVector

      println(s"\nData size: $size")

      timeOperation("Sequential") {
        data.map(expensiveFunction).length
      }

      timeOperation("Parallel") {
        data.par.map(expensiveFunction).length
      }
    }
  }

  // 5. Shared mutable state is problematic
  def sharedStateExample(): Unit = {
    val data = (1 to 100000).toVector

    // Problematic: shared mutable state
    var sum = 0

    timeOperation("Sequential with shared state") {
      data.foreach(sum += _)
    }

    println(s"Sequential sum: $sum")

    // This will produce incorrect results!
    sum = 0
    timeOperation("Parallel with shared state (WRONG!)") {
      data.par.foreach(sum += _)
    }

    println(s"Parallel sum (incorrect): $sum")

    // Correct approach: use reduce or atomic operations
    timeOperation("Parallel with reduce (CORRECT)") {
      val correctSum = data.par.reduce(_ + _)
      println(s"Parallel sum (correct): $correctSum")
    }
  }
}

println("\n=== Parallel Collection Guidelines ===")

println("\n1. CPU-intensive operations:")
ParallelCollectionGuidelines.cpuIntensiveExample()

println("\n2. Simple operations:")
ParallelCollectionGuidelines.simpleOperationExample()

println("\n3. I/O operations:")
ParallelCollectionGuidelines.ioOperationExample()
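ParallelCollectionGuidelines.futureBasedIoExample()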

println("\n4. Data size impact:")
ParallelCollectionGuidelines.dataSizeExample()

println("\n5. Shared state problems:")
ParallelCollectionGuidelines.sharedStateExample()
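
// Follow-up: if a shared accumulator is genuinely needed, the atomic classes in
// java.util.concurrent make the parallel version correct; LongAdder in particular
// is designed for write-heavy counters like this (a sketch)
import java.util.concurrent.atomic.LongAdder

val adder = new LongAdder
(1 to 100000).toVector.par.foreach(adder.add(_))
println(s"LongAdder sum (correct): ${adder.sum()}")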

// Performance profiling helper
class ParallelPerformanceProfiler {
  def profile[T, R](
    data: Vector[T],
    operation: T => R,
    name: String = "Operation"
  ): Unit = {

    println(s"\n=== Profiling: $name ===")
    println(s"Data size: ${data.length}")

    // Memory usage before
    val runtime = Runtime.getRuntime
    runtime.gc()
    val memoryBefore = runtime.totalMemory() - runtime.freeMemory()

    // Sequential execution
    val seqTime = {
      val start = System.nanoTime()
      data.map(operation)
      val end = System.nanoTime()
      (end - start) / 1e6
    }

    runtime.gc()
    val memoryAfterSeq = runtime.totalMemory() - runtime.freeMemory()

    // Parallel execution
    val parTime = {
      val start = System.nanoTime()
      data.par.map(operation)
      val end = System.nanoTime()
      (end - start) / 1e6
    }

    runtime.gc()
    val memoryAfterPar = runtime.totalMemory() - runtime.freeMemory()

    // Results
    val speedup = seqTime / parTime
    val efficiency = speedup / Runtime.getRuntime.availableProcessors()

    println(f"Sequential time: $seqTime%.2f ms")
    println(f"Parallel time: $parTime%.2f ms")
    println(f"Speedup: $speedup%.2fx")
    println(f"Efficiency: ${efficiency * 100}%.1f%%")
    println(f"Memory (sequential): ${(memoryAfterSeq - memoryBefore) / 1024 / 1024}%.1f MB")
    println(f"Memory (parallel): ${(memoryAfterPar - memoryAfterSeq) / 1024 / 1024}%.1f MB")

    if (speedup > 1.2) {
      println("✓ Good parallelization candidate")
    } else if (speedup > 0.8) {
      println("⚠ Marginal benefit from parallelization")
    } else {
      println("✗ Poor parallelization candidate")
    }
  }
}

val profiler = new ParallelPerformanceProfiler()

// Profile different operations
profiler.profile(
  (1 to 10000).toVector,
  (x: Int) => math.sqrt(x * x + 1),
  "Square root calculation"
)

profiler.profile(
  (1 to 100000).toVector,
  (x: Int) => x.toString.length,
  "String conversion"
)

profiler.profile(
  (1 to 5000).toVector,
  expensiveFunction,
  "Expensive computation"
)

Advanced Parallel Patterns

Custom Parallel Algorithms

// Parallel merge sort implementation
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.reflect.ClassTag

object ParallelMergeSort {
  def mergeSort[T](arr: Array[T])(implicit ord: Ordering[T], ct: ClassTag[T]): Array[T] = {
    if (arr.length <= 1) arr
    else {
      val mid = arr.length / 2
      val (left, right) = arr.splitAt(mid)

      // Sort the halves in parallel, but only fork a task when the half is
      // large enough to amortize the scheduling overhead
      val sortedLeft =
        if (left.length > 1000) Future(mergeSort(left))
        else Future.successful(mergeSort(left))

      val sortedRight =
        if (right.length > 1000) Future(mergeSort(right))
        else Future.successful(mergeSort(right))

      // Merge results
      merge(Await.result(sortedLeft, 10.seconds), Await.result(sortedRight, 10.seconds))
    }
  }

  private def merge[T](left: Array[T], right: Array[T])(implicit ord: Ordering[T], ct: ClassTag[T]): Array[T] = {
    val result = new Array[T](left.length + right.length)
    var i, j, k = 0

    while (i < left.length && j < right.length) {
      if (ord.lteq(left(i), right(j))) {
        result(k) = left(i)
        i += 1
      } else {
        result(k) = right(j)
        j += 1
      }
      k += 1
    }

    while (i < left.length) {
      result(k) = left(i)
      i += 1
      k += 1
    }

    while (j < right.length) {
      result(k) = right(j)
      j += 1
      k += 1
    }

    result
  }
}

// Test parallel merge sort
val unsortedArray = Array.fill(50000)(scala.util.Random.nextInt(100000))

timeOperation("Parallel merge sort") {
  val sorted = ParallelMergeSort.mergeSort(unsortedArray.clone())
  sorted.length
}

timeOperation("Built-in sort") {
  val sorted = unsortedArray.clone()
  scala.util.Sorting.quickSort(sorted)
  sorted.length
}

// Parallel matrix multiplication
class ParallelMatrix(val data: Array[Array[Double]]) {
  val rows: Int = data.length
  val cols: Int = if (rows > 0) data(0).length else 0

  def *(other: ParallelMatrix): ParallelMatrix = {
    require(this.cols == other.rows, "Matrix dimensions don't match")

    val result = Array.ofDim[Double](this.rows, other.cols)

    // Parallel computation of result matrix: parallelize the rows only, since
    // nesting .par inside .par just adds scheduling overhead on the same pool
    (0 until this.rows).par.foreach { i =>
      (0 until other.cols).foreach { j =>
        result(i)(j) = (0 until this.cols).map { k =>
          this.data(i)(k) * other.data(k)(j)
        }.sum
      }
    }

    new ParallelMatrix(result)
  }

  def +(other: ParallelMatrix): ParallelMatrix = {
    require(this.rows == other.rows && this.cols == other.cols, "Matrix dimensions don't match")

    val result = Array.ofDim[Double](rows, cols)

    (0 until rows).par.foreach { i =>
      (0 until cols).foreach { j =>
        result(i)(j) = this.data(i)(j) + other.data(i)(j)
      }
    }

    new ParallelMatrix(result)
  }
}

// Test parallel matrix operations (fresh names; matrix1/matrix2 above are the Vector-based Matrix type)
val pMatrix1 = new ParallelMatrix(Array.fill(200, 200)(scala.util.Random.nextDouble()))
val pMatrix2 = new ParallelMatrix(Array.fill(200, 200)(scala.util.Random.nextDouble()))

timeOperation("Parallel matrix addition") {
  val result = pMatrix1 + pMatrix2
  result.rows
}

timeOperation("Parallel matrix multiplication") {
  val result = pMatrix1 * pMatrix2
  result.rows
}

// Parallel search algorithms
object ParallelSearch {
  def parallelBinarySearch[T](
    arr: Array[T], 
    target: T,
    threshold: Int = 10000
  )(implicit ord: Ordering[T]): Option[Int] = {

    def search(start: Int, end: Int): Option[Int] = {
      if (start > end) None
      else if (end - start < threshold) {
        // Use sequential search for small ranges
        sequentialBinarySearch(arr, target, start, end)
      } else {
        val mid = start + (end - start) / 2
        val midValue = arr(mid)

        if (ord.equiv(midValue, target)) Some(mid)
        else if (ord.lt(midValue, target)) {
          // The target can only be in the right half of a sorted array; the left-half
          // future is speculative work that mainly illustrates the fork/join pattern
          import scala.concurrent.{Future, ExecutionContext}
          implicit val ec: ExecutionContext = ExecutionContext.global

          val leftFuture = Future { search(start, mid - 1) }
          val rightResult = search(mid + 1, end)

          rightResult.orElse {
            import scala.concurrent.Await
            import scala.concurrent.duration._
            Await.result(leftFuture, 5.seconds)
          }
        } else {
          search(start, mid - 1)
        }
      }
    }

    search(0, arr.length - 1)
  }

  private def sequentialBinarySearch[T](
    arr: Array[T], 
    target: T, 
    start: Int, 
    end: Int
  )(implicit ord: Ordering[T]): Option[Int] = {
    var left = start
    var right = end

    while (left <= right) {
      val mid = left + (right - left) / 2
      val midValue = arr(mid)

      if (ord.equiv(midValue, target)) return Some(mid)
      else if (ord.lt(midValue, target)) left = mid + 1
      else right = mid - 1
    }

    None
  }
}

// Test parallel search
val sortedArray = (1 to 1000000).toArray
val searchTarget = 567890

timeOperation("Sequential binary search") {
  java.util.Arrays.binarySearch(sortedArray, searchTarget)  // indexOf would be a linear scan, not binary search
}

timeOperation("Parallel binary search") {
  ParallelSearch.parallelBinarySearch(sortedArray, searchTarget)
}

// Parallel aggregation patterns
object ParallelAggregation {

  case class Statistics(count: Long, sum: Double, min: Double, max: Double) {
    def mean: Double = if (count > 0) sum / count else 0.0

    def combine(other: Statistics): Statistics = {
      Statistics(
        count + other.count,
        sum + other.sum,
        math.min(min, other.min),
        math.max(max, other.max)
      )
    }
  }

  def computeStatistics(data: Vector[Double]): Statistics = {
    if (data.isEmpty) {
      Statistics(0, 0.0, Double.MaxValue, Double.MinValue)
    } else {
      data.par.aggregate(Statistics(0, 0.0, Double.MaxValue, Double.MinValue))(
        // Sequential operation: combine element with accumulator
        (stats, value) => Statistics(
          stats.count + 1,
          stats.sum + value,
          math.min(stats.min, value),
          math.max(stats.max, value)
        ),
        // Parallel operation: combine two accumulators
        (stats1, stats2) => stats1.combine(stats2)
      )
    }
  }

  def parallelGroupStatistics[K](
    data: Vector[(K, Double)]
  ): Map[K, Statistics] = {
    data.par
      .groupBy(_._1)
      .map { case (key, pairs) =>  // ParMap has no .view; transform the entries directly
        key -> computeStatistics(pairs.map(_._2).seq.toVector)
      }
      .seq
  }
}

// Test parallel aggregation
val statisticsData = Vector.fill(100000)(scala.util.Random.nextDouble() * 100)

timeOperation("Parallel statistics computation") {
  val stats = ParallelAggregation.computeStatistics(statisticsData)
  println(f"Count: ${stats.count}, Mean: ${stats.mean}%.2f, Min: ${stats.min}%.2f, Max: ${stats.max}%.2f")
}

val groupedData = (1 to 50000).map(i => 
  (s"Group${i % 10}", scala.util.Random.nextDouble() * 100)
).toVector

timeOperation("Parallel group statistics") {
  val groupStats = ParallelAggregation.parallelGroupStatistics(groupedData)
  println(s"Computed statistics for ${groupStats.size} groups")
}

Summary

In this lesson, you've mastered parallel collections and data parallelism:

Parallel Basics: Converting collections and basic operations
Configuration: Task support and thread pool management
Performance Analysis: When and how to use parallelization
Guidelines: Identifying good candidates for parallelization
Advanced Patterns: Custom parallel algorithms and aggregations
Best Practices: Avoiding pitfalls and optimizing performance
Real-world Applications: Matrix operations, search, and statistics

Parallel collections provide an easy way to leverage multi-core processors, but understanding when and how to use them effectively is crucial for building high-performance applications.

What's Next

In the next lesson, we'll explore the Actor model and Akka, learning how to build concurrent, distributed systems using message-passing concurrency that scales from single machines to clusters.