Big Data Processing: Apache Spark and Distributed Computing
Apache Spark has transformed big data processing by providing a unified analytics engine for large-scale workloads. Written in Scala, Spark offers high-performance distributed computing with APIs for batch processing, streaming, machine learning, and graph processing. In this lesson, we'll take a comprehensive tour of Spark programming in Scala.
Understanding Apache Spark Architecture
Core Components
- Spark Core: Foundation with RDDs and basic operations
- Spark SQL: Structured data processing with DataFrames and Datasets
- Spark Streaming: Real-time stream processing
- MLlib: Machine learning library
- GraphX: Graph processing framework
Execution Model
- Driver Program: Coordinates execution and maintains application state
- Cluster Manager: Allocates resources (Standalone, YARN, Kubernetes; Mesos support is deprecated)
- Executors: Worker processes that execute tasks and store data
- Tasks: Units of work sent to executors
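These pieces are easy to observe from a running application: the master URL selects the cluster manager, and the SparkContext exposes parallelism and executor information. A minimal sketch, assuming the SparkSession built in the next section:
// Sketch: inspecting the execution model from the driver
println(s"Cluster manager (master URL): ${spark.sparkContext.master}")
println(s"Default parallelism (hint for tasks per stage): ${spark.sparkContext.defaultParallelism}")
println(s"Registered executors: ${spark.sparkContext.statusTracker.getExecutorInfos.length}")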
Setting Up Spark Environment
Dependencies and Configuration
// build.sbt
val SparkVersion = "3.5.0"
val HadoopVersion = "3.3.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % SparkVersion,
"org.apache.spark" %% "spark-sql" % SparkVersion,
"org.apache.spark" %% "spark-streaming" % SparkVersion,
"org.apache.spark" %% "spark-mllib" % SparkVersion,
"org.apache.spark" %% "spark-graphx" % SparkVersion,
// Connectors
"org.apache.spark" %% "spark-sql-kafka-0-10" % SparkVersion,
"org.apache.spark" %% "spark-streaming-kafka-0-10" % SparkVersion,
"mysql" % "mysql-connector-java" % "8.0.33",
"org.postgresql" % "postgresql" % "42.6.0",
// File formats
"org.apache.spark" %% "spark-avro" % SparkVersion,
"com.databricks" %% "spark-xml" % "0.16.0",
// Testing
"com.holdenkarau" %% "spark-testing-base" % s"${SparkVersion}_1.4.7" % Test
)
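One packaging note: when submitting to a cluster with spark-submit, the Spark modules above are usually marked Provided so they are not bundled into the assembly jar. A sketch of the alternative scoping (local sbt/IDE runs still need the default Compile scope):
// Sketch: dependency scoping for cluster deployment with sbt-assembly
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % SparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % SparkVersion % Provided
)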
Spark Session Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// Create optimized Spark session
def createSparkSession(appName: String): SparkSession = {
SparkSession.builder()
.appName(appName)
.master("local[*]") // Use all available cores locally
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.sql.adaptive.skewJoin.enabled", "true")
.config("spark.sql.parquet.compression.codec", "snappy")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.execution.arrow.pyspark.enabled", "true") // only affects PySpark clients of this session; harmless for Scala jobs
.config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
.getOrCreate()
}
implicit val spark: SparkSession = createSparkSession("BigDataProcessing")
import spark.implicits._
// For RDD operations
implicit val sc: SparkContext = spark.sparkContext
Working with RDDs (Resilient Distributed Datasets)
Creating and Basic Operations
import org.apache.spark.rdd.RDD
// Create RDDs from collections
val numbersRDD: RDD[Int] = sc.parallelize(1 to 1000000)
val textRDD: RDD[String] = sc.parallelize(Seq("hello", "world", "spark", "scala"))
// Create RDD from external data
val fileRDD: RDD[String] = sc.textFile("hdfs://path/to/large/file.txt")
val linesRDD: RDD[String] = sc.wholeTextFiles("hdfs://path/to/directory/*").values
// Transformations (lazy evaluation)
val squaredRDD = numbersRDD.map(x => x * x)
val evenRDD = numbersRDD.filter(_ % 2 == 0)
val wordCountRDD = textRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// Actions (trigger computation)
val sum = numbersRDD.reduce(_ + _)
val count = numbersRDD.count()
val first10 = numbersRDD.take(10)
val samples = numbersRDD.sample(withReplacement = false, fraction = 0.1, seed = 42)
println(s"Sum: $sum, Count: $count")
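Transformations only build a lineage graph; nothing executes until an action runs. The lineage can be inspected with toDebugString, a quick check using the RDDs above:
// No job has run yet for squaredRDD; printing its lineage is pure metadata
println(squaredRDD.toDebugString)
// Only an action such as count() actually triggers the computation
println(s"Squared count: ${squaredRDD.count()}")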
Advanced RDD Operations
// Key-value pair operations
case class Sale(product: String, amount: Double, date: String, region: String)
val salesData = Seq(
Sale("laptop", 1200.0, "2023-01-15", "North"),
Sale("phone", 800.0, "2023-01-16", "South"),
Sale("laptop", 1100.0, "2023-01-17", "North"),
Sale("tablet", 600.0, "2023-01-18", "East")
)
val salesRDD = sc.parallelize(salesData)
// Group by key operations (note: groupByKey shuffles every value across the network;
// prefer reduceByKey or aggregateByKey, as in salesByRegion below, when values can be combined map-side)
val salesByProduct = salesRDD
.map(sale => (sale.product, sale.amount))
.groupByKey()
.mapValues(amounts => amounts.sum)
val salesByRegion = salesRDD
.map(sale => (sale.region, sale.amount))
.reduceByKey(_ + _)
// Join operations
val productInfo = sc.parallelize(Seq(
("laptop", "Electronics"),
("phone", "Electronics"),
("tablet", "Electronics")
))
val salesWithCategory = salesRDD
.map(sale => (sale.product, sale))
.join(productInfo)
.map { case (product, (sale, category)) =>
(product, sale.amount, category, sale.region)
}
// Aggregation operations
val salesStats = salesRDD
.map(sale => (sale.region, sale.amount))
.aggregateByKey((0.0, 0, 0.0, Double.MaxValue, Double.MinValue))(
// Combine within partition
{ case ((sum, count, _, min, max), amount) =>
(sum + amount, count + 1, 0.0, math.min(min, amount), math.max(max, amount))
},
// Combine across partitions
{ case ((sum1, count1, _, min1, max1), (sum2, count2, _, min2, max2)) =>
val totalSum = sum1 + sum2
val totalCount = count1 + count2
val avg = totalSum / totalCount
(totalSum, totalCount, avg, math.min(min1, min2), math.max(max1, max2))
}
)
.mapValues { case (sum, count, _, min, max) =>
(sum, count, sum / count, min, max)
}
salesStats.collect().foreach { case (region, (sum, count, avg, min, max)) =>
println(f"Region: $region, Sum: $sum%.2f, Count: $count, Avg: $avg%.2f, Min: $min%.2f, Max: $max%.2f")
}
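The same per-region statistics can be computed more concisely with Spark's StatCounter, which tracks count, sum, mean, min, and max in a single accumulator. An equivalent sketch using the salesRDD above:
import org.apache.spark.util.StatCounter
val statsByRegion = salesRDD
  .map(sale => (sale.region, sale.amount))
  .aggregateByKey(new StatCounter())(
    (acc, amount) => acc.merge(amount),  // fold a value into the per-partition accumulator
    (acc1, acc2) => acc1.merge(acc2)     // merge accumulators across partitions
  )
statsByRegion.collect().foreach { case (region, s) =>
  println(f"Region: $region, Sum: ${s.sum}%.2f, Count: ${s.count}, Avg: ${s.mean}%.2f, Min: ${s.min}%.2f, Max: ${s.max}%.2f")
}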
RDD Persistence and Caching
// Different storage levels
val cachedRDD = numbersRDD
.filter(_ % 2 == 0)
.cache() // Default: MEMORY_ONLY
val persistedRDD = numbersRDD
.map(_ * 2)
.persist(StorageLevel.MEMORY_AND_DISK_SER)
// Custom partitioning
import org.apache.spark.HashPartitioner
val partitionedRDD = salesRDD
.map(sale => (sale.region, sale))
.partitionBy(new HashPartitioner(4))
.persist(StorageLevel.MEMORY_ONLY)
// Check partitioning
println(s"Number of partitions: ${partitionedRDD.getNumPartitions}")
println(s"Partitioner: ${partitionedRDD.partitioner}")
// Custom partitioner
class RegionPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
  // a constructor parameter named numPartitions would clash with the method it must override
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    key.toString match {
      case "North" => 0
      case "South" => 1
      case "East" => 2
      case "West" => 3
      case _ => 0 // unknown regions fall back to partition 0
    }
  }
}
val customPartitionedRDD = salesRDD
.map(sale => (sale.region, sale))
.partitionBy(new RegionPartitioner(4))
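To verify how the custom partitioner distributed the data, count the records in each partition without collecting the data itself. A small sketch:
customPartitionedRDD
  .mapPartitionsWithIndex { (index, records) => Iterator((index, records.size)) }
  .collect()
  .foreach { case (index, n) => println(s"Partition $index: $n records") }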
DataFrames and Datasets
Creating DataFrames
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.types._
// From case classes (automatic schema inference)
case class Person(id: Long, name: String, age: Int, salary: Double)
val people = Seq(
Person(1, "John", 25, 50000),
Person(2, "Jane", 30, 60000),
Person(3, "Bob", 35, 70000)
)
val peopleDF: DataFrame = people.toDF()
val peopleDS: Dataset[Person] = people.toDS()
// From external sources
val jsonDF = spark.read
.option("multiline", "true")
.json("path/to/data.json")
val csvDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("path/to/data.csv")
val parquetDF = spark.read.parquet("path/to/data.parquet")
// With explicit schema
val schema = StructType(Array(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false),
StructField("salary", DoubleType, nullable = false)
))
val csvWithSchemaDF = spark.read
.option("header", "true")
.schema(schema)
.csv("path/to/data.csv")
// From database
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/mydb")
.option("dbtable", "employees")
.option("user", "username")
.option("password", "password")
.option("driver", "org.postgresql.Driver")
.load()
DataFrame Operations
// Basic operations
peopleDF.show()
peopleDF.printSchema()
peopleDF.describe().show()
// Selecting columns
val nameAndAge = peopleDF.select("name", "age")
val selectedCols = peopleDF.select(col("name"), col("age") + 1)
// Filtering
val adults = peopleDF.filter(col("age") >= 18)
val highEarners = peopleDF.where(col("salary") > 55000)
// Adding and modifying columns
val withBonus = peopleDF.withColumn("bonus", col("salary") * 0.1)
val withAgeGroup = peopleDF.withColumn("ageGroup",
when(col("age") < 30, "Young")
.when(col("age") < 40, "Middle")
.otherwise("Senior")
)
// Aggregations
val salaryStats = peopleDF.agg(
avg("salary").as("avg_salary"),
max("salary").as("max_salary"),
min("salary").as("min_salary"),
count("*").as("total_count")
)
// Grouping
val ageGroupStats = withAgeGroup
.groupBy("ageGroup")
.agg(
avg("salary").as("avg_salary"),
count("*").as("count")
)
// Window functions
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("ageGroup").orderBy(desc("salary"))
val withRank = withAgeGroup.withColumn(
"salary_rank",
row_number().over(windowSpec)
)
val withRunningTotal = peopleDF.withColumn(
"running_total",
sum("salary").over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow))
)
// Joins
case class Department(id: Long, name: String, managerId: Long)
val departments = Seq(
Department(1, "Engineering", 1),
Department(2, "Sales", 2),
Department(3, "Marketing", 3)
).toDF()
val employeesWithDept = peopleDF
.join(departments, peopleDF("id") === departments("managerId"), "inner")
.select(peopleDF("*"), departments("name").as("department"))
// Self joins: alias the same DataFrame twice to join it with itself
// (Person has no managerId column, so this example simply pairs each row with itself;
// with a managerId column the condition would be col("emp.managerId") === col("mgr.id"))
val managers = peopleDF.as("emp")
  .join(peopleDF.as("mgr"), col("emp.id") === col("mgr.id"))
  .select(col("emp.name").as("employee"), col("mgr.name").as("manager"))
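When one side of a join is small, a broadcast hint avoids shuffling the large side. A sketch using the DataFrames above:
// Ship the small departments table to every executor instead of shuffling peopleDF
val broadcastJoined = peopleDF
  .join(broadcast(departments), peopleDF("id") === departments("managerId"))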
Working with Complex Data Types
// Arrays and Maps
case class Employee(
id: Long,
name: String,
skills: Array[String],
projects: Map[String, Int]
)
val employees = Seq(
Employee(1, "Alice", Array("Scala", "Python", "SQL"), Map("ProjectA" -> 80, "ProjectB" -> 90)),
Employee(2, "Bob", Array("Java", "Scala"), Map("ProjectA" -> 70, "ProjectC" -> 85))
).toDF()
// Array operations
val withSkillCount = employees.withColumn("skill_count", size(col("skills")))
val withHasScala = employees.withColumn("has_scala", array_contains(col("skills"), "Scala"))
// Explode arrays
val skillsExploded = employees.select(col("id"), col("name"), explode(col("skills")).as("skill"))
// Map operations
val projectsExploded = employees.select(
col("id"),
col("name"),
explode(col("projects")).as(Seq("project", "score"))
)
// Struct operations
case class Address(street: String, city: String, zipCode: String)
case class PersonWithAddress(id: Long, name: String, address: Address)
val peopleWithAddress = Seq(
PersonWithAddress(1, "John", Address("123 Main St", "New York", "10001")),
PersonWithAddress(2, "Jane", Address("456 Oak Ave", "Los Angeles", "90210"))
).toDF()
val withCity = peopleWithAddress.select(col("id"), col("name"), col("address.city"))
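Spark 3's higher-order functions manipulate array columns directly, without exploding and re-grouping. A sketch on the employees DataFrame above:
// transform/filter take a lambda over array elements
val withUpperSkills = employees.withColumn("skills_upper", transform(col("skills"), s => upper(s)))
val withLongSkills = employees.withColumn("long_skills", filter(col("skills"), s => length(s) > 4))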
Advanced SQL Operations
Complex Queries and CTEs
// Register DataFrame as temporary view
peopleDF.createOrReplaceTempView("people")
departments.createOrReplaceTempView("departments")
// Common Table Expressions (CTEs)
val complexQuery = spark.sql("""
WITH salary_stats AS (
SELECT
AVG(salary) as avg_salary,
STDDEV(salary) as std_salary
FROM people
),
categorized_employees AS (
SELECT
id,
name,
age,
salary,
CASE
WHEN salary > (SELECT avg_salary + std_salary FROM salary_stats) THEN 'High'
WHEN salary < (SELECT avg_salary - std_salary FROM salary_stats) THEN 'Low'
ELSE 'Medium'
END as salary_category
FROM people
)
SELECT
salary_category,
COUNT(*) as count,
AVG(age) as avg_age,
AVG(salary) as avg_salary
FROM categorized_employees
GROUP BY salary_category
ORDER BY avg_salary DESC
""")
complexQuery.show()
// Window functions in SQL
val windowQuery = spark.sql("""
SELECT
name,
age,
salary,
LAG(salary) OVER (ORDER BY age) as prev_salary,
LEAD(salary) OVER (ORDER BY age) as next_salary,
ROW_NUMBER() OVER (ORDER BY salary DESC) as salary_rank,
PERCENT_RANK() OVER (ORDER BY salary) as salary_percentile
FROM people
""")
// Pivot operations
val pivotQuery = spark.sql("""
SELECT *
FROM (
SELECT age,
CASE WHEN salary > 55000 THEN 'High' ELSE 'Normal' END as salary_level,
salary
FROM people
)
PIVOT (
AVG(salary) as avg_salary,
COUNT(*) as count
FOR salary_level IN ('High', 'Normal')
)
""")
User-Defined Functions (UDFs)
import org.apache.spark.sql.expressions.UserDefinedFunction
// Simple UDF
def calculateTax(salary: Double): Double = {
if (salary <= 50000) salary * 0.1
else if (salary <= 100000) salary * 0.2
else salary * 0.3
}
val taxUDF: UserDefinedFunction = udf(calculateTax _)
spark.udf.register("calculate_tax", taxUDF)
val withTax = peopleDF.withColumn("tax", taxUDF(col("salary")))
// Using in SQL
spark.sql("""
SELECT name, salary, calculate_tax(salary) as tax
FROM people
""").show()
// Complex UDF with multiple parameters
def categorizeEmployee(age: Int, salary: Double): String = {
(age, salary) match {
case (a, s) if a < 30 && s < 50000 => "Junior"
case (a, s) if a < 40 && s < 80000 => "Mid-level"
case (a, s) if a >= 40 || s >= 80000 => "Senior"
case _ => "Other"
}
}
val categorizeUDF = udf(categorizeEmployee _)
val withCategory = peopleDF.withColumn("category", categorizeUDF(col("age"), col("salary")))
// Aggregate UDFs (UDAFs) - UserDefinedAggregateFunction still works but has been deprecated
// since Spark 3.0; the preferred typed Aggregator approach is sketched after this example
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
class GeometricMean extends UserDefinedAggregateFunction {
def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
def bufferSchema: StructType = StructType(
StructField("product", DoubleType) ::
StructField("count", LongType) :: Nil
)
def dataType: DataType = DoubleType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 1.0
buffer(1) = 0L
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getDouble(0) * input.getDouble(0)
buffer(1) = buffer.getLong(1) + 1
}
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
def evaluate(buffer: Row): Any = {
math.pow(buffer.getDouble(0), 1.0 / buffer.getLong(1))
}
}
val geometricMean = new GeometricMean
spark.udf.register("geometric_mean", geometricMean)
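Since Spark 3.0, the recommended way to write aggregate UDFs is a typed Aggregator registered via functions.udaf. A minimal sketch of the same geometric mean (GeoMeanBuffer and the "geometric_mean_v2" name are illustrative):
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
case class GeoMeanBuffer(product: Double, count: Long)
object GeometricMeanAgg extends Aggregator[Double, GeoMeanBuffer, Double] {
  def zero: GeoMeanBuffer = GeoMeanBuffer(1.0, 0L)
  def reduce(b: GeoMeanBuffer, value: Double): GeoMeanBuffer = GeoMeanBuffer(b.product * value, b.count + 1)
  def merge(b1: GeoMeanBuffer, b2: GeoMeanBuffer): GeoMeanBuffer = GeoMeanBuffer(b1.product * b2.product, b1.count + b2.count)
  def finish(b: GeoMeanBuffer): Double = math.pow(b.product, 1.0 / b.count)
  def bufferEncoder: Encoder[GeoMeanBuffer] = Encoders.product[GeoMeanBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
// Register for use in SQL, or call udaf(GeometricMeanAgg)(col(...)) directly in the DataFrame API
spark.udf.register("geometric_mean_v2", udaf(GeometricMeanAgg))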
Streaming Data Processing
Structured Streaming
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types._
import java.util.concurrent.TimeUnit
// Define schema for streaming data
val streamSchema = StructType(Array(
StructField("timestamp", TimestampType, true),
StructField("userId", StringType, true),
StructField("action", StringType, true),
StructField("value", DoubleType, true)
))
// Read from file stream
val fileStream = spark.readStream
.schema(streamSchema)
.option("maxFilesPerTrigger", 1)
.json("path/to/streaming/data/")
// Read from Kafka
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.load()
val parsedKafkaStream = kafkaStream
.select(from_json(col("value").cast("string"), streamSchema).as("data"))
.select("data.*")
// Stream processing operations
val windowedCounts = parsedKafkaStream
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "5 minutes", "1 minute"),
col("action")
)
.count()
// Advanced streaming operations
val sessionizedData = parsedKafkaStream
.withWatermark("timestamp", "30 minutes")
.groupBy(
col("userId"),
session_window(col("timestamp"), "20 minutes")
)
.agg(
count("*").as("event_count"),
sum("value").as("total_value"),
collect_list("action").as("actions")
)
// Output streams
val consoleQuery = windowedCounts.writeStream
.outputMode("update")
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
.start()
val fileQuery = sessionizedData.writeStream
.outputMode("append")
.format("parquet")
.option("path", "output/streaming-results")
.option("checkpointLocation", "checkpoints/session-data")
.trigger(Trigger.ProcessingTime(30, TimeUnit.SECONDS))
.start()
// Custom sink
import org.apache.spark.sql.ForeachWriter
class CustomSink extends ForeachWriter[Row] {
def open(partitionId: Long, epochId: Long): Boolean = {
// Initialize connection
true
}
def process(record: Row): Unit = {
// Process each record
println(s"Processing: ${record.mkString(", ")}")
}
def close(errorOrNull: Throwable): Unit = {
// Clean up resources
}
}
val customQuery = windowedCounts.writeStream
.foreach(new CustomSink)
.start()
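A streaming application's main method should block until its queries finish; otherwise the driver exits immediately after start(). One line handles all of the queries above:
// Block until any active streaming query terminates (or fails)
spark.streams.awaitAnyTermination()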
Stream-Stream and Stream-Static Joins
// Stream-stream join
// Illustrative schemas for the two topics (assumed for this example; the fields match the join condition below)
val orderSchema = StructType(Array(
  StructField("orderId", StringType, true),
  StructField("orderTime", TimestampType, true),
  StructField("amount", DoubleType, true)
))
val paymentSchema = StructType(Array(
  StructField("paymentOrderId", StringType, true),
  StructField("paymentTime", TimestampType, true),
  StructField("paymentAmount", DoubleType, true)
))
val stream1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "orders")
.load()
.select(from_json(col("value").cast("string"), orderSchema).as("data"))
.select("data.*")
.withWatermark("orderTime", "10 minutes")
val stream2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "payments")
.load()
.select(from_json(col("value").cast("string"), paymentSchema).as("data"))
.select("data.*")
.withWatermark("paymentTime", "15 minutes")
val joinedStream = stream1.join(
stream2,
expr("""
orderId = paymentOrderId AND
paymentTime >= orderTime AND
paymentTime <= orderTime + interval 1 hour
""")
)
// Stream-static join
val staticCustomers = spark.read.parquet("path/to/customers")
val enrichedStream = parsedKafkaStream.join(
staticCustomers,
"userId"
)
Performance Optimization
Catalyst Optimizer and Adaptive Query Execution
// Enable cost-based optimization
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.cbo.planStats.enabled", "true")
// Analyze tables to collect statistics for the CBO (statistics are stored in the catalog,
// so this assumes people and departments exist as catalog tables rather than only temp views)
spark.sql("ANALYZE TABLE people COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE people COMPUTE STATISTICS FOR COLUMNS id, age, salary")
// Check query plans
val query = peopleDF
.filter(col("age") > 25)
.groupBy("age")
.agg(avg("salary"))
query.explain(true) // Shows all optimization phases
query.explain("cost") // Shows cost-based optimization
// Adaptive Query Execution (AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
Partitioning and Bucketing
// Write partitioned data
peopleDF.write
.partitionBy("age")
.mode("overwrite")
.parquet("output/people_partitioned")
// Write bucketed data
peopleDF.write
.bucketBy(4, "id")
.sortBy("age")
.mode("overwrite")
.option("path", "output/people_bucketed")
.saveAsTable("people_bucketed")
// Partition pruning
val youngPeople = spark.read
.parquet("output/people_partitioned")
.filter(col("age") < 30) // Only reads relevant partitions
// Dynamic partition pruning (assumes people_partitioned and departments are registered as tables over the data written above)
val filtered = spark.sql("""
SELECT p.*, d.name as dept_name
FROM people_partitioned p
JOIN departments d ON p.id = d.managerId
WHERE d.name = 'Engineering'
""")
Caching and Persistence Strategies
// Different storage levels (once a DataFrame is persisted, further persist calls with a
// different level only log a warning; unpersist() first to actually change the level)
peopleDF.cache() // Dataset default: MEMORY_AND_DISK
peopleDF.persist(StorageLevel.MEMORY_AND_DISK)
peopleDF.persist(StorageLevel.MEMORY_ONLY_SER)
// Check cache status
println(s"Is cached: ${peopleDF.storageLevel}")
// Uncache when no longer needed
peopleDF.unpersist()
// Global cache management
spark.catalog.cacheTable("people")
spark.catalog.uncacheTable("people")
spark.catalog.clearCache()
// Optimal caching strategy
class CacheManager {
def cacheStrategically(df: DataFrame, accessPattern: String): DataFrame = {
accessPattern match {
case "frequent" => df.persist(StorageLevel.MEMORY_ONLY)
case "infrequent" => df.persist(StorageLevel.MEMORY_AND_DISK_SER)
case "temporary" => df.cache()
case _ => df
}
}
}
Memory Management and Tuning
// Memory configuration
object SparkTuning {
def configureMemory(): SparkSession = {
SparkSession.builder()
.config("spark.executor.memory", "4g")
.config("spark.driver.memory", "2g") // driver memory must be set before the driver JVM starts (e.g. via spark-submit)
.config("spark.memory.fraction", "0.8") // unified memory manager: fraction of heap shared by execution and storage
.config("spark.sql.shuffle.partitions", "200")
.config("spark.default.parallelism", "100")
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
.getOrCreate()
}
def monitorMemoryUsage(): Unit = {
val sc = spark.sparkContext
val statusTracker = sc.statusTracker
statusTracker.getExecutorInfos.foreach { executor =>
  println(s"Executor ${executor.host()}:${executor.port()}: " +
    s"cached data: ${executor.cacheSize()} bytes, " +
    s"on-heap storage used/total: ${executor.usedOnHeapStorageMemory()}/${executor.totalOnHeapStorageMemory()}")
}
}
}
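Beyond configuration, partition counts can be adjusted explicitly: repartition shuffles the data into a target layout, while coalesce merges partitions without a full shuffle. A short sketch:
// Hash-partition by age into 8 partitions (full shuffle), then shrink to 2 (narrow dependency)
val repartitioned = peopleDF.repartition(8, col("age"))
val coalesced = repartitioned.coalesce(2)
println(s"Partitions after coalesce: ${coalesced.rdd.getNumPartitions}")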
Advanced Data Processing Patterns
ETL Pipelines
class ETLPipeline {
def extract(sources: Map[String, String]): Map[String, DataFrame] = {
sources.map { case (name, path) =>
name -> (path match {
case p if p.endsWith(".json") => spark.read.json(p)
case p if p.endsWith(".csv") => spark.read.option("header", "true").csv(p)
case p if p.endsWith(".parquet") => spark.read.parquet(p)
case _ => throw new IllegalArgumentException(s"Unsupported format: $path")
})
}
}
def transform(rawData: Map[String, DataFrame]): DataFrame = {
val customers = rawData("customers")
val orders = rawData("orders")
val products = rawData("products")
// Data quality checks
val cleanCustomers = customers
.filter(col("email").isNotNull && col("email").rlike(".*@.*\\..*"))
.withColumn("customer_name", trim(upper(col("name"))))
val cleanOrders = orders
.filter(col("order_date").isNotNull && col("amount") > 0)
.withColumn("order_year", year(col("order_date")))
// Business logic transformations
val enrichedOrders = cleanOrders
.join(cleanCustomers, "customer_id")
.join(products, "product_id")
.withColumn("total_with_tax", col("amount") * 1.08)
.withColumn("order_category",
when(col("amount") > 1000, "High Value")
.when(col("amount") > 100, "Medium Value")
.otherwise("Low Value")
)
// Aggregations
enrichedOrders
.groupBy("customer_id", "customer_name", "order_year")
.agg(
count("order_id").as("total_orders"),
sum("total_with_tax").as("total_spent"),
avg("amount").as("avg_order_value"),
collect_list("product_name").as("products_purchased")
)
}
def load(df: DataFrame, outputPath: String, format: String = "parquet"): Unit = {
df.write
.mode("overwrite")
.option("compression", "snappy")
.format(format)
.save(outputPath)
}
def runPipeline(sources: Map[String, String], outputPath: String): Unit = {
val extracted = extract(sources)
val transformed = transform(extracted)
load(transformed, outputPath)
// Data validation
val recordCount = transformed.count()
println(s"Pipeline completed successfully. Processed $recordCount records.")
}
}
// Usage
val pipeline = new ETLPipeline()
val sources = Map(
"customers" -> "data/customers.json",
"orders" -> "data/orders.csv",
"products" -> "data/products.parquet"
)
pipeline.runPipeline(sources, "output/customer_analytics")
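A lightweight post-load validation step can catch empty or malformed outputs before downstream jobs consume them. A sketch that reads back the files just written (thresholds are illustrative):
// Read the published output and run basic sanity checks
val published = spark.read.parquet("output/customer_analytics")
require(published.count() > 0, "ETL output is empty")
require(published.filter(col("total_spent") < 0).isEmpty, "Negative totals found in ETL output")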
Change Data Capture (CDC)
import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit
class CDCProcessor {
def processCDCStream(inputPath: String, outputPath: String): StreamingQuery = {
val cdcSchema = StructType(Array(
StructField("operation", StringType, true),
StructField("table", StringType, true),
StructField("timestamp", TimestampType, true),
StructField("before", StringType, true),
StructField("after", StringType, true),
StructField("primary_key", StringType, true)
))
val cdcStream = spark.readStream
.schema(cdcSchema)
.option("maxFilesPerTrigger", 1)
.json(inputPath)
val processedStream = cdcStream
.withWatermark("timestamp", "1 hour")
.filter(col("operation").isin("INSERT", "UPDATE", "DELETE"))
.withColumn("year", year(col("timestamp")))
.withColumn("month", month(col("timestamp")))
.withColumn("day", dayofmonth(col("timestamp")))
processedStream.writeStream
.format("delta") // Delta Lake for ACID transactions; requires the delta-spark dependency and catalog extensions (not included in the build.sbt above)
.outputMode("append")
.option("checkpointLocation", s"$outputPath/checkpoints")
.option("path", s"$outputPath/cdc_data")
.partitionBy("year", "month", "day")
.trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
.start()
}
def applyChangesToTarget(cdcData: DataFrame, targetTable: String): Unit = {
// Merge changes into target table
val changes = cdcData
.groupBy("primary_key")
.agg(
max(struct(col("timestamp"), col("operation"), col("after"))).as("latest_change")
)
.select(
col("primary_key"),
col("latest_change.operation").as("operation"),
col("latest_change.after").as("data")
)
changes.write
.mode("overwrite")
.option("mergeSchema", "true")
.saveAsTable(s"${targetTable}_staging")
// Use SQL MERGE for upserts
spark.sql(s"""
MERGE INTO $targetTable target
USING ${targetTable}_staging source
ON target.id = source.primary_key
WHEN MATCHED AND source.operation = 'DELETE' THEN DELETE
WHEN MATCHED AND source.operation = 'UPDATE' THEN UPDATE SET *
WHEN NOT MATCHED AND source.operation = 'INSERT' THEN INSERT *
""")
}
}
Integration with External Systems
Database Connectivity
// JDBC optimizations
def readFromDatabase(
url: String,
table: String,
partitionColumn: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int
): DataFrame = {
spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", table)
.option("user", "username")
.option("password", "password")
.option("driver", "org.postgresql.Driver")
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound)
.option("upperBound", upperBound)
.option("numPartitions", numPartitions)
.option("fetchsize", 10000)
.load()
}
def writeToDatabase(df: DataFrame, url: String, table: String): Unit = {
df.write
.format("jdbc")
.option("url", url)
.option("dbtable", table)
.option("user", "username")
.option("password", "password")
.option("driver", "org.postgresql.Driver")
.option("batchsize", 10000)
.option("isolationLevel", "READ_COMMITTED")
.mode("append")
.save()
}
// Custom data source - note: this sketch uses the legacy Spark 2.3/2.4 DataSourceV2 API;
// the org.apache.spark.sql.sources.v2 interfaces were removed in Spark 3.x, where connectors
// implement org.apache.spark.sql.connector.catalog.TableProvider and related interfaces instead
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
class CustomDataSource extends DataSourceV2 with ReadSupport {
  def createReader(options: DataSourceOptions): DataSourceReader = {
    new CustomDataSourceReader(options)
  }
}
class CustomDataSourceReader(options: DataSourceOptions) extends DataSourceReader {
  def readSchema(): StructType = {
    // Define schema
    StructType(Array(
      StructField("id", LongType, false),
      StructField("data", StringType, true)
    ))
  }
  def planInputPartitions(): java.util.List[InputPartition[InternalRow]] = {
    // Create partitions (left empty here for brevity)
    java.util.Collections.emptyList[InputPartition[InternalRow]]()
  }
}
Testing Spark Applications
Unit Testing with Spark
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
class SparkJobTest extends AnyFlatSpec with Matchers with DataFrameSuiteBase {
  "Data transformation" should "correctly aggregate sales data" in {
    import spark.implicits._ // DataFrameSuiteBase provides the SparkSession
val input = Seq(
("product1", 100.0, "North"),
("product1", 150.0, "South"),
("product2", 200.0, "North")
).toDF("product", "amount", "region")
    val expected = Seq(
      ("product1", 250.0, 2L), // count() returns LongType, so use Long literals
      ("product2", 200.0, 1L)
    ).toDF("product", "total_amount", "order_count")
val result = aggregateSales(input)
assertDataFrameEquals(expected, result)
}
def aggregateSales(df: DataFrame): DataFrame = {
df.groupBy("product")
.agg(
sum("amount").as("total_amount"),
count("*").as("order_count")
)
.orderBy("product")
}
}
// Property-based testing
import org.scalacheck.{Gen, Properties}
import org.scalacheck.Prop._
object SparkPropertyTests extends Properties("Spark") {
property("sum is commutative") = forAll(Gen.nonEmptyListOf(Gen.choose(1, 100))) { numbers =>
  // non-empty lists only: summing an empty DataFrame yields null, which getLong cannot read
  val df1 = numbers.toDF("value")
val df2 = numbers.reverse.toDF("value")
val sum1 = df1.agg(sum("value")).collect()(0).getLong(0)
val sum2 = df2.agg(sum("value")).collect()(0).getLong(0)
sum1 == sum2
}
}
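Structured Streaming logic can also be unit-tested with the in-memory MemoryStream source and the memory sink. A sketch (MemoryStream lives in an internal package but is a common testing trick; the "test_sink" name is illustrative):
import org.apache.spark.sql.execution.streaming.MemoryStream
implicit val sqlCtx: org.apache.spark.sql.SQLContext = spark.sqlContext
val events = MemoryStream[Int]
val streamQuery = events.toDS()
  .writeStream
  .format("memory")        // results land in an in-memory table
  .queryName("test_sink")
  .outputMode("append")
  .start()
events.addData(1, 2, 3)
streamQuery.processAllAvailable() // block until the added data has been processed
spark.table("test_sink").show()
streamQuery.stop()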
Conclusion
Apache Spark provides a comprehensive platform for big data processing in Scala, offering:
Core Capabilities:
- Unified processing engine for batch and streaming data
- SQL interface with advanced optimization
- Machine learning and graph processing libraries
- Rich APIs for data manipulation and analysis
Performance Features:
- Catalyst optimizer for query optimization
- Adaptive Query Execution for runtime optimization
- Advanced caching and persistence strategies
- Columnar storage formats (Parquet, Delta Lake)
Scalability:
- Horizontal scaling across clusters
- Efficient resource utilization
- Dynamic resource allocation
- Integration with cluster managers
Best Practices:
- Proper partitioning and bucketing strategies
- Effective use of caching and persistence
- Memory and resource tuning
- Monitoring and debugging techniques
Enterprise Features:
- ACID transactions with Delta Lake
- Change data capture processing
- Integration with external systems
- Comprehensive testing frameworks
Spark's combination of ease of use, performance, and comprehensive feature set makes it the de facto standard for big data processing, particularly when combined with Scala's expressive syntax and type safety.