Big Data Processing with Apache Spark and Scala
Apache Spark with Scala provides a powerful combination for processing massive datasets across distributed clusters. This comprehensive lesson covers Spark fundamentals, RDDs, DataFrames and Datasets, Spark SQL, and Structured Streaming, along with the optimization techniques needed to build scalable data processing applications.
Spark Fundamentals and Setup
Core Concepts and Architecture
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.expressions.Window // required by the window functions used below
import java.sql.Timestamp
import java.time.LocalDateTime
// Spark session configuration and setup
object SparkConfiguration {
def createSparkSession(appName: String): SparkSession = {
SparkSession.builder()
.appName(appName)
.master("local[*]") // Use all available cores locally
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.sql.adaptive.skewJoin.enabled", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.execution.arrow.pyspark.enabled", "true")
.config("spark.sql.execution.arrow.sparkr.enabled", "true")
.getOrCreate()
}
def createClusterSparkSession(appName: String, masterUrl: String): SparkSession = {
SparkSession.builder()
.appName(appName)
.master(masterUrl)
.config("spark.executor.memory", "4g")
.config("spark.executor.cores", "4")
.config("spark.executor.instances", "10")
.config("spark.driver.memory", "2g")
.config("spark.driver.maxResultSize", "1g")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrationRequired", "false")
.getOrCreate()
}
// Spark context utilities
implicit class SparkSessionExtensions(spark: SparkSession) {
def enableHiveSupport(): SparkSession = {
// Hive support can only be configured while a session is being built; getOrCreate()
// returns the already-active session if one exists, so call this before any other
// SparkSession has been created.
SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
}
def setLogLevel(level: String): Unit = {
spark.sparkContext.setLogLevel(level)
}
def getExecutorInfo: Array[String] = {
// SparkExecutorInfo exposes host/port rather than an executor id
spark.sparkContext.statusTracker.getExecutorInfos.map(info => s"${info.host()}:${info.port()}")
}
def showConfiguration(): Unit = {
spark.conf.getAll.foreach { case (key, value) =>
println(s"$key = $value")
}
}
def getCacheStatistics(): String = {
val sc = spark.sparkContext
// getExecutorMemoryStatus maps each executor to (maximum memory for caching, remaining memory)
val memoryStatus = sc.getExecutorMemoryStatus
val totalMax = memoryStatus.values.map(_._1).sum
val totalRemaining = memoryStatus.values.map(_._2).sum
s"""Cache Statistics:
|Total Storage Memory: ${totalMax / (1024 * 1024)} MB
|Used Storage Memory: ${(totalMax - totalRemaining) / (1024 * 1024)} MB
|Active Jobs: ${sc.statusTracker.getActiveJobIds().length}
|Active Stages: ${sc.statusTracker.getActiveStageIds().length}
""".stripMargin
}
}
}
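Before moving on to the data model, here is a minimal driver sketch showing how the configuration helpers and session extensions above might be wired together; the application name is a placeholder.
// Illustrative usage of SparkConfiguration and the session extensions above.
object SparkConfigurationExample {
  import SparkConfiguration._

  def main(args: Array[String]): Unit = {
    val spark = createSparkSession("spark-lesson-demo") // placeholder application name
    spark.setLogLevel("WARN")           // extension method: quiet the default INFO logging
    spark.showConfiguration()           // extension method: dump the effective configuration
    println(spark.getCacheStatistics()) // extension method: storage memory and job/stage counts
    spark.stop()
  }
}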
// Domain models for data processing
case class Customer(
customerId: String,
firstName: String,
lastName: String,
email: String,
phone: String,
registrationDate: Timestamp,
birthDate: Option[Timestamp],
city: String,
state: String,
country: String,
totalSpent: Double,
orderCount: Int,
lastOrderDate: Option[Timestamp]
)
case class Order(
orderId: String,
customerId: String,
orderDate: Timestamp,
totalAmount: Double,
status: String,
shippingAddress: String,
paymentMethod: String,
discountAmount: Double
)
case class OrderItem(
orderId: String,
productId: String,
quantity: Int,
unitPrice: Double,
discount: Double
)
case class Product(
productId: String,
name: String,
category: String,
brand: String,
price: Double,
costPrice: Double,
description: String,
inStock: Boolean,
stockQuantity: Int,
supplier: String
)
case class CustomerSummary(
customerId: String,
fullName: String,
email: String,
totalOrders: Long,
totalSpent: Double,
averageOrderValue: Double,
lastOrderDate: Option[Timestamp],
favoriteCategory: String,
customerSegment: String
)
case class ProductAnalytics(
productId: String,
productName: String,
category: String,
totalSold: Long,
totalRevenue: Double,
averagePrice: Double,
profitMargin: Double,
inventoryTurnover: Double,
topCustomers: Seq[String]
)
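These case classes double as Dataset element types through Spark's implicit encoders. A small sketch, using made-up sample values and the createSparkSession helper defined above:
// Sketch: the Customer model used as a typed Dataset element (sample values are illustrative).
object DomainModelExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("domain-model-demo")
    import spark.implicits._

    val sampleCustomers = Seq(
      Customer("C001", "Ada", "Lovelace", "ada@example.com", "555-0100",
        Timestamp.valueOf("2023-01-15 10:00:00"), None,
        "London", "LDN", "UK", 1250.0, 8, None)
    ).toDS()

    sampleCustomers.printSchema()
    // Typed filter: the lambda works on Customer objects, not Rows
    sampleCustomers.filter(_.totalSpent > 1000).show()

    spark.stop()
  }
}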
DataFrames and Datasets
Advanced DataFrame Operations
object DataFrameOperations {
// DataFrame transformations and analysis
class ECommerceAnalytics(spark: SparkSession) {
import spark.implicits._
// Load data from various sources
def loadCustomers(path: String): DataFrame = {
spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.csv(path)
}
def loadOrders(path: String): DataFrame = {
spark.read
.option("multiline", "true")
.json(path)
}
def loadProducts(path: String): DataFrame = {
spark.read
.format("parquet")
.load(path)
}
def loadOrderItems(path: String): DataFrame = {
spark.read
.format("delta") // Delta Lake format
.load(path)
}
// Complex data transformations
def enrichCustomerData(customers: DataFrame, orders: DataFrame): DataFrame = {
customers
// Drop any precomputed aggregate columns so the freshly computed ones are unambiguous after the join
.drop("totalSpent", "orderCount", "lastOrderDate")
.join(
orders.groupBy("customerId")
.agg(
count("orderId").as("orderCount"),
sum("totalAmount").as("totalSpent"),
max("orderDate").as("lastOrderDate"),
avg("totalAmount").as("averageOrderValue")
),
Seq("customerId"),
"left"
)
.withColumn("customerSegment",
when(col("totalSpent") > 1000, "Premium")
.when(col("totalSpent") > 500, "Gold")
.when(col("totalSpent") > 100, "Silver")
.otherwise("Bronze")
)
.withColumn("daysSinceLastOrder",
when(col("lastOrderDate").isNotNull,
datediff(current_date(), col("lastOrderDate"))
).otherwise(null)
)
.withColumn("isActive",
when(col("daysSinceLastOrder") <= 90, true)
.when(col("daysSinceLastOrder").isNull, false)
.otherwise(false)
)
}
// Advanced analytics queries
def customerLifetimeValue(customers: DataFrame, orders: DataFrame, orderItems: DataFrame, products: DataFrame): DataFrame = {
// Calculate detailed order metrics
val orderDetails = orders
.join(orderItems, "orderId")
.join(products, "productId")
.withColumn("itemTotal", col("quantity") * col("unitPrice") - col("discount"))
.withColumn("itemProfit", (col("unitPrice") - col("costPrice")) * col("quantity") - col("discount"))
// Customer CLV calculation
orderDetails
.groupBy("customerId")
.agg(
countDistinct("orderId").as("totalOrders"),
sum("itemTotal").as("totalRevenue"),
sum("itemProfit").as("totalProfit"),
avg("itemTotal").as("averageOrderValue"),
stddev("itemTotal").as("orderValueStdDev"),
min("orderDate").as("firstOrderDate"),
max("orderDate").as("lastOrderDate"),
countDistinct("category").as("categoryDiversity"),
collect_set("category").as("purchasedCategories")
)
.withColumn("customerTenure",
datediff(col("lastOrderDate"), col("firstOrderDate"))
)
.withColumn("orderFrequency",
when(col("customerTenure") > 0,
col("totalOrders") / (col("customerTenure") / 365.0)
).otherwise(0.0)
)
.withColumn("predictedCLV",
col("averageOrderValue") * col("orderFrequency") * 2.0 // 2-year prediction
)
.withColumn("profitMargin",
when(col("totalRevenue") > 0,
col("totalProfit") / col("totalRevenue")
).otherwise(0.0)
)
}
// Cohort analysis
def cohortAnalysis(customers: DataFrame, orders: DataFrame): DataFrame = {
// Define customer cohorts by registration month
val customerCohorts = customers
.withColumn("cohortMonth", date_format(col("registrationDate"), "yyyy-MM"))
.withColumn("cohortStart", trunc(col("registrationDate"), "MM"))
.select("customerId", "cohortMonth", "cohortStart")
// Calculate monthly order activity
val monthlyActivity = orders
.join(customerCohorts, "customerId")
.withColumn("orderMonth", trunc(col("orderDate"), "MM"))
.withColumn("periodNumber",
// months_between needs real dates, not "yyyy-MM" strings
months_between(col("orderMonth"), col("cohortStart")).cast("int")
)
.groupBy("cohortMonth", "periodNumber")
.agg(
countDistinct("customerId").as("activeCustomers"),
sum("totalAmount").as("cohortRevenue"),
avg("totalAmount").as("avgOrderValue")
)
// Calculate cohort sizes
val cohortSizes = customerCohorts
.groupBy("cohortMonth")
.agg(countDistinct("customerId").as("cohortSize"))
// Calculate retention rates
monthlyActivity
.join(cohortSizes, "cohortMonth")
.withColumn("retentionRate",
col("activeCustomers") / col("cohortSize")
)
.withColumn("revenuePerCustomer",
col("cohortRevenue") / col("cohortSize")
)
.orderBy("cohortMonth", "periodNumber")
}
// Product recommendation analysis
def marketBasketAnalysis(orderItems: DataFrame, products: DataFrame, minSupport: Double = 0.01): DataFrame = {
// Calculate item frequencies
val itemFrequency = orderItems
.join(products, "productId")
.groupBy("category")
.agg(
countDistinct("orderId").as("orderCount"),
sum("quantity").as("totalQuantity")
)
val totalOrders = orderItems.select("orderId").distinct().count()
// Find frequent itemsets (categories in this example)
val frequentItems = itemFrequency
.withColumn("support", col("orderCount") / totalOrders)
.filter(col("support") >= minSupport)
// Calculate association rules between categories
val orderCategories = orderItems
.join(products, "productId")
.groupBy("orderId")
.agg(collect_set("category").as("categories"))
// Explode to get category pairs
val categoryPairs = orderCategories
.select(col("orderId"), explode(col("categories")).as("categoryA"))
.join(
orderCategories.select(col("orderId"), explode(col("categories")).as("categoryB")),
"orderId"
)
.filter(col("categoryA") < col("categoryB")) // Avoid duplicates and self-pairs
// Calculate confidence and lift
categoryPairs
.groupBy("categoryA", "categoryB")
.agg(countDistinct("orderId").as("coOccurrence"))
.join(
itemFrequency.select(col("category").as("categoryA"), col("orderCount").as("countA")),
"categoryA"
)
.join(
itemFrequency.select(col("category").as("categoryB"), col("orderCount").as("countB")),
"categoryB"
)
.withColumn("confidence", col("coOccurrence") / col("countA"))
.withColumn("expectedCoOccurrence",
(col("countA") * col("countB")) / totalOrders
)
.withColumn("lift",
col("coOccurrence") / col("expectedCoOccurrence")
)
.withColumn("support", col("coOccurrence") / totalOrders)
.filter(col("confidence") >= 0.1 && col("lift") > 1.0)
.orderBy(desc("lift"))
}
// Time series analysis
def salesTrendAnalysis(orders: DataFrame, orderItems: DataFrame, products: DataFrame): DataFrame = {
val dailySales = orders
.join(orderItems, "orderId")
.join(products, "productId")
.withColumn("orderDate", to_date(col("orderDate")))
.withColumn("revenue", col("quantity") * col("unitPrice"))
.withColumn("profit", (col("unitPrice") - col("costPrice")) * col("quantity"))
.groupBy("orderDate", "category")
.agg(
sum("revenue").as("dailyRevenue"),
sum("profit").as("dailyProfit"),
sum("quantity").as("dailyQuantity"),
countDistinct("orderId").as("dailyOrders"),
countDistinct("customerId").as("dailyCustomers")
)
// Add moving averages and trends
// Moving-average frame, plus an unframed window for lag()
val movingAvgWindow = Window.partitionBy("category").orderBy("orderDate").rowsBetween(-6, 0)
val lagWindow = Window.partitionBy("category").orderBy("orderDate")
dailySales
.withColumn("revenue7DayMA", avg("dailyRevenue").over(movingAvgWindow))
.withColumn("prevDayRevenue", lag("dailyRevenue", 1).over(lagWindow))
.withColumn("revenueGrowth",
when(col("prevDayRevenue").isNotNull && col("prevDayRevenue") =!= 0,
(col("dailyRevenue") - col("prevDayRevenue")) / col("prevDayRevenue") * 100
).otherwise(lit(0.0))
)
.withColumn("profitMargin", col("dailyProfit") / col("dailyRevenue") * 100)
.withColumn("avgOrderValue", col("dailyRevenue") / col("dailyOrders"))
.orderBy("category", "orderDate")
}
// Advanced window functions
def customerRankingAnalysis(customers: DataFrame, orders: DataFrame): DataFrame = {
val customerMetrics = orders
.groupBy("customerId")
.agg(
sum("totalAmount").as("totalSpent"),
count("orderId").as("orderCount"),
avg("totalAmount").as("avgOrderValue"),
max("orderDate").as("lastOrderDate")
)
.join(customers.select("customerId", "registrationDate", "city", "state"), "customerId")
val stateWindow = Window.partitionBy("state").orderBy(desc("totalSpent"))
val globalWindow = Window.orderBy(desc("totalSpent"))
customerMetrics
.withColumn("stateRank", row_number().over(stateWindow))
.withColumn("statePercentile", percent_rank().over(stateWindow))
.withColumn("globalRank", row_number().over(globalWindow))
.withColumn("globalPercentile", percent_rank().over(globalWindow))
.withColumn("isTopCustomerInState", col("stateRank") <= 10)
.withColumn("isTop1PercentGlobal", col("globalPercentile") >= 0.99)
.withColumn("customerTier",
when(col("globalPercentile") >= 0.95, "Platinum")
.when(col("globalPercentile") >= 0.80, "Gold")
.when(col("globalPercentile") >= 0.60, "Silver")
.otherwise("Bronze")
)
}
}
// Performance optimization utilities
object DataFrameOptimization {
implicit class DataFrameOptimizations(df: DataFrame) {
def optimizePartitioning(partitionColumns: String*): DataFrame = {
df.repartition(partitionColumns.map(col): _*)
}
def optimizeForJoins(keyColumn: String, numPartitions: Int = 200): DataFrame = {
df.repartition(numPartitions, col(keyColumn))
}
def cacheWithStorageLevel(level: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER): DataFrame = {
df.persist(level)
}
def analyzeSkew(keyColumn: String): DataFrame = {
df.groupBy(keyColumn)
.count()
.orderBy(desc("count"))
.limit(100)
}
def broadcastIfSmall(maxSizeBytes: Long = 20 * 1024 * 1024): DataFrame = {
// Check if DataFrame is small enough to broadcast
val sizeEstimate = df.queryExecution.optimizedPlan.stats.sizeInBytes
if (sizeEstimate <= maxSizeBytes) {
broadcast(df)
} else {
df
}
}
def showExecutionPlan(): Unit = {
df.explain(true)
}
def getPartitionInfo(): String = {
s"""Partition Information:
|Number of partitions: ${df.rdd.getNumPartitions}
|Partition sizes: ${df.rdd.mapPartitions(iter => Iterator(iter.size)).collect().mkString(", ")}
|Preferred locations: ${df.rdd.preferredLocations(df.rdd.partitions.head).mkString(", ")}
""".stripMargin
}
}
}
}
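To tie the pieces above together, a driver sketch that loads two of the sources, enriches the customers, and summarizes them by segment. The file paths are placeholders, and the input files are assumed to carry the column names used in enrichCustomerData.
// Illustrative driver for the analytics above; paths are placeholders.
object ECommerceAnalyticsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("ecommerce-analytics")
    val analytics = new DataFrameOperations.ECommerceAnalytics(spark)

    val customers = analytics.loadCustomers("/data/customers.csv") // hypothetical path
    val orders    = analytics.loadOrders("/data/orders.json")      // hypothetical path

    val enriched = analytics.enrichCustomerData(customers, orders)
    enriched
      .groupBy("customerSegment")
      .agg(count("*").as("customers"), avg("totalSpent").as("avgSpent"))
      .show()

    spark.stop()
  }
}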
RDD Operations and Low-Level Programming
Advanced RDD Patterns
object RDDOperations {
// Low-level RDD operations for performance-critical scenarios
class AdvancedRDDProcessing(spark: SparkSession) {
val sc = spark.sparkContext
// Custom partitioner for optimal data distribution
class CustomerPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = key match {
case customerId: String =>
// Non-negative modulo keeps the result valid even when hashCode is Int.MinValue
((customerId.hashCode % partitions) + partitions) % partitions
case _ => 0
}
override def equals(other: Any): Boolean = other match {
case h: CustomerPartitioner => h.numPartitions == numPartitions
case _ => false
}
override def hashCode: Int = numPartitions
}
// Advanced RDD transformations
def processLargeDataset[T: scala.reflect.ClassTag](data: RDD[T], partitions: Int = 200): RDD[T] = {
data
.coalesce(partitions) // Optimize partition count
.mapPartitions(processPartition, preservesPartitioning = true)
.cache() // Cache intermediate results
}
private def processPartition[T](iterator: Iterator[T]): Iterator[T] = {
// Process entire partition in memory for efficiency
val batch = iterator.toList
val processed = batch.map(processRecord)
processed.iterator
}
private def processRecord[T](record: T): T = {
// Simulate complex processing
Thread.sleep(1) // Simulate work
record
}
// Complex RDD joins with custom partitioners
def efficientJoin(
customers: RDD[(String, Customer)],
orders: RDD[(String, Order)]
): RDD[(String, (Customer, Order))] = {
val partitioner = new CustomerPartitioner(200)
val partitionedCustomers = customers.partitionBy(partitioner).cache()
val partitionedOrders = orders.partitionBy(partitioner).cache()
partitionedCustomers.join(partitionedOrders)
}
// Accumulator-based analytics
def calculateMetricsWithAccumulators(orders: RDD[Order]): Map[String, Long] = {
val totalOrdersAcc = sc.longAccumulator("Total Orders")
val totalRevenueAcc = sc.doubleAccumulator("Total Revenue")
val errorCountAcc = sc.longAccumulator("Error Count")
val statusCounts = Map(
"pending" -> sc.longAccumulator("Pending Orders"),
"completed" -> sc.longAccumulator("Completed Orders"),
"cancelled" -> sc.longAccumulator("Cancelled Orders")
)
orders.foreach { order =>
try {
totalOrdersAcc.add(1)
totalRevenueAcc.add(order.totalAmount)
statusCounts.get(order.status.toLowerCase) match {
case Some(acc) => acc.add(1)
case None => errorCountAcc.add(1)
}
} catch {
case _: Exception => errorCountAcc.add(1)
}
}
Map(
"totalOrders" -> totalOrdersAcc.value,
"totalRevenue" -> totalRevenueAcc.value.toLong,
"pendingOrders" -> statusCounts("pending").value,
"completedOrders" -> statusCounts("completed").value,
"cancelledOrders" -> statusCounts("cancelled").value,
"errors" -> errorCountAcc.value
)
}
// Custom aggregation functions
def customAggregation(data: RDD[(String, Double)]): RDD[(String, (Double, Double, Long))] = {
data.aggregateByKey(
(0.0, 0.0, 0L) // (sum, sumSquares, count)
)(
// Sequence operation (within partition)
{ case ((sum, sumSquares, count), value) =>
(sum + value, sumSquares + value * value, count + 1)
},
// Combine operation (across partitions)
{ case ((sum1, sumSquares1, count1), (sum2, sumSquares2, count2)) =>
(sum1 + sum2, sumSquares1 + sumSquares2, count1 + count2)
}
).mapValues { case (sum, sumSquares, count) =>
val mean = sum / count
val variance = (sumSquares / count) - (mean * mean)
(mean, math.sqrt(variance), count)
}
}
// Memory-efficient large dataset processing
def processLargeFiles(filePaths: Seq[String]): RDD[String] = {
sc.parallelize(filePaths, filePaths.length)
.flatMap { path =>
// Each file is read where its task runs, so the path must be accessible on that node
scala.io.Source.fromFile(path).getLines()
}
.mapPartitions(processLinesInBatches)
}
private def processLinesInBatches(lines: Iterator[String]): Iterator[String] = {
lines.grouped(1000).flatMap { batch =>
// Process batch efficiently
batch.filter(_.nonEmpty).map(_.toUpperCase)
}
}
// Graph processing with RDDs
def pageRank(
vertices: RDD[(String, String)], // (vertexId, vertexData)
edges: RDD[(String, String)], // (source, destination)
numIterations: Int = 10,
dampingFactor: Double = 0.85
): RDD[(String, Double)] = {
// Initialize ranks
var ranks = vertices.mapValues(_ => 1.0)
// Build adjacency list
val links = edges.groupByKey().cache()
for (iteration <- 1 to numIterations) {
// Calculate contributions
val contributions = links.join(ranks).flatMap {
case (vertex, (destinations, rank)) =>
destinations.map(dest => (dest, rank / destinations.size))
}
// Update ranks
ranks = contributions.reduceByKey(_ + _).mapValues { contribution =>
(1.0 - dampingFactor) + dampingFactor * contribution
}
}
ranks
}
// Time series processing
def rollingWindowAnalysis(
timeSeries: RDD[(Long, Double)], // (timestamp, value)
windowSize: Int = 7
): RDD[(Long, (Double, Double))] = { // (timestamp, (average, stddev))
val sortedData = timeSeries.sortByKey()
// Note: the rolling window is computed within each partition and does not span partition boundaries
sortedData.mapPartitions { partition =>
val data = partition.toArray
val results = scala.collection.mutable.ArrayBuffer[(Long, (Double, Double))]()
for (i <- data.indices) {
val windowStart = Math.max(0, i - windowSize + 1)
val window = data.slice(windowStart, i + 1)
val values = window.map(_._2)
val mean = values.sum / values.length
val variance = values.map(v => math.pow(v - mean, 2)).sum / values.length
val stddev = math.sqrt(variance)
results += ((data(i)._1, (mean, stddev)))
}
results.iterator
}
}
}
// Performance monitoring and optimization
object RDDPerformanceUtils {
def analyzeRDDLineage[T](rdd: RDD[T]): String = {
s"""RDD Lineage Analysis:
|Debug String: ${rdd.toDebugString}
|Dependencies: ${rdd.dependencies.map(_.getClass.getSimpleName).mkString(", ")}
|Partitions: ${rdd.getNumPartitions}
|Storage Level: ${rdd.getStorageLevel}
|Checkpointed: ${rdd.isCheckpointed}
""".stripMargin
}
def optimizeRDDPartitioning[T](rdd: RDD[T], targetPartitions: Option[Int] = None): RDD[T] = {
val currentPartitions = rdd.getNumPartitions
val optimalPartitions = targetPartitions.getOrElse(
Math.max(2, Runtime.getRuntime.availableProcessors() * 2)
)
if (currentPartitions != optimalPartitions) {
if (currentPartitions > optimalPartitions) {
rdd.coalesce(optimalPartitions)
} else {
rdd.repartition(optimalPartitions)
}
} else {
rdd
}
}
def measureRDDOperation[T, R](rdd: RDD[T])(operation: RDD[T] => R): (R, Long) = {
val startTime = System.currentTimeMillis()
val result = operation(rdd)
val endTime = System.currentTimeMillis()
(result, endTime - startTime)
}
def cacheStrategy[T](rdd: RDD[T], reusageCount: Int): RDD[T] = {
if (reusageCount > 2) {
rdd.cache()
} else {
rdd
}
}
}
}
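A quick, self-contained usage sketch for the RDD helpers above, combining customAggregation with measureRDDOperation on a few synthetic (category, amount) pairs:
// Usage sketch for the RDD utilities above (synthetic data).
object RDDOperationsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("rdd-demo")
    val processor = new RDDOperations.AdvancedRDDProcessing(spark)

    // Synthetic (category, amount) pairs
    val data = spark.sparkContext.parallelize(Seq(
      ("books", 12.5), ("books", 30.0), ("games", 59.99), ("games", 19.99), ("games", 5.0)
    ))

    // Per-key mean, standard deviation, and count in a single pass
    val stats = processor.customAggregation(data)

    val (collected, millis) =
      RDDOperations.RDDPerformanceUtils.measureRDDOperation(stats)(_.collect())
    collected.foreach { case (key, (mean, stddev, count)) =>
      println(f"$key: mean=$mean%.2f stddev=$stddev%.2f count=$count")
    }
    println(s"Aggregation took $millis ms")
    spark.stop()
  }
}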
Spark SQL and Advanced Querying
Complex SQL Operations
object SparkSQLOperations {
class AdvancedSQLAnalytics(spark: SparkSession) {
import spark.implicits._
// Register temporary views for SQL access
def setupTables(
customers: DataFrame,
orders: DataFrame,
orderItems: DataFrame,
products: DataFrame
): Unit = {
customers.createOrReplaceTempView("customers")
orders.createOrReplaceTempView("orders")
orderItems.createOrReplaceTempView("order_items")
products.createOrReplaceTempView("products")
}
// Complex analytical queries
def customerSegmentationSQL(): DataFrame = {
spark.sql("""
WITH customer_metrics AS (
SELECT
c.customerId,
c.firstName,
c.lastName,
c.email,
c.registrationDate,
COUNT(o.orderId) as order_count,
SUM(o.totalAmount) as total_spent,
AVG(o.totalAmount) as avg_order_value,
MAX(o.orderDate) as last_order_date,
DATEDIFF(CURRENT_DATE(), MAX(o.orderDate)) as days_since_last_order,
STDDEV(o.totalAmount) as order_value_stddev,
COUNT(DISTINCT DATE_FORMAT(o.orderDate, 'yyyy-MM')) as active_months
FROM customers c
LEFT JOIN orders o ON c.customerId = o.customerId
GROUP BY c.customerId, c.firstName, c.lastName, c.email, c.registrationDate
),
rfm_scores AS (
SELECT *,
NTILE(5) OVER (ORDER BY days_since_last_order ASC) as recency_score,
NTILE(5) OVER (ORDER BY order_count DESC) as frequency_score,
NTILE(5) OVER (ORDER BY total_spent DESC) as monetary_score
FROM customer_metrics
)
SELECT *,
CONCAT(recency_score, frequency_score, monetary_score) as rfm_segment,
CASE
WHEN recency_score >= 4 AND frequency_score >= 4 AND monetary_score >= 4 THEN 'Champions'
WHEN recency_score >= 3 AND frequency_score >= 3 AND monetary_score >= 3 THEN 'Loyal Customers'
WHEN recency_score >= 3 AND frequency_score <= 2 AND monetary_score >= 3 THEN 'Potential Loyalists'
WHEN recency_score >= 4 AND frequency_score <= 2 THEN 'New Customers'
WHEN recency_score >= 3 AND frequency_score >= 3 AND monetary_score <= 2 THEN 'Promising'
WHEN recency_score <= 2 AND frequency_score >= 3 AND monetary_score >= 3 THEN 'Need Attention'
WHEN recency_score <= 2 AND frequency_score >= 2 AND monetary_score >= 2 THEN 'About to Sleep'
WHEN recency_score <= 2 AND frequency_score <= 2 AND monetary_score >= 3 THEN 'At Risk'
WHEN recency_score <= 1 AND frequency_score >= 2 AND monetary_score <= 2 THEN 'Cannot Lose Them'
ELSE 'Lost'
END as customer_segment
FROM rfm_scores
ORDER BY total_spent DESC
""")
}
def productPerformanceSQL(): DataFrame = {
spark.sql("""
WITH product_metrics AS (
SELECT
p.productId,
p.name as product_name,
p.category,
p.brand,
p.price,
p.costPrice,
COUNT(DISTINCT oi.orderId) as orders_count,
SUM(oi.quantity) as total_quantity_sold,
SUM(oi.quantity * oi.unitPrice) as total_revenue,
SUM(oi.quantity * (oi.unitPrice - p.costPrice)) as total_profit,
AVG(oi.unitPrice) as avg_selling_price,
COUNT(DISTINCT o.customerId) as unique_customers,
MIN(o.orderDate) as first_sale_date,
MAX(o.orderDate) as last_sale_date
FROM products p
LEFT JOIN order_items oi ON p.productId = oi.productId
LEFT JOIN orders o ON oi.orderId = o.orderId
GROUP BY p.productId, p.name, p.category, p.brand, p.price, p.costPrice
),
category_totals AS (
SELECT
category,
SUM(total_revenue) as category_revenue,
SUM(total_quantity_sold) as category_quantity
FROM product_metrics
GROUP BY category
)
SELECT
pm.*,
ROUND((total_revenue / category_revenue) * 100, 2) as category_revenue_share,
ROUND((total_quantity_sold / category_quantity) * 100, 2) as category_quantity_share,
ROUND((total_profit / total_revenue) * 100, 2) as profit_margin_percent,
DENSE_RANK() OVER (PARTITION BY category ORDER BY total_revenue DESC) as category_rank,
CASE
WHEN total_revenue > 0 AND total_quantity_sold > 0 THEN 'Active'
WHEN total_revenue = 0 THEN 'No Sales'
ELSE 'Unknown'
END as product_status,
DATEDIFF(CURRENT_DATE(), last_sale_date) as days_since_last_sale
FROM product_metrics pm
JOIN category_totals ct ON pm.category = ct.category
ORDER BY total_revenue DESC
""")
}
def salesTrendSQL(): DataFrame = {
spark.sql("""
WITH daily_sales AS (
SELECT
DATE(o.orderDate) as sale_date,
p.category,
COUNT(DISTINCT o.orderId) as daily_orders,
COUNT(DISTINCT o.customerId) as daily_customers,
SUM(oi.quantity * oi.unitPrice) as daily_revenue,
SUM(oi.quantity) as daily_quantity,
AVG(o.totalAmount) as avg_order_value
FROM orders o
JOIN order_items oi ON o.orderId = oi.orderId
JOIN products p ON oi.productId = p.productId
GROUP BY DATE(o.orderDate), p.category
),
sales_with_trends AS (
SELECT *,
LAG(daily_revenue, 1) OVER (PARTITION BY category ORDER BY sale_date) as prev_day_revenue,
LAG(daily_revenue, 7) OVER (PARTITION BY category ORDER BY sale_date) as prev_week_revenue,
AVG(daily_revenue) OVER (
PARTITION BY category
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as revenue_7day_ma,
AVG(daily_revenue) OVER (
PARTITION BY category
ORDER BY sale_date
ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
) as revenue_30day_ma
FROM daily_sales
)
SELECT *,
CASE
WHEN prev_day_revenue > 0 THEN
ROUND(((daily_revenue - prev_day_revenue) / prev_day_revenue) * 100, 2)
ELSE 0
END as day_over_day_growth,
CASE
WHEN prev_week_revenue > 0 THEN
ROUND(((daily_revenue - prev_week_revenue) / prev_week_revenue) * 100, 2)
ELSE 0
END as week_over_week_growth,
CASE
WHEN revenue_30day_ma > 0 THEN
ROUND(((daily_revenue - revenue_30day_ma) / revenue_30day_ma) * 100, 2)
ELSE 0
END as vs_30day_average
FROM sales_with_trends
ORDER BY category, sale_date
""")
}
def cohortAnalysisSQL(): DataFrame = {
spark.sql("""
WITH customer_orders AS (
SELECT
c.customerId,
DATE_FORMAT(c.registrationDate, 'yyyy-MM') as cohort_month,
o.orderId,
o.orderDate,
o.totalAmount,
FLOOR(MONTHS_BETWEEN(o.orderDate, c.registrationDate)) as period_number
FROM customers c
JOIN orders o ON c.customerId = o.customerId
),
cohort_data AS (
SELECT
cohort_month,
period_number,
COUNT(DISTINCT customerId) as customers,
COUNT(DISTINCT orderId) as orders,
SUM(totalAmount) as revenue,
AVG(totalAmount) as avg_order_value
FROM customer_orders
GROUP BY cohort_month, period_number
),
cohort_sizes AS (
SELECT
DATE_FORMAT(registrationDate, 'yyyy-MM') as cohort_month,
COUNT(DISTINCT customerId) as cohort_size
FROM customers
GROUP BY DATE_FORMAT(registrationDate, 'yyyy-MM')
)
SELECT
cd.*,
cs.cohort_size,
ROUND((cd.customers / cs.cohort_size) * 100, 2) as retention_rate,
ROUND(cd.revenue / cs.cohort_size, 2) as revenue_per_customer,
ROUND(cd.orders / cs.cohort_size, 2) as orders_per_customer
FROM cohort_data cd
JOIN cohort_sizes cs ON cd.cohort_month = cs.cohort_month
ORDER BY cohort_month, period_number
""")
}
// Advanced window functions and analytics
def advancedWindowFunctionsSQL(): DataFrame = {
spark.sql("""
WITH ranked_customers AS (
SELECT
c.customerId,
firstName,
lastName,
state,
SUM(totalAmount) as total_spent,
COUNT(orderId) as order_count,
ROW_NUMBER() OVER (PARTITION BY state ORDER BY SUM(totalAmount) DESC) as state_rank,
DENSE_RANK() OVER (ORDER BY SUM(totalAmount) DESC) as global_rank,
PERCENT_RANK() OVER (ORDER BY SUM(totalAmount)) as percentile_rank,
NTILE(10) OVER (ORDER BY SUM(totalAmount) DESC) as decile,
LAG(SUM(totalAmount), 1) OVER (PARTITION BY state ORDER BY SUM(totalAmount) DESC) as next_rank_spent,
LEAD(SUM(totalAmount), 1) OVER (PARTITION BY state ORDER BY SUM(totalAmount) DESC) as prev_rank_spent
FROM customers c
JOIN orders o ON c.customerId = o.customerId
GROUP BY c.customerId, firstName, lastName, state
)
SELECT *,
CASE
WHEN state_rank <= 5 THEN 'Top 5 in State'
WHEN state_rank <= 10 THEN 'Top 10 in State'
WHEN global_rank <= 100 THEN 'Top 100 Global'
WHEN percentile_rank >= 0.95 THEN 'Top 5% Global'
WHEN percentile_rank >= 0.80 THEN 'Top 20% Global'
ELSE 'Standard'
END as customer_tier,
COALESCE(next_rank_spent - total_spent, 0) as gap_to_next_rank,
COALESCE(total_spent - prev_rank_spent, 0) as lead_over_prev_rank
FROM ranked_customers
ORDER BY global_rank
""")
}
// User-defined functions for complex logic
def registerUDFs(): Unit = {
// Customer lifetime value calculation
spark.udf.register("calculate_clv", (
totalSpent: Double,
orderCount: Int,
customerTenure: Int,
avgOrderValue: Double
) => {
if (customerTenure > 0) {
val orderFrequency = orderCount.toDouble / (customerTenure / 365.0)
val predictedLifetime = 2.0 // years
avgOrderValue * orderFrequency * predictedLifetime
} else {
avgOrderValue * 2.0 // Default prediction for new customers
}
})
// Churn probability calculation
spark.udf.register("churn_probability", (
daysSinceLastOrder: Int,
orderFrequency: Double,
totalSpent: Double
) => {
val recencyScore = Math.min(daysSinceLastOrder / 90.0, 1.0)
val frequencyScore = Math.max(1.0 - (orderFrequency / 12.0), 0.0)
val monetaryScore = Math.max(1.0 - (totalSpent / 1000.0), 0.0)
(recencyScore * 0.5 + frequencyScore * 0.3 + monetaryScore * 0.2) * 100
})
// Product recommendation score
spark.udf.register("recommendation_score", (
categoryPurchaseFreq: Int,
productPrice: Double,
customerAvgOrderValue: Double,
productRating: Double
) => {
val affinityScore = Math.min(categoryPurchaseFreq / 5.0, 1.0)
val priceScore = if (customerAvgOrderValue > 0) {
Math.max(1.0 - Math.abs(productPrice - customerAvgOrderValue) / customerAvgOrderValue, 0.0)
} else 0.5
val qualityScore = productRating / 5.0
(affinityScore * 0.4 + priceScore * 0.3 + qualityScore * 0.3) * 100
})
}
// Performance optimization hints
def optimizedQuery(): DataFrame = {
spark.sql("""
SELECT /*+ BROADCAST(p), REPARTITION(100) */
c.customerId,
c.firstName,
c.lastName,
COUNT(DISTINCT o.orderId) as order_count,
SUM(oi.quantity * oi.unitPrice) as total_revenue
FROM customers c
JOIN orders o ON c.customerId = o.customerId
JOIN order_items oi ON o.orderId = oi.orderId
JOIN products p ON oi.productId = p.productId
GROUP BY c.customerId, c.firstName, c.lastName
HAVING COUNT(DISTINCT o.orderId) > 5
ORDER BY total_revenue DESC
LIMIT 1000
""")
}
}
}
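A sketch of how the SQL layer above might be driven end to end: register the temporary views, register the UDFs, and call one of them from SQL. The paths are placeholders, the loaders are assumed to produce the schemas used throughout this lesson, and loadOrderItems additionally assumes the Delta Lake dependency is on the classpath.
// End-to-end SQL wiring sketch (paths and column availability are assumptions).
object SparkSQLExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("sql-analytics")
    val analytics = new SparkSQLOperations.AdvancedSQLAnalytics(spark)
    val loader = new DataFrameOperations.ECommerceAnalytics(spark)

    analytics.setupTables(
      loader.loadCustomers("/data/customers.csv"),   // hypothetical paths
      loader.loadOrders("/data/orders.json"),
      loader.loadOrderItems("/data/order_items"),
      loader.loadProducts("/data/products")
    )
    analytics.registerUDFs()

    // Use the churn_probability UDF registered above; 90 and 4.0 are example inputs
    spark.sql("""
      SELECT customerId,
             churn_probability(90, 4.0, totalSpent) AS churn_pct
      FROM customers
      ORDER BY churn_pct DESC
      LIMIT 20
    """).show()
  }
}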
Spark Streaming and Real-Time Processing
Structured Streaming Operations
import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit
object SparkStreamingOperations {
class RealTimeAnalytics(spark: SparkSession) {
import spark.implicits._
// Kafka streaming setup
def createKafkaStream(
bootstrapServers: String,
topics: String,
startingOffsets: String = "latest"
): DataFrame = {
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.option("startingOffsets", startingOffsets)
.option("failOnDataLoss", "false")
.load()
}
// Real-time order processing
def processOrderStream(kafkaDF: DataFrame): StreamingQuery = {
val orderStream = kafkaDF
.select(
col("timestamp").as("kafka_timestamp"),
from_json(col("value").cast("string"), orderSchema).as("order")
)
.select("kafka_timestamp", "order.*")
.withColumn("processing_time", current_timestamp())
.withColumn("hour", hour(col("order_date")))
.withColumn("day_of_week", dayofweek(col("order_date")))
// Real-time aggregations
val orderMetrics = orderStream
.withWatermark("order_date", "10 minutes")
.groupBy(
window(col("order_date"), "5 minutes"),
col("hour"),
col("day_of_week")
)
.agg(
count("order_id").as("order_count"),
sum("total_amount").as("total_revenue"),
avg("total_amount").as("avg_order_value"),
countDistinct("customer_id").as("unique_customers"),
stddev("total_amount").as("order_value_stddev")
)
.withColumn("revenue_per_customer", col("total_revenue") / col("unique_customers"))
orderMetrics
.writeStream
.outputMode("update")
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
}
// Real-time fraud detection
def fraudDetectionStream(orderStream: DataFrame): StreamingQuery = {
// Define fraud detection rules
val suspiciousOrders = orderStream
.withWatermark("order_date", "5 minutes")
.groupBy(
col("customer_id"),
window(col("order_date"), "10 minutes")
)
.agg(
count("order_id").as("orders_in_window"),
sum("total_amount").as("total_spent_in_window"),
max("total_amount").as("max_order_value"),
countDistinct("payment_method").as("payment_methods_used")
)
.filter(
col("orders_in_window") > 5 ||
col("total_spent_in_window") > 5000 ||
col("max_order_value") > 2000 ||
col("payment_methods_used") > 2
)
.withColumn("fraud_score",
when(col("orders_in_window") > 10, 100)
.when(col("total_spent_in_window") > 10000, 95)
.when(col("max_order_value") > 5000, 90)
.otherwise(
(col("orders_in_window") * 10) +
(col("total_spent_in_window") / 100) +
(col("payment_methods_used") * 15)
)
)
.withColumn("risk_level",
when(col("fraud_score") >= 90, "HIGH")
.when(col("fraud_score") >= 70, "MEDIUM")
.when(col("fraud_score") >= 50, "LOW")
.otherwise("NORMAL")
)
suspiciousOrders
.writeStream
.outputMode("update")
.format("delta")
.option("path", "/data/fraud_alerts")
.option("checkpointLocation", "/checkpoints/fraud_detection")
.trigger(Trigger.ProcessingTime("1 minute"))
.start()
}
// Customer behavior analytics
def customerBehaviorStream(orderStream: DataFrame): StreamingQuery = {
val customerMetrics = orderStream
.withWatermark("order_date", "1 hour")
.groupBy(
col("customer_id"),
window(col("order_date"), "1 hour", "15 minutes")
)
.agg(
count("order_id").as("hourly_orders"),
sum("total_amount").as("hourly_spent"),
avg("total_amount").as("avg_order_value"),
collect_list("product_category").as("categories_purchased"),
min("order_date").as("first_order_in_window"),
max("order_date").as("last_order_in_window")
)
.withColumn("session_duration_minutes",
(unix_timestamp(col("last_order_in_window")) -
unix_timestamp(col("first_order_in_window"))) / 60
)
.withColumn("category_diversity", size(array_distinct(col("categories_purchased"))))
.withColumn("shopping_intensity",
when(col("session_duration_minutes") > 0,
col("hourly_orders") / col("session_duration_minutes")
).otherwise(0)
)
.withColumn("customer_segment",
when(col("hourly_spent") > 500 && col("shopping_intensity") > 0.1, "HIGH_VALUE_ACTIVE")
.when(col("hourly_spent") > 200, "MEDIUM_VALUE")
.when(col("shopping_intensity") > 0.2, "HIGH_ACTIVITY")
.otherwise("REGULAR")
)
customerMetrics
.writeStream
.outputMode("update")
.format("delta")
.option("path", "/data/customer_behavior")
.option("checkpointLocation", "/checkpoints/customer_behavior")
.trigger(Trigger.ProcessingTime("5 minutes"))
.start()
}
// Product recommendation engine
def recommendationStream(orderStream: DataFrame): StreamingQuery = {
// Real-time collaborative filtering
val productPairs = orderStream
.withWatermark("order_date", "2 hours")
.groupBy("order_id")
.agg(collect_list("product_id").as("products"))
.select(
col("order_id"),
explode(col("products")).as("product_a")
)
.join(
orderStream.groupBy("order_id")
.agg(collect_list("product_id").as("products"))
.select(
col("order_id"),
explode(col("products")).as("product_b")
),
"order_id"
)
.filter(col("product_a") < col("product_b")) // Avoid duplicates
.groupBy("product_a", "product_b")
.agg(
count("order_id").as("co_occurrence_count"),
countDistinct("order_id").as("unique_orders")
)
.withColumn("confidence", col("co_occurrence_count") / col("unique_orders"))
.filter(col("confidence") > 0.1 && col("co_occurrence_count") > 5)
productPairs
.writeStream
.outputMode("update")
.format("delta")
.option("path", "/data/product_recommendations")
.option("checkpointLocation", "/checkpoints/recommendations")
.trigger(Trigger.ProcessingTime("10 minutes"))
.start()
}
// Advanced streaming aggregations
def advancedStreamingAggregations(orderStream: DataFrame): StreamingQuery = {
val multiLevelAggregations = orderStream
.withWatermark("order_date", "30 minutes")
.groupBy(
window(col("order_date"), "1 hour"),
col("product_category"),
col("customer_segment")
)
.agg(
count("order_id").as("order_count"),
sum("total_amount").as("revenue"),
countDistinct("customer_id").as("unique_customers"),
avg("total_amount").as("avg_order_value"),
stddev("total_amount").as("order_value_stddev"),
percentile_approx(col("total_amount"), 0.5).as("median_order_value"),
percentile_approx(col("total_amount"), 0.95).as("p95_order_value")
)
.withColumn("revenue_share",
col("revenue") / sum("revenue").over(Window.partitionBy("window"))
)
.withColumn("customer_concentration",
col("unique_customers") / sum("unique_customers").over(Window.partitionBy("window"))
)
.withColumn("market_position",
dense_rank().over(Window.partitionBy("window").orderBy(desc("revenue")))
)
multiLevelAggregations
.writeStream
.outputMode("update")
.format("delta")
.option("path", "/data/streaming_analytics")
.option("checkpointLocation", "/checkpoints/streaming_analytics")
.trigger(Trigger.ProcessingTime("2 minutes"))
.start()
}
// Error handling and monitoring
def monitoringStream(kafkaDF: DataFrame): StreamingQuery = {
val errorMetrics = kafkaDF
.select(
col("timestamp"),
col("partition"),
col("offset"),
when(col("value").isNull, 1).otherwise(0).as("null_messages"),
when(length(col("value")) = 0, 1).otherwise(0).as("empty_messages"),
when(col("value").rlike("\\{.*\\}"), 0).otherwise(1).as("invalid_json")
)
.withWatermark("timestamp", "5 minutes")
.groupBy(
window(col("timestamp"), "1 minute"),
col("partition")
)
.agg(
count("*").as("total_messages"),
sum("null_messages").as("null_count"),
sum("empty_messages").as("empty_count"),
sum("invalid_json").as("invalid_json_count"),
min("offset").as("min_offset"),
max("offset").as("max_offset")
)
.withColumn("error_rate",
(col("null_count") + col("empty_count") + col("invalid_json_count")) / col("total_messages")
)
.withColumn("throughput_per_second",
(col("max_offset") - col("min_offset")) / 60.0
)
errorMetrics
.writeStream
.outputMode("update")
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
}
// Schema definition for order data
private val orderSchema = StructType(Array(
StructField("order_id", StringType, true),
StructField("customer_id", StringType, true),
StructField("order_date", TimestampType, true),
StructField("total_amount", DoubleType, true),
StructField("product_id", StringType, true),
StructField("product_category", StringType, true),
StructField("payment_method", StringType, true),
StructField("customer_segment", StringType, true)
))
}
// Streaming utilities
object StreamingUtils {
def gracefulShutdown(queries: Seq[StreamingQuery]): Unit = {
sys.addShutdownHook {
println("Shutting down streaming queries...")
queries.foreach { query =>
try {
query.stop()
println(s"Stopped query: ${query.name}")
} catch {
case e: Exception => println(s"Error stopping query ${query.name}: ${e.getMessage}")
}
}
}
}
def monitorQueryProgress(query: StreamingQuery): Unit = {
new Thread(() => {
while (query.isActive) {
Thread.sleep(10000) // Check every 10 seconds
val progress = query.lastProgress
if (progress != null) {
println(s"""Query Progress:
|Batch ID: ${progress.batchId}
|Input Rows: ${progress.numInputRows}
|Input Rate (rows/s): ${progress.inputRowsPerSecond}
|Processing Rate (rows/s): ${progress.processedRowsPerSecond}
|Batch Duration (ms): ${progress.batchDuration}
|""".stripMargin)
}
}
}).start()
}
def createCheckpointCleanup(checkpointPath: String, retentionHours: Int = 24): Unit = {
// Cleanup old checkpoint files to prevent disk space issues
val cleanupInterval = java.time.Duration.ofHours(retentionHours)
val scheduler = java.util.concurrent.Executors.newScheduledThreadPool(1)
scheduler.scheduleAtFixedRate(() => {
try {
val fs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
val path = new org.apache.hadoop.fs.Path(checkpointPath)
if (fs.exists(path)) {
val cutoffTime = System.currentTimeMillis() - cleanupInterval.toMillis
// Implementation would clean up old checkpoint files
println(s"Checkpoint cleanup completed for path: $checkpointPath")
}
} catch {
case e: Exception => println(s"Checkpoint cleanup failed: ${e.getMessage}")
}
}, 1, 24, TimeUnit.HOURS)
}
}
}
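Finally, a wiring sketch for the streaming pieces above: build a session, attach the Kafka source, start the order-metrics query, and register the shutdown and monitoring hooks. The broker address and topic name are placeholders.
// Streaming pipeline wiring sketch; broker and topic are placeholders.
object StreamingPipelineExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("realtime-orders")
    val analytics = new SparkStreamingOperations.RealTimeAnalytics(spark)

    val kafkaStream = analytics.createKafkaStream(
      bootstrapServers = "localhost:9092", // hypothetical broker
      topics = "orders"                    // hypothetical topic
    )

    val metricsQuery = analytics.processOrderStream(kafkaStream)

    SparkStreamingOperations.StreamingUtils.gracefulShutdown(Seq(metricsQuery))
    SparkStreamingOperations.StreamingUtils.monitorQueryProgress(metricsQuery)

    metricsQuery.awaitTermination()
  }
}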
Conclusion
Apache Spark with Scala provides a comprehensive platform for big data processing and analytics. Key concepts include:
Core Architecture:
- SparkSession as the entry point for all operations
- RDDs for low-level distributed computing
- DataFrames and Datasets for structured data processing
- Catalyst optimizer for query optimization
Data Processing Patterns:
- Batch processing for historical data analysis
- Streaming processing for real-time analytics
- Complex transformations and aggregations
- Machine learning pipelines with MLlib
Performance Optimization:
- Partitioning strategies for data distribution
- Caching and persistence for repeated operations
- Broadcast variables for efficient joins
- Accumulator variables for distributed counters (both are sketched in the example below)
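The lesson body used broadcast joins and accumulators inside larger pipelines; as a stand-alone illustration of the last two bullets, here is a minimal sketch pairing a broadcast variable (a map-side join against a small lookup table) with a long accumulator used as a distributed counter. The category codes, names, and sales figures are synthetic.
// Minimal sketch: broadcast-variable map-side join plus an accumulator counter (synthetic data).
object BroadcastAndAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("broadcast-accumulator-demo")
    val sc = spark.sparkContext

    // Small lookup table shipped once to every executor
    val categoryNames = sc.broadcast(Map("C1" -> "Books", "C2" -> "Games"))
    // Distributed counter; updates inside transformations may be re-counted on task retries
    val unknownCategories = sc.longAccumulator("Unknown categories")

    val sales = sc.parallelize(Seq(("C1", 12.5), ("C2", 59.99), ("C9", 5.0)))

    val labelled = sales.map { case (categoryId, amount) =>
      val name = categoryNames.value.getOrElse(categoryId, {
        unknownCategories.add(1)
        "Unknown"
      })
      (name, amount)
    }

    labelled.collect().foreach(println)
    println(s"Unknown categories seen: ${unknownCategories.value}")
    spark.stop()
  }
}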
Advanced Analytics:
- Customer segmentation and lifetime value
- Cohort analysis and retention metrics
- Real-time fraud detection
- Product recommendation systems
Best Practices:
- Proper resource configuration for cluster environments
- Error handling and monitoring for production systems
- Checkpoint management for streaming applications
- Data quality validation and cleansing
Integration Patterns:
- Kafka for real-time data streaming
- Delta Lake for reliable data storage
- Hive for metadata management
- External databases for operational integration
Monitoring and Operations:
- Query execution plan analysis
- Performance metrics collection
- Resource utilization monitoring
- Error tracking and alerting
Spark's unified analytics engine enables organizations to build sophisticated data processing pipelines that scale from development to production, handling petabytes of data across distributed clusters while maintaining high performance and reliability.