Big Data Processing: Apache Spark and Distributed Computing

Apache Spark has revolutionized big data processing by providing a unified analytics engine for large-scale workloads. Written in Scala, Spark offers high-performance distributed computing with APIs for batch processing, streaming, machine learning, and graph processing. In this lesson, we'll explore Spark programming in Scala in depth.

Understanding Apache Spark Architecture

Core Components

  • Spark Core: Foundation with RDDs and basic operations
  • Spark SQL: Structured data processing with DataFrames and Datasets
  • Spark Streaming: Real-time stream processing
  • MLlib: Machine learning library
  • GraphX: Graph processing framework
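
Each of these components is reachable from the same application through its own package. The sketch below maps them to their Scala entry points (names such as demoSession are illustrative; the modules match the build definition shown later):

// Entry points for the main Spark modules
import org.apache.spark.rdd.RDD                      // Spark Core: RDDs via SparkContext
import org.apache.spark.sql.SparkSession             // Spark SQL: DataFrames and Datasets
import org.apache.spark.sql.streaming.StreamingQuery // Structured Streaming queries
import org.apache.spark.ml.Pipeline                  // MLlib (DataFrame-based API)
import org.apache.spark.graphx.Graph                 // GraphX property graphs

val demoSession = SparkSession.builder().appName("components").master("local[*]").getOrCreate()
val coreRdd: RDD[Int] = demoSession.sparkContext.parallelize(1 to 10) // Spark Core
val sqlDf = demoSession.range(10).toDF("id")                          // Spark SQL
demoSession.stop()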

Execution Model

  • Driver Program: Coordinates execution and maintains application state
  • Cluster Manager: Allocates resources (YARN, Mesos, Kubernetes, Standalone)
  • Executors: Worker processes that execute tasks and store data
  • Tasks: Units of work sent to executors
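
In code, the driver is simply the JVM running your main method: the master URL chooses the cluster manager, and executor settings size the worker processes. A minimal, illustrative sketch (resource values are placeholders and are usually supplied via spark-submit rather than hard-coded):

// Driver-side configuration describing the rest of the execution model
import org.apache.spark.sql.SparkSession

val clusterSession = SparkSession.builder()
  .appName("execution-model-demo")
  .master("local[*]")                      // on a cluster: spark://host:7077, yarn, or k8s://...
  .config("spark.executor.memory", "4g")   // memory per executor process (ignored in local mode)
  .config("spark.executor.cores", "2")     // concurrent tasks per executor
  .getOrCreate()

// Actions are broken into stages and tasks; tasks are shipped to the executors
println(s"Default parallelism: ${clusterSession.sparkContext.defaultParallelism}")
clusterSession.stop()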

Setting Up Spark Environment

Dependencies and Configuration

// build.sbt
val SparkVersion = "3.5.0"
val HadoopVersion = "3.3.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % SparkVersion,
  "org.apache.spark" %% "spark-sql" % SparkVersion,
  "org.apache.spark" %% "spark-streaming" % SparkVersion,
  "org.apache.spark" %% "spark-mllib" % SparkVersion,
  "org.apache.spark" %% "spark-graphx" % SparkVersion,

  // Connectors
  "org.apache.spark" %% "spark-sql-kafka-0-10" % SparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % SparkVersion,
  "mysql" % "mysql-connector-java" % "8.0.33",
  "org.postgresql" % "postgresql" % "42.6.0",

  // File formats
  "org.apache.spark" %% "spark-avro" % SparkVersion,
  "com.databricks" %% "spark-xml" % "0.16.0",

  // Testing
  "com.holdenkarau" %% "spark-testing-base" % s"${SparkVersion}_1.4.7" % Test
)
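
When the application is packaged for spark-submit (for example with sbt-assembly), the Spark modules themselves are normally marked provided, since the cluster supplies them at runtime. A hedged sketch of that variant:

// build.sbt (cluster-deployment variant): Spark is already on the cluster's
// classpath, so the application jar should not bundle it
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % SparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"  % SparkVersion % "provided"
)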

Spark Session Configuration

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Create optimized Spark session
def createSparkSession(appName: String): SparkSession = {
  SparkSession.builder()
    .appName(appName)
    .master("local[*]") // Use all available cores locally
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.sql.parquet.compression.codec", "snappy")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
    .getOrCreate()
}

implicit val spark: SparkSession = createSparkSession("BigDataProcessing")
import spark.implicits._

// For RDD operations
implicit val sc: SparkContext = spark.sparkContext

Working with RDDs (Resilient Distributed Datasets)

Creating and Basic Operations

import org.apache.spark.rdd.RDD

// Create RDDs from collections
val numbersRDD: RDD[Int] = sc.parallelize(1 to 1000000)
val textRDD: RDD[String] = sc.parallelize(Seq("hello", "world", "spark", "scala"))

// Create RDD from external data
val fileRDD: RDD[String] = sc.textFile("hdfs://path/to/large/file.txt")
val filesRDD: RDD[String] = sc.wholeTextFiles("hdfs://path/to/directory/*").values // one element per file's whole contents

// Transformations (lazy evaluation)
val squaredRDD = numbersRDD.map(x => x * x)
val evenRDD = numbersRDD.filter(_ % 2 == 0)
val wordCountRDD = textRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// Actions (trigger computation)
val sum = numbersRDD.reduce(_ + _)
val count = numbersRDD.count()
val first10 = numbersRDD.take(10)
val samples = numbersRDD.sample(withReplacement = false, fraction = 0.1, seed = 42).collect() // sample() alone is a transformation; collect() triggers it

println(s"Sum: $sum, Count: $count")

Advanced RDD Operations

// Key-value pair operations
case class Sale(product: String, amount: Double, date: String, region: String)

val salesData = Seq(
  Sale("laptop", 1200.0, "2023-01-15", "North"),
  Sale("phone", 800.0, "2023-01-16", "South"),
  Sale("laptop", 1100.0, "2023-01-17", "North"),
  Sale("tablet", 600.0, "2023-01-18", "East")
)

val salesRDD = sc.parallelize(salesData)

// Group by key operations (groupByKey ships every value across the network;
// prefer reduceByKey/aggregateByKey when values can be combined per partition)
val salesByProduct = salesRDD
  .map(sale => (sale.product, sale.amount))
  .groupByKey()
  .mapValues(amounts => amounts.sum)

val salesByRegion = salesRDD
  .map(sale => (sale.region, sale.amount))
  .reduceByKey(_ + _)

// Join operations
val productInfo = sc.parallelize(Seq(
  ("laptop", "Electronics"),
  ("phone", "Electronics"),
  ("tablet", "Electronics")
))

val salesWithCategory = salesRDD
  .map(sale => (sale.product, sale))
  .join(productInfo)
  .map { case (product, (sale, category)) => 
    (product, sale.amount, category, sale.region)
  }

// Aggregation operations: per-region sum, count, average, min, and max in one pass
val salesStats = salesRDD
  .map(sale => (sale.region, sale.amount))
  .aggregateByKey((0.0, 0L, Double.MaxValue, Double.MinValue))(
    // Combine within a partition
    { case ((sum, count, min, max), amount) =>
      (sum + amount, count + 1, math.min(min, amount), math.max(max, amount))
    },
    // Combine across partitions
    { case ((sum1, count1, min1, max1), (sum2, count2, min2, max2)) =>
      (sum1 + sum2, count1 + count2, math.min(min1, min2), math.max(max1, max2))
    }
  )
  .mapValues { case (sum, count, min, max) =>
    (sum, count, sum / count, min, max)
  }

salesStats.collect().foreach { case (region, (sum, count, avg, min, max)) =>
  println(f"Region: $region, Sum: $sum%.2f, Count: $count, Avg: $avg%.2f, Min: $min%.2f, Max: $max%.2f")
}

RDD Persistence and Caching

// Different storage levels
val cachedRDD = numbersRDD
  .filter(_ % 2 == 0)
  .cache() // Default: MEMORY_ONLY

val persistedRDD = numbersRDD
  .map(_ * 2)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Custom partitioning
import org.apache.spark.HashPartitioner

val partitionedRDD = salesRDD
  .map(sale => (sale.region, sale))
  .partitionBy(new HashPartitioner(4))
  .persist(StorageLevel.MEMORY_ONLY)

// Check partitioning
println(s"Number of partitions: ${partitionedRDD.getNumPartitions}")
println(s"Partitioner: ${partitionedRDD.partitioner}")

// Custom partitioner (the constructor parameter must not share the name of the
// abstract numPartitions member it implements)
class RegionPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    key.toString match {
      case "North" => 0
      case "South" => 1
      case "East" => 2
      case "West" => 3
      case _ => 0
    }
  }
}

val customPartitionedRDD = salesRDD
  .map(sale => (sale.region, sale))
  .partitionBy(new RegionPartitioner(4))
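
To confirm that the partitioner routes each region where we expect, we can inspect the keys that land in each partition with mapPartitionsWithIndex:

// Inspect which region keys ended up in which partition
val regionsPerPartition = customPartitionedRDD
  .mapPartitionsWithIndex { (index, iter) =>
    Iterator((index, iter.map(_._1).toSet))
  }
  .collect()

regionsPerPartition.foreach { case (index, regions) =>
  println(s"Partition $index -> ${regions.mkString(", ")}")
}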

DataFrames and Datasets

Creating DataFrames

import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.types._

// From case classes (automatic schema inference)
case class Person(id: Long, name: String, age: Int, salary: Double)

val people = Seq(
  Person(1, "John", 25, 50000),
  Person(2, "Jane", 30, 60000),
  Person(3, "Bob", 35, 70000)
)

val peopleDF: DataFrame = people.toDF()
val peopleDS: Dataset[Person] = people.toDS()

// From external sources
val jsonDF = spark.read
  .option("multiline", "true")
  .json("path/to/data.json")

val csvDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/data.csv")

val parquetDF = spark.read.parquet("path/to/data.parquet")

// With explicit schema
val schema = StructType(Array(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("salary", DoubleType, nullable = false)
))

val csvWithSchemaDF = spark.read
  .option("header", "true")
  .schema(schema)
  .csv("path/to/data.csv")

// From database
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees")
  .option("user", "username")
  .option("password", "password")
  .option("driver", "org.postgresql.Driver")
  .load()

DataFrame Operations

// Basic operations
peopleDF.show()
peopleDF.printSchema()
peopleDF.describe().show()

// Selecting columns
val nameAndAge = peopleDF.select("name", "age")
val selectedCols = peopleDF.select(col("name"), col("age") + 1)

// Filtering
val adults = peopleDF.filter(col("age") >= 18)
val highEarners = peopleDF.where(col("salary") > 55000)

// Adding and modifying columns
val withBonus = peopleDF.withColumn("bonus", col("salary") * 0.1)
val withAgeGroup = peopleDF.withColumn("ageGroup", 
  when(col("age") < 30, "Young")
    .when(col("age") < 40, "Middle")
    .otherwise("Senior")
)

// Aggregations
val salaryStats = peopleDF.agg(
  avg("salary").as("avg_salary"),
  max("salary").as("max_salary"),
  min("salary").as("min_salary"),
  count("*").as("total_count")
)

// Grouping
val ageGroupStats = withAgeGroup
  .groupBy("ageGroup")
  .agg(
    avg("salary").as("avg_salary"),
    count("*").as("count")
  )

// Window functions
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy("ageGroup").orderBy(desc("salary"))

val withRank = withAgeGroup.withColumn(
  "salary_rank",
  row_number().over(windowSpec)
)

val withRunningTotal = peopleDF.withColumn(
  "running_total",
  sum("salary").over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow))
)

// Joins
case class Department(id: Long, name: String, managerId: Long)
val departments = Seq(
  Department(1, "Engineering", 1),
  Department(2, "Sales", 2),
  Department(3, "Marketing", 3)
).toDF()

val employeesWithDept = peopleDF
  .join(departments, peopleDF("id") === departments("managerId"), "inner")
  .select(peopleDF("*"), departments("name").as("department"))

// Self join (illustrative: Person has no manager column, so each row pairs with
// itself; with a real managerId you would join col("emp.managerId") === col("mgr.id"))
val managers = peopleDF.as("emp")
  .join(peopleDF.as("mgr"), col("emp.id") === col("mgr.id"))
  .select(col("emp.name").as("employee"), col("mgr.name").as("manager"))

Working with Complex Data Types

// Arrays and Maps
case class Employee(
  id: Long, 
  name: String, 
  skills: Array[String], 
  projects: Map[String, Int]
)

val employees = Seq(
  Employee(1, "Alice", Array("Scala", "Python", "SQL"), Map("ProjectA" -> 80, "ProjectB" -> 90)),
  Employee(2, "Bob", Array("Java", "Scala"), Map("ProjectA" -> 70, "ProjectC" -> 85))
).toDF()

// Array operations
val withSkillCount = employees.withColumn("skill_count", size(col("skills")))
val withHasScala = employees.withColumn("has_scala", array_contains(col("skills"), "Scala"))

// Explode arrays
val skillsExploded = employees.select(col("id"), col("name"), explode(col("skills")).as("skill"))

// Map operations
val projectsExploded = employees.select(
  col("id"), 
  col("name"), 
  explode(col("projects")).as(Seq("project", "score"))
)

// Struct operations
case class Address(street: String, city: String, zipCode: String)
case class PersonWithAddress(id: Long, name: String, address: Address)

val peopleWithAddress = Seq(
  PersonWithAddress(1, "John", Address("123 Main St", "New York", "10001")),
  PersonWithAddress(2, "Jane", Address("456 Oak Ave", "Los Angeles", "90210"))
).toDF()

val withCity = peopleWithAddress.select(col("id"), col("name"), col("address.city"))

Advanced SQL Operations

Complex Queries and CTEs

// Register DataFrame as temporary view
peopleDF.createOrReplaceTempView("people")
departments.createOrReplaceTempView("departments")

// Common Table Expressions (CTEs)
val complexQuery = spark.sql("""
  WITH salary_stats AS (
    SELECT 
      AVG(salary) as avg_salary,
      STDDEV(salary) as std_salary
    FROM people
  ),
  categorized_employees AS (
    SELECT 
      id,
      name,
      age,
      salary,
      CASE 
        WHEN salary > (SELECT avg_salary + std_salary FROM salary_stats) THEN 'High'
        WHEN salary < (SELECT avg_salary - std_salary FROM salary_stats) THEN 'Low'
        ELSE 'Medium'
      END as salary_category
    FROM people
  )
  SELECT 
    salary_category,
    COUNT(*) as count,
    AVG(age) as avg_age,
    AVG(salary) as avg_salary
  FROM categorized_employees
  GROUP BY salary_category
  ORDER BY avg_salary DESC
""")

complexQuery.show()

// Window functions in SQL
val windowQuery = spark.sql("""
  SELECT 
    name,
    age,
    salary,
    LAG(salary) OVER (ORDER BY age) as prev_salary,
    LEAD(salary) OVER (ORDER BY age) as next_salary,
    ROW_NUMBER() OVER (ORDER BY salary DESC) as salary_rank,
    PERCENT_RANK() OVER (ORDER BY salary) as salary_percentile
  FROM people
""")

// Pivot operations
val pivotQuery = spark.sql("""
  SELECT *
  FROM (
    SELECT age, 
           CASE WHEN salary > 55000 THEN 'High' ELSE 'Normal' END as salary_level,
           salary
    FROM people
  )
  PIVOT (
    AVG(salary) as avg_salary,
    COUNT(*) as count
    FOR salary_level IN ('High', 'Normal')
  )
""")

User-Defined Functions (UDFs)

import org.apache.spark.sql.expressions.UserDefinedFunction

// Simple UDF
def calculateTax(salary: Double): Double = {
  if (salary <= 50000) salary * 0.1
  else if (salary <= 100000) salary * 0.2
  else salary * 0.3
}

val taxUDF: UserDefinedFunction = udf(calculateTax _)
spark.udf.register("calculate_tax", taxUDF)

val withTax = peopleDF.withColumn("tax", taxUDF(col("salary")))

// Using in SQL
spark.sql("""
  SELECT name, salary, calculate_tax(salary) as tax
  FROM people
""").show()

// Complex UDF with multiple parameters
def categorizeEmployee(age: Int, salary: Double): String = {
  (age, salary) match {
    case (a, s) if a < 30 && s < 50000 => "Junior"
    case (a, s) if a < 40 && s < 80000 => "Mid-level"
    case (a, s) if a >= 40 || s >= 80000 => "Senior"
    case _ => "Other"
  }
}

val categorizeUDF = udf(categorizeEmployee _)
val withCategory = peopleDF.withColumn("category", categorizeUDF(col("age"), col("salary")))

// Aggregate UDFs (UDAFs) -- UserDefinedAggregateFunction still works but has been
// deprecated since Spark 3.0 in favor of the typed Aggregator API (see below)
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row

class GeometricMean extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("product", DoubleType) :: 
    StructField("count", LongType) :: Nil
  )
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 1.0
    buffer(1) = 0L
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) * input.getDouble(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  def evaluate(buffer: Row): Any = {
    math.pow(buffer.getDouble(0), 1.0 / buffer.getLong(1))
  }
}

val geometricMean = new GeometricMean
spark.udf.register("geometric_mean", geometricMean)

Streaming Data Processing

Structured Streaming

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types._
import java.util.concurrent.TimeUnit

// Define schema for streaming data
val streamSchema = StructType(Array(
  StructField("timestamp", TimestampType, true),
  StructField("userId", StringType, true),
  StructField("action", StringType, true),
  StructField("value", DoubleType, true)
))

// Read from file stream
val fileStream = spark.readStream
  .schema(streamSchema)
  .option("maxFilesPerTrigger", 1)
  .json("path/to/streaming/data/")

// Read from Kafka
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .load()

val parsedKafkaStream = kafkaStream
  .select(from_json(col("value").cast("string"), streamSchema).as("data"))
  .select("data.*")

// Stream processing operations
val windowedCounts = parsedKafkaStream
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window(col("timestamp"), "5 minutes", "1 minute"),
    col("action")
  )
  .count()

// Advanced streaming operations
val sessionizedData = parsedKafkaStream
  .withWatermark("timestamp", "30 minutes")
  .groupBy(
    col("userId"),
    session_window(col("timestamp"), "20 minutes")
  )
  .agg(
    count("*").as("event_count"),
    sum("value").as("total_value"),
    collect_list("action").as("actions")
  )

// Output streams
val consoleQuery = windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .start()

val fileQuery = sessionizedData.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "output/streaming-results")
  .option("checkpointLocation", "checkpoints/session-data")
  .trigger(Trigger.ProcessingTime(30, TimeUnit.SECONDS))
  .start()

// Custom sink
import org.apache.spark.sql.ForeachWriter

class CustomSink extends ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = {
    // Initialize connection
    true
  }

  def process(record: Row): Unit = {
    // Process each record
    println(s"Processing: ${record.mkString(", ")}")
  }

  def close(errorOrNull: Throwable): Unit = {
    // Clean up resources
  }
}

val customQuery = windowedCounts.writeStream
  .foreach(new CustomSink)
  .start()
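
For most sinks, foreachBatch is a simpler alternative to a row-at-a-time ForeachWriter: each micro-batch arrives as an ordinary DataFrame, so the full batch writer API is available. A sketch writing the windowed counts to JDBC (connection details are placeholders; the window struct is flattened because the JDBC writer cannot store struct columns):

// Write each micro-batch with the ordinary DataFrame writer
val batchQuery = windowedCounts.writeStream
  .outputMode("update")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF
      .select(
        col("window.start").as("window_start"),
        col("window.end").as("window_end"),
        col("action"),
        col("count")
      )
      .write
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/mydb")
      .option("dbtable", "windowed_counts")
      .option("user", "username")
      .option("password", "password")
      .mode("append")
      .save()
  }
  .option("checkpointLocation", "checkpoints/windowed-counts-jdbc")
  .start()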

Stream-Stream and Stream-Static Joins

// Stream-stream join
// Schemas assumed for this example; the fields mirror the join condition below
val orderSchema = StructType(Array(
  StructField("orderId", StringType, true),
  StructField("orderTime", TimestampType, true)
))

val paymentSchema = StructType(Array(
  StructField("paymentOrderId", StringType, true),
  StructField("paymentTime", TimestampType, true)
))

val stream1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders")
  .load()
  .select(from_json(col("value").cast("string"), orderSchema).as("data"))
  .select("data.*")
  .withWatermark("orderTime", "10 minutes")

val stream2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "payments")
  .load()
  .select(from_json(col("value").cast("string"), paymentSchema).as("data"))
  .select("data.*")
  .withWatermark("paymentTime", "15 minutes")

val joinedStream = stream1.join(
  stream2,
  expr("""
    orderId = paymentOrderId AND
    paymentTime >= orderTime AND
    paymentTime <= orderTime + interval 1 hour
  """)
)

// Stream-static join
val staticCustomers = spark.read.parquet("path/to/customers")

val enrichedStream = parsedKafkaStream.join(
  staticCustomers,
  "userId"
)

Performance Optimization

Catalyst Optimizer and Adaptive Query Execution

// Enable cost-based optimization
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.cbo.planStats.enabled", "true")

// Analyze tables for statistics
spark.sql("ANALYZE TABLE people COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE people COMPUTE STATISTICS FOR COLUMNS id, age, salary")

// Check query plans
val query = peopleDF
  .filter(col("age") > 25)
  .groupBy("age")
  .agg(avg("salary"))

query.explain(true) // Shows all optimization phases
query.explain("cost") // Shows cost-based optimization

// Adaptive Query Execution (AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

Partitioning and Bucketing

// Write partitioned data
peopleDF.write
  .partitionBy("age")
  .mode("overwrite")
  .parquet("output/people_partitioned")

// Write bucketed data
peopleDF.write
  .bucketBy(4, "id")
  .sortBy("age")
  .mode("overwrite")
  .option("path", "output/people_bucketed")
  .saveAsTable("people_bucketed")

// Partition pruning
val youngPeople = spark.read
  .parquet("output/people_partitioned")
  .filter(col("age") < 30) // Only reads relevant partitions

// Dynamic partition pruning: at runtime Spark can skip partitions of a partitioned
// table when the join key is its partition column and the other side is filtered
// (assumes the partitioned output has been registered as the table people_partitioned)
val filtered = spark.sql("""
  SELECT p.*, d.name as dept_name
  FROM people_partitioned p
  JOIN departments d ON p.id = d.managerId
  WHERE d.name = 'Engineering'
""")

Caching and Persistence Strategies

// Different storage levels (choose one per DataFrame; re-persisting with a
// different level throws unless you unpersist first)
peopleDF.cache() // Dataset default: MEMORY_AND_DISK
// peopleDF.persist(StorageLevel.MEMORY_AND_DISK)
// peopleDF.persist(StorageLevel.MEMORY_ONLY_SER)

// Check cache status
println(s"Is cached: ${peopleDF.storageLevel}")

// Uncache when no longer needed
peopleDF.unpersist()

// Global cache management
spark.catalog.cacheTable("people")
spark.catalog.uncacheTable("people")
spark.catalog.clearCache()

// Optimal caching strategy
class CacheManager {
  def cacheStrategically(df: DataFrame, accessPattern: String): DataFrame = {
    accessPattern match {
      case "frequent" => df.persist(StorageLevel.MEMORY_ONLY)
      case "infrequent" => df.persist(StorageLevel.MEMORY_AND_DISK_SER)
      case "temporary" => df.cache()
      case _ => df
    }
  }
}

Memory Management and Tuning

// Memory configuration
object SparkTuning {
  def configureMemory(): SparkSession = {
    SparkSession.builder()
      .config("spark.executor.memory", "4g")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memoryFraction", "0.8")
      .config("spark.sql.shuffle.partitions", "200")
      .config("spark.default.parallelism", "100")
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
      .getOrCreate()
  }

  def monitorMemoryUsage(): Unit = {
    val statusTracker = spark.sparkContext.statusTracker

    statusTracker.getExecutorInfos.foreach { executor =>
      println(s"Executor ${executor.host}:${executor.port}: " +
              s"storage memory used: ${executor.usedOnHeapStorageMemory}, " +
              s"storage memory total: ${executor.totalOnHeapStorageMemory}")
    }
  }
}

Advanced Data Processing Patterns

ETL Pipelines

class ETLPipeline {

  def extract(sources: Map[String, String]): Map[String, DataFrame] = {
    sources.map { case (name, path) =>
      name -> (path match {
        case p if p.endsWith(".json") => spark.read.json(p)
        case p if p.endsWith(".csv") => spark.read.option("header", "true").csv(p)
        case p if p.endsWith(".parquet") => spark.read.parquet(p)
        case _ => throw new IllegalArgumentException(s"Unsupported format: $path")
      })
    }
  }

  def transform(rawData: Map[String, DataFrame]): DataFrame = {
    val customers = rawData("customers")
    val orders = rawData("orders")
    val products = rawData("products")

    // Data quality checks
    val cleanCustomers = customers
      .filter(col("email").isNotNull && col("email").rlike(".*@.*\\..*"))
      .withColumn("customer_name", trim(upper(col("name"))))

    val cleanOrders = orders
      .filter(col("order_date").isNotNull && col("amount") > 0)
      .withColumn("order_year", year(col("order_date")))

    // Business logic transformations
    val enrichedOrders = cleanOrders
      .join(cleanCustomers, "customer_id")
      .join(products, "product_id")
      .withColumn("total_with_tax", col("amount") * 1.08)
      .withColumn("order_category", 
        when(col("amount") > 1000, "High Value")
          .when(col("amount") > 100, "Medium Value")
          .otherwise("Low Value")
      )

    // Aggregations
    enrichedOrders
      .groupBy("customer_id", "customer_name", "order_year")
      .agg(
        count("order_id").as("total_orders"),
        sum("total_with_tax").as("total_spent"),
        avg("amount").as("avg_order_value"),
        collect_list("product_name").as("products_purchased")
      )
  }

  def load(df: DataFrame, outputPath: String, format: String = "parquet"): Unit = {
    df.write
      .mode("overwrite")
      .option("compression", "snappy")
      .format(format)
      .save(outputPath)
  }

  def runPipeline(sources: Map[String, String], outputPath: String): Unit = {
    val extracted = extract(sources)
    val transformed = transform(extracted)
    load(transformed, outputPath)

    // Data validation
    val recordCount = transformed.count()
    println(s"Pipeline completed successfully. Processed $recordCount records.")
  }
}

// Usage
val pipeline = new ETLPipeline()
val sources = Map(
  "customers" -> "data/customers.json",
  "orders" -> "data/orders.csv",
  "products" -> "data/products.parquet"
)

pipeline.runPipeline(sources, "output/customer_analytics")

Change Data Capture (CDC)

import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit

class CDCProcessor {

  def processCDCStream(inputPath: String, outputPath: String): StreamingQuery = {
    val cdcSchema = StructType(Array(
      StructField("operation", StringType, true),
      StructField("table", StringType, true),
      StructField("timestamp", TimestampType, true),
      StructField("before", StringType, true),
      StructField("after", StringType, true),
      StructField("primary_key", StringType, true)
    ))

    val cdcStream = spark.readStream
      .schema(cdcSchema)
      .option("maxFilesPerTrigger", 1)
      .json(inputPath)

    val processedStream = cdcStream
      .withWatermark("timestamp", "1 hour")
      .filter(col("operation").isin("INSERT", "UPDATE", "DELETE"))
      .withColumn("year", year(col("timestamp")))
      .withColumn("month", month(col("timestamp")))
      .withColumn("day", dayofmonth(col("timestamp")))

    processedStream.writeStream
      .format("delta") // Using Delta Lake for ACID transactions
      .outputMode("append")
      .option("checkpointLocation", s"$outputPath/checkpoints")
      .option("path", s"$outputPath/cdc_data")
      .partitionBy("year", "month", "day")
      .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
      .start()
  }

  def applyChangesToTarget(cdcData: DataFrame, targetTable: String): Unit = {
    // Merge changes into target table
    val changes = cdcData
      .groupBy("primary_key")
      .agg(
        max(struct(col("timestamp"), col("operation"), col("after"))).as("latest_change")
      )
      .select(
        col("primary_key"),
        col("latest_change.operation").as("operation"),
        col("latest_change.after").as("data")
      )

    changes.write
      .mode("overwrite")
      .option("mergeSchema", "true")
      .saveAsTable(s"${targetTable}_staging")

    // Use SQL MERGE for upserts
    spark.sql(s"""
      MERGE INTO $targetTable target
      USING ${targetTable}_staging source
      ON target.id = source.primary_key
      WHEN MATCHED AND source.operation = 'DELETE' THEN DELETE
      WHEN MATCHED AND source.operation = 'UPDATE' THEN UPDATE SET *
      WHEN NOT MATCHED AND source.operation = 'INSERT' THEN INSERT *
    """)
  }
}

Integration with External Systems

Database Connectivity

// JDBC optimizations
def readFromDatabase(
  url: String,
  table: String,
  partitionColumn: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int
): DataFrame = {
  spark.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", table)
    .option("user", "username")
    .option("password", "password")
    .option("driver", "org.postgresql.Driver")
    .option("partitionColumn", partitionColumn)
    .option("lowerBound", lowerBound)
    .option("upperBound", upperBound)
    .option("numPartitions", numPartitions)
    .option("fetchsize", 10000)
    .load()
}

def writeToDatabase(df: DataFrame, url: String, table: String): Unit = {
  df.write
    .format("jdbc")
    .option("url", url)
    .option("dbtable", table)
    .option("user", "username")
    .option("password", "password")
    .option("driver", "org.postgresql.Driver")
    .option("batchsize", 10000)
    .option("isolationLevel", "READ_COMMITTED")
    .mode("append")
    .save()
}

// Custom data source (the org.apache.spark.sql.sources.v2 API used in older
// tutorials was removed in Spark 3.0; the current connector API lives under
// org.apache.spark.sql.connector). Minimal read-side skeleton:
import java.util
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class CustomDataSource extends TableProvider {
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    CustomTable.schema

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = new CustomTable
}

object CustomTable {
  val schema: StructType = StructType(Array(
    StructField("id", LongType, nullable = false),
    StructField("data", StringType, nullable = true)
  ))
}

class CustomTable extends Table with SupportsRead {
  override def name(): String = "custom_table"
  override def schema(): StructType = CustomTable.schema
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  // A complete implementation returns a Scan whose Batch plans input partitions
  // and creates PartitionReaders; omitted here for brevity
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new ScanBuilder {
      override def build(): Scan =
        throw new UnsupportedOperationException("implement Scan/Batch here")
    }
}
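
Once the scan side is filled in, the connector is addressed by its class name through the normal reader API; a usage sketch:

// Load through the custom connector (assumes the Scan/Batch implementation is complete)
val customDF = spark.read
  .format(classOf[CustomDataSource].getName)
  .load()

customDF.printSchema()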

Testing Spark Applications

Unit Testing with Spark

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class SparkJobTest extends AnyFlatSpec with Matchers with DataFrameSuiteBase {

  "Data transformation" should "correctly aggregate sales data" in {
    val session = spark // SparkSession provided by DataFrameSuiteBase
    import session.implicits._

    val input = Seq(
      ("product1", 100.0, "North"),
      ("product1", 150.0, "South"),
      ("product2", 200.0, "North")
    ).toDF("product", "amount", "region")

    val expected = Seq(
      ("product1", 250.0, 2L), // count() produces a LongType column
      ("product2", 200.0, 1L)
    ).toDF("product", "total_amount", "order_count")

    val result = aggregateSales(input)

    assertDataFrameEquals(expected, result)
  }

  def aggregateSales(df: DataFrame): DataFrame = {
    df.groupBy("product")
      .agg(
        sum("amount").as("total_amount"),
        count("*").as("order_count")
      )
      .orderBy("product")
  }
}

// Property-based testing
import org.scalacheck.{Gen, Properties}
import org.scalacheck.Prop._

object SparkPropertyTests extends Properties("Spark") {
  import spark.implicits._ // reuses the SparkSession created earlier in this lesson

  property("sum is commutative") = forAll(Gen.nonEmptyListOf(Gen.choose(1, 100))) { numbers =>
    val df1 = numbers.toDF("value")
    val df2 = numbers.reverse.toDF("value")

    val sum1 = df1.agg(sum("value")).collect()(0).getLong(0)
    val sum2 = df2.agg(sum("value")).collect()(0).getLong(0)

    sum1 == sum2
  }
}

Conclusion

Apache Spark provides a comprehensive platform for big data processing in Scala, offering:

Core Capabilities:

  • Unified processing engine for batch and streaming data
  • SQL interface with advanced optimization
  • Machine learning and graph processing libraries
  • Rich APIs for data manipulation and analysis

Performance Features:

  • Catalyst optimizer for query optimization
  • Adaptive Query Execution for runtime optimization
  • Advanced caching and persistence strategies
  • Columnar storage formats (Parquet, Delta Lake)

Scalability:

  • Horizontal scaling across clusters
  • Efficient resource utilization
  • Dynamic resource allocation
  • Integration with cluster managers

Best Practices:

  • Proper partitioning and bucketing strategies
  • Effective use of caching and persistence
  • Memory and resource tuning
  • Monitoring and debugging techniques

Enterprise Features:

  • ACID transactions with Delta Lake
  • Change data capture processing
  • Integration with external systems
  • Comprehensive testing frameworks

Spark's combination of ease of use, performance, and comprehensive feature set makes it the de facto standard for big data processing, particularly when combined with Scala's expressive syntax and type safety.