Big Data Processing with Apache Spark and Scala

Apache Spark and Scala are a powerful combination for processing massive datasets across distributed clusters. This lesson covers Spark fundamentals, DataFrames and Datasets, RDDs, Spark SQL, Structured Streaming, and optimization techniques for building scalable data processing applications.

Spark Fundamentals and Setup

Core Concepts and Architecture

import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.expressions.Window
import java.sql.Timestamp
import scala.reflect.ClassTag

// Spark session configuration and setup
object SparkConfiguration {

  def createSparkSession(appName: String): SparkSession = {
    SparkSession.builder()
      .appName(appName)
      .master("local[*]") // Use all available cores locally
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
  }

  def createClusterSparkSession(appName: String, masterUrl: String): SparkSession = {
    SparkSession.builder()
      .appName(appName)
      .master(masterUrl)
      .config("spark.executor.memory", "4g")
      .config("spark.executor.cores", "4")
      .config("spark.executor.instances", "10")
      .config("spark.driver.memory", "2g")
      .config("spark.driver.maxResultSize", "1g")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrationRequired", "false")
      .getOrCreate()
  }

  // Spark context utilities
  implicit class SparkSessionExtensions(spark: SparkSession) {

    def withHiveSupport(): SparkSession = {
      // Hive support can only be enabled while a session is being built, so this
      // returns to the builder rather than mutating the existing session
      SparkSession.builder()
        .enableHiveSupport()
        .getOrCreate()
    }

    def setLogLevel(level: String): Unit = {
      spark.sparkContext.setLogLevel(level)
    }

    def getExecutorInfo: Array[String] = {
      spark.sparkContext.statusTracker.getExecutorInfos.map(info => s"${info.host}:${info.port}")
    }

    def showConfiguration(): Unit = {
      spark.conf.getAll.foreach { case (key, value) =>
        println(s"$key = $value")
      }
    }

    def getCacheStatistics(): String = {
      val sc = spark.sparkContext
      s"""Cache Statistics:
         |Total Storage Memory: ${sc.statusTracker.getExecutorInfos.map(_.totalOnHeapStorageMemory).sum / (1024 * 1024)} MB
         |Used Storage Memory: ${sc.statusTracker.getExecutorInfos.map(_.usedOnHeapStorageMemory).sum / (1024 * 1024)} MB
         |Active Jobs: ${sc.statusTracker.getActiveJobIds().length}
         |Active Stages: ${sc.statusTracker.getActiveStageIds().length}
         """.stripMargin
    }
  }
}
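
A brief usage sketch tying the configuration helpers together. The object name, application name, and log level below are illustrative, not part of the lesson's code:

// Example wiring of the helpers above; names and values are placeholders
object SparkConfigurationExample {
  import SparkConfiguration._

  def main(args: Array[String]): Unit = {
    val spark = createSparkSession("ecommerce-analytics")
    spark.setLogLevel("WARN")           // extension method defined above
    spark.showConfiguration()           // print the effective configuration
    println(spark.getCacheStatistics())
    spark.stop()
  }
}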

// Domain models for data processing
case class Customer(
  customerId: String,
  firstName: String,
  lastName: String,
  email: String,
  phone: String,
  registrationDate: Timestamp,
  birthDate: Option[Timestamp],
  city: String,
  state: String,
  country: String,
  totalSpent: Double,
  orderCount: Int,
  lastOrderDate: Option[Timestamp]
)

case class Order(
  orderId: String,
  customerId: String,
  orderDate: Timestamp,
  totalAmount: Double,
  status: String,
  shippingAddress: String,
  paymentMethod: String,
  discountAmount: Double
)

case class OrderItem(
  orderId: String,
  productId: String,
  quantity: Int,
  unitPrice: Double,
  discount: Double
)

case class Product(
  productId: String,
  name: String,
  category: String,
  brand: String,
  price: Double,
  costPrice: Double,
  description: String,
  inStock: Boolean,
  stockQuantity: Int,
  supplier: String
)

case class CustomerSummary(
  customerId: String,
  fullName: String,
  email: String,
  totalOrders: Long,
  totalSpent: Double,
  averageOrderValue: Double,
  lastOrderDate: Option[Timestamp],
  favoriteCategory: String,
  customerSegment: String
)

case class ProductAnalytics(
  productId: String,
  productName: String,
  category: String,
  totalSold: Long,
  totalRevenue: Double,
  averagePrice: Double,
  profitMargin: Double,
  inventoryTurnover: Double,
  topCustomers: Seq[String]
)
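
The case classes above also double as element types for typed Datasets. A minimal sketch, with invented sample values purely for illustration:

// Creating a typed Dataset from the Order model; the sample row is made up
object DatasetExample {
  def run(spark: SparkSession): Unit = {
    import spark.implicits._

    val orders: Dataset[Order] = Seq(
      Order("o-1", "c-1", Timestamp.valueOf("2024-01-15 10:30:00"), 120.0,
            "completed", "1 Main St", "card", 0.0)
    ).toDS()

    // Typed transformations keep the compiler involved, unlike raw DataFrames
    val completed = orders.filter(_.status == "completed")
    completed.show()
  }
}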

DataFrames and Datasets

Advanced DataFrame Operations

object DataFrameOperations {

  // DataFrame transformations and analysis
  class ECommerceAnalytics(spark: SparkSession) {
    import spark.implicits._

    // Load data from various sources
    def loadCustomers(path: String): DataFrame = {
      spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
        .csv(path)
    }

    def loadOrders(path: String): DataFrame = {
      spark.read
        .option("multiline", "true")
        .json(path)
    }

    def loadProducts(path: String): DataFrame = {
      spark.read
        .format("parquet")
        .load(path)
    }

    def loadOrderItems(path: String): DataFrame = {
      spark.read
        .format("delta") // Delta Lake format
        .load(path)
    }

    // Complex data transformations
    def enrichCustomerData(customers: DataFrame, orders: DataFrame): DataFrame = {
      customers
        .join(
          orders.groupBy("customerId")
            .agg(
              count("orderId").as("orderCount"),
              sum("totalAmount").as("totalSpent"),
              max("orderDate").as("lastOrderDate"),
              avg("totalAmount").as("averageOrderValue")
            ),
          Seq("customerId"),
          "left"
        )
        .withColumn("customerSegment", 
          when(col("totalSpent") > 1000, "Premium")
            .when(col("totalSpent") > 500, "Gold")
            .when(col("totalSpent") > 100, "Silver")
            .otherwise("Bronze")
        )
        .withColumn("daysSinceLastOrder",
          when(col("lastOrderDate").isNotNull,
            datediff(current_date(), col("lastOrderDate"))
          ).otherwise(null)
        )
        .withColumn("isActive",
          when(col("daysSinceLastOrder") <= 90, true)
            .when(col("daysSinceLastOrder").isNull, false)
            .otherwise(false)
        )
    }

    // Advanced analytics queries
    def customerLifetimeValue(customers: DataFrame, orders: DataFrame, orderItems: DataFrame, products: DataFrame): DataFrame = {

      // Calculate detailed order metrics
      val orderDetails = orders
        .join(orderItems, "orderId")
        .join(products, "productId")
        .withColumn("itemTotal", col("quantity") * col("unitPrice") - col("discount"))
        .withColumn("itemProfit", (col("unitPrice") - col("costPrice")) * col("quantity") - col("discount"))

      // Customer CLV calculation
      orderDetails
        .groupBy("customerId")
        .agg(
          countDistinct("orderId").as("totalOrders"),
          sum("itemTotal").as("totalRevenue"),
          sum("itemProfit").as("totalProfit"),
          avg("itemTotal").as("averageOrderValue"),
          stddev("itemTotal").as("orderValueStdDev"),
          min("orderDate").as("firstOrderDate"),
          max("orderDate").as("lastOrderDate"),
          countDistinct("category").as("categoryDiversity"),
          collect_set("category").as("purchasedCategories")
        )
        .withColumn("customerTenure",
          datediff(col("lastOrderDate"), col("firstOrderDate"))
        )
        .withColumn("orderFrequency",
          when(col("customerTenure") > 0,
            col("totalOrders") / (col("customerTenure") / 365.0)
          ).otherwise(0.0)
        )
        .withColumn("predictedCLV",
          col("averageOrderValue") * col("orderFrequency") * 2.0 // 2-year prediction
        )
        .withColumn("profitMargin",
          when(col("totalRevenue") > 0,
            col("totalProfit") / col("totalRevenue")
          ).otherwise(0.0)
        )
    }

    // Cohort analysis
    def cohortAnalysis(customers: DataFrame, orders: DataFrame): DataFrame = {

      // Define customer cohorts by registration month
      val customerCohorts = customers
        .withColumn("cohortMonth", date_format(col("registrationDate"), "yyyy-MM"))
        .select("customerId", "cohortMonth", "registrationDate")

      // Calculate monthly order activity; months_between needs real dates,
      // so the period is derived from the raw timestamps rather than "yyyy-MM" strings
      val monthlyActivity = orders
        .join(customerCohorts, "customerId")
        .withColumn("periodNumber",
          floor(months_between(col("orderDate"), col("registrationDate")))
        )
        .groupBy("cohortMonth", "periodNumber")
        .agg(
          countDistinct("customerId").as("activeCustomers"),
          sum("totalAmount").as("cohortRevenue"),
          avg("totalAmount").as("avgOrderValue")
        )

      // Calculate cohort sizes
      val cohortSizes = customerCohorts
        .groupBy("cohortMonth")
        .agg(countDistinct("customerId").as("cohortSize"))

      // Calculate retention rates
      monthlyActivity
        .join(cohortSizes, "cohortMonth")
        .withColumn("retentionRate",
          col("activeCustomers") / col("cohortSize")
        )
        .withColumn("revenuePerCustomer",
          col("cohortRevenue") / col("cohortSize")
        )
        .orderBy("cohortMonth", "periodNumber")
    }

    // Product recommendation analysis
    def marketBasketAnalysis(orderItems: DataFrame, products: DataFrame, minSupport: Double = 0.01): DataFrame = {

      // Calculate item frequencies
      val itemFrequency = orderItems
        .join(products, "productId")
        .groupBy("category")
        .agg(
          countDistinct("orderId").as("orderCount"),
          sum("quantity").as("totalQuantity")
        )

      val totalOrders = orderItems.select("orderId").distinct().count()

      // Find frequent itemsets (categories in this example)
      val frequentItems = itemFrequency
        .withColumn("support", col("orderCount") / totalOrders)
        .filter(col("support") >= minSupport)

      // Calculate association rules between categories
      val orderCategories = orderItems
        .join(products, "productId")
        .groupBy("orderId")
        .agg(collect_set("category").as("categories"))

      // Explode to get category pairs
      val categoryPairs = orderCategories
        .select(col("orderId"), explode(col("categories")).as("categoryA"))
        .join(
          orderCategories.select(col("orderId"), explode(col("categories")).as("categoryB")),
          "orderId"
        )
        .filter(col("categoryA") < col("categoryB")) // Avoid duplicates and self-pairs

      // Calculate confidence and lift
      categoryPairs
        .groupBy("categoryA", "categoryB")
        .agg(countDistinct("orderId").as("coOccurrence"))
        .join(
          itemFrequency.select(col("category").as("categoryA"), col("orderCount").as("countA")),
          "categoryA"
        )
        .join(
          itemFrequency.select(col("category").as("categoryB"), col("orderCount").as("countB")),
          "categoryB"
        )
        .withColumn("confidence", col("coOccurrence") / col("countA"))
        .withColumn("expectedCoOccurrence", 
          (col("countA") * col("countB")) / totalOrders
        )
        .withColumn("lift", 
          col("coOccurrence") / col("expectedCoOccurrence")
        )
        .withColumn("support", col("coOccurrence") / totalOrders)
        .filter(col("confidence") >= 0.1 && col("lift") > 1.0)
        .orderBy(desc("lift"))
    }

    // Time series analysis
    def salesTrendAnalysis(orders: DataFrame, orderItems: DataFrame, products: DataFrame): DataFrame = {

      val dailySales = orders
        .join(orderItems, "orderId")
        .join(products, "productId")
        .withColumn("orderDate", to_date(col("orderDate")))
        .withColumn("revenue", col("quantity") * col("unitPrice"))
        .withColumn("profit", (col("unitPrice") - col("costPrice")) * col("quantity"))
        .groupBy("orderDate", "category")
        .agg(
          sum("revenue").as("dailyRevenue"),
          sum("profit").as("dailyProfit"),
          sum("quantity").as("dailyQuantity"),
          countDistinct("orderId").as("dailyOrders"),
          countDistinct("customerId").as("dailyCustomers")
        )

      // Add moving averages and day-over-day growth
      val orderingWindow = Window.partitionBy("category").orderBy("orderDate")
      val movingAvgWindow = orderingWindow.rowsBetween(-6, 0)

      dailySales
        .withColumn("revenue7DayMA", avg("dailyRevenue").over(movingAvgWindow))
        .withColumn("prevDayRevenue", lag("dailyRevenue", 1).over(orderingWindow))
        .withColumn("revenueGrowth",
          when(col("prevDayRevenue").isNotNull && col("prevDayRevenue") =!= 0,
            (col("dailyRevenue") - col("prevDayRevenue")) / col("prevDayRevenue") * 100
          ).otherwise(lit(0.0))
        )
        .withColumn("profitMargin", col("dailyProfit") / col("dailyRevenue") * 100)
        .withColumn("avgOrderValue", col("dailyRevenue") / col("dailyOrders"))
        .orderBy("category", "orderDate")
    }

    // Advanced window functions
    def customerRankingAnalysis(customers: DataFrame, orders: DataFrame): DataFrame = {

      val customerMetrics = orders
        .groupBy("customerId")
        .agg(
          sum("totalAmount").as("totalSpent"),
          count("orderId").as("orderCount"),
          avg("totalAmount").as("avgOrderValue"),
          max("orderDate").as("lastOrderDate")
        )
        .join(customers.select("customerId", "registrationDate", "city", "state"), "customerId")

      // Ranks order by descending spend; percent_rank orders by ascending spend
      // so that higher spenders land in higher percentiles
      val stateRankWindow = Window.partitionBy("state").orderBy(desc("totalSpent"))
      val globalRankWindow = Window.orderBy(desc("totalSpent"))
      val statePercentileWindow = Window.partitionBy("state").orderBy("totalSpent")
      val globalPercentileWindow = Window.orderBy("totalSpent")

      customerMetrics
        .withColumn("stateRank", row_number().over(stateRankWindow))
        .withColumn("statePercentile", percent_rank().over(statePercentileWindow))
        .withColumn("globalRank", row_number().over(globalRankWindow))
        .withColumn("globalPercentile", percent_rank().over(globalPercentileWindow))
        .withColumn("isTopCustomerInState", col("stateRank") <= 10)
        .withColumn("isTop1PercentGlobal", col("globalPercentile") >= 0.99)
        .withColumn("customerTier",
          when(col("globalPercentile") >= 0.95, "Platinum")
            .when(col("globalPercentile") >= 0.80, "Gold")
            .when(col("globalPercentile") >= 0.60, "Silver")
            .otherwise("Bronze")
        )
    }
  }

  // Performance optimization utilities
  object DataFrameOptimization {

    implicit class DataFrameOptimizations(df: DataFrame) {

      def optimizePartitioning(partitionColumns: String*): DataFrame = {
        df.repartition(partitionColumns.map(col): _*)
      }

      def optimizeForJoins(keyColumn: String, numPartitions: Int = 200): DataFrame = {
        df.repartition(numPartitions, col(keyColumn))
      }

      def cacheWithStorageLevel(level: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER): DataFrame = {
        df.persist(level)
      }

      def analyzeSkew(keyColumn: String): DataFrame = {
        df.groupBy(keyColumn)
          .count()
          .orderBy(desc("count"))
          .limit(100)
      }

      def broadcastIfSmall(maxSizeBytes: Long = 20 * 1024 * 1024): DataFrame = {
        // Check if the DataFrame is small enough to broadcast, using the
        // optimized plan's size estimate
        val sizeEstimate = df.queryExecution.optimizedPlan.stats.sizeInBytes
        if (sizeEstimate <= BigInt(maxSizeBytes)) {
          broadcast(df)
        } else {
          df
        }
      }

      def showExecutionPlan(): Unit = {
        df.explain(true)
      }

      def getPartitionInfo(): String = {
        s"""Partition Information:
           |Number of partitions: ${df.rdd.getNumPartitions}
           |Partition sizes: ${df.rdd.mapPartitions(iter => Iterator(iter.size)).collect().mkString(", ")}
           |Preferred locations: ${df.rdd.preferredLocations(df.rdd.partitions.head).mkString(", ")}
           """.stripMargin
      }
    }
  }
}
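
A hedged end-to-end sketch of driving the analytics class above. The file paths and output layout are placeholders, and the Delta dependency is assumed to be on the classpath:

// Illustrative driver; paths are placeholders, not a prescribed layout
object DataFrameOperationsExample {
  import DataFrameOperations._

  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("dataframe-analytics")
    val analytics = new ECommerceAnalytics(spark)

    val customers = analytics.loadCustomers("/data/customers.csv")
    val orders    = analytics.loadOrders("/data/orders.json")

    val enriched = analytics.enrichCustomerData(customers, orders)
    enriched
      .write
      .mode("overwrite")
      .partitionBy("customerSegment")
      .parquet("/data/output/enriched_customers")

    spark.stop()
  }
}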

RDD Operations and Low-Level Programming

Advanced RDD Patterns

object RDDOperations {

  // Low-level RDD operations for performance-critical scenarios
  class AdvancedRDDProcessing(spark: SparkSession) {

    val sc = spark.sparkContext

    // Custom partitioner for optimal data distribution
    class CustomerPartitioner(numParts: Int) extends org.apache.spark.Partitioner {
      override def numPartitions: Int = numParts

      override def getPartition(key: Any): Int = key match {
        case customerId: String =>
          // Hash-based partitioning with a non-negative modulus
          ((customerId.hashCode % numParts) + numParts) % numParts
        case _ => 0
      }

      override def equals(other: Any): Boolean = other match {
        case h: CustomerPartitioner => h.numPartitions == numParts
        case _ => false
      }

      override def hashCode: Int = numParts
    }

    // Advanced RDD transformations
    def processLargeDataset[T: ClassTag](data: RDD[T], partitions: Int = 200): RDD[T] = {
      data
        .coalesce(partitions) // Optimize partition count
        .mapPartitions(processPartition, preservesPartitioning = true)
        .cache() // Cache intermediate results
    }

    private def processPartition[T](iterator: Iterator[T]): Iterator[T] = {
      // Process entire partition in memory for efficiency
      val batch = iterator.toList
      val processed = batch.map(processRecord)
      processed.iterator
    }

    private def processRecord[T](record: T): T = {
      // Simulate complex processing
      Thread.sleep(1) // Simulate work
      record
    }

    // Complex RDD joins with custom partitioners
    def efficientJoin(
      customers: RDD[(String, Customer)],
      orders: RDD[(String, Order)]
    ): RDD[(String, (Customer, Order))] = {

      val partitioner = new CustomerPartitioner(200)

      val partitionedCustomers = customers.partitionBy(partitioner).cache()
      val partitionedOrders = orders.partitionBy(partitioner).cache()

      partitionedCustomers.join(partitionedOrders)
    }

    // Accumulator-based analytics
    def calculateMetricsWithAccumulators(orders: RDD[Order]): Map[String, Long] = {

      val totalOrdersAcc = sc.longAccumulator("Total Orders")
      val totalRevenueAcc = sc.doubleAccumulator("Total Revenue")
      val errorCountAcc = sc.longAccumulator("Error Count")

      val statusCounts = Map(
        "pending" -> sc.longAccumulator("Pending Orders"),
        "completed" -> sc.longAccumulator("Completed Orders"),
        "cancelled" -> sc.longAccumulator("Cancelled Orders")
      )

      orders.foreach { order =>
        try {
          totalOrdersAcc.add(1)
          totalRevenueAcc.add(order.totalAmount)

          statusCounts.get(order.status.toLowerCase) match {
            case Some(acc) => acc.add(1)
            case None => errorCountAcc.add(1)
          }
        } catch {
          case _: Exception => errorCountAcc.add(1)
        }
      }

      Map(
        "totalOrders" -> totalOrdersAcc.value,
        "totalRevenue" -> totalRevenueAcc.value.toLong,
        "pendingOrders" -> statusCounts("pending").value,
        "completedOrders" -> statusCounts("completed").value,
        "cancelledOrders" -> statusCounts("cancelled").value,
        "errors" -> errorCountAcc.value
      )
    }

    // Custom aggregation functions
    def customAggregation(data: RDD[(String, Double)]): RDD[(String, (Double, Double, Long))] = {
      data.aggregateByKey(
        (0.0, 0.0, 0L) // (sum, sumSquares, count)
      )(
        // Sequence operation (within partition)
        { case ((sum, sumSquares, count), value) =>
          (sum + value, sumSquares + value * value, count + 1)
        },
        // Combine operation (across partitions)
        { case ((sum1, sumSquares1, count1), (sum2, sumSquares2, count2)) =>
          (sum1 + sum2, sumSquares1 + sumSquares2, count1 + count2)
        }
      ).mapValues { case (sum, sumSquares, count) =>
        val mean = sum / count
        val variance = (sumSquares / count) - (mean * mean)
        (mean, math.sqrt(variance), count)
      }
    }

    // Memory-efficient large dataset processing
    def processLargeFiles(filePaths: Seq[String]): RDD[String] = {
      sc.parallelize(filePaths, filePaths.length)
        .flatMap { path =>
          // Process each file independently
          scala.io.Source.fromFile(path).getLines().toIterator
        }
        .mapPartitions(processLinesInBatches)
    }

    private def processLinesInBatches(lines: Iterator[String]): Iterator[String] = {
      lines.grouped(1000).flatMap { batch =>
        // Process batch efficiently
        batch.filter(_.nonEmpty).map(_.toUpperCase)
      }
    }

    // Graph processing with RDDs
    def pageRank(
      vertices: RDD[(String, String)], // (vertexId, vertexData)
      edges: RDD[(String, String)],    // (source, destination)
      numIterations: Int = 10,
      dampingFactor: Double = 0.85
    ): RDD[(String, Double)] = {

      // Initialize ranks
      var ranks = vertices.mapValues(_ => 1.0)

      // Build adjacency list
      val links = edges.groupByKey().cache()

      for (iteration <- 1 to numIterations) {
        // Calculate contributions
        val contributions = links.join(ranks).flatMap {
          case (vertex, (destinations, rank)) =>
            destinations.map(dest => (dest, rank / destinations.size))
        }

        // Update ranks
        ranks = contributions.reduceByKey(_ + _).mapValues { contribution =>
          (1.0 - dampingFactor) + dampingFactor * contribution
        }
      }

      ranks
    }

    // Time series processing
    def rollingWindowAnalysis(
      timeSeries: RDD[(Long, Double)], // (timestamp, value)
      windowSize: Int = 7
    ): RDD[(Long, (Double, Double))] = { // (timestamp, (average, stddev))

      val sortedData = timeSeries.sortByKey()

      sortedData.mapPartitions { partition =>
        val data = partition.toArray
        val results = scala.collection.mutable.ArrayBuffer[(Long, (Double, Double))]()

        for (i <- data.indices) {
          val windowStart = Math.max(0, i - windowSize + 1)
          val window = data.slice(windowStart, i + 1)

          val values = window.map(_._2)
          val mean = values.sum / values.length
          val variance = values.map(v => math.pow(v - mean, 2)).sum / values.length
          val stddev = math.sqrt(variance)

          results += ((data(i)._1, (mean, stddev)))
        }

        results.iterator
      }
    }
  }

  // Performance monitoring and optimization
  object RDDPerformanceUtils {

    def analyzeRDDLineage[T](rdd: RDD[T]): String = {
      s"""RDD Lineage Analysis:
         |Debug String: ${rdd.toDebugString}
         |Dependencies: ${rdd.dependencies.map(_.getClass.getSimpleName).mkString(", ")}
         |Partitions: ${rdd.getNumPartitions}
         |Storage Level: ${rdd.getStorageLevel}
         |Checkpointed: ${rdd.isCheckpointed}
         """.stripMargin
    }

    def optimizeRDDPartitioning[T](rdd: RDD[T], targetPartitions: Option[Int] = None): RDD[T] = {
      val currentPartitions = rdd.getNumPartitions
      val optimalPartitions = targetPartitions.getOrElse(
        Math.max(2, Runtime.getRuntime.availableProcessors() * 2)
      )

      if (currentPartitions != optimalPartitions) {
        if (currentPartitions > optimalPartitions) {
          rdd.coalesce(optimalPartitions)
        } else {
          rdd.repartition(optimalPartitions)
        }
      } else {
        rdd
      }
    }

    def measureRDDOperation[T, R](rdd: RDD[T])(operation: RDD[T] => R): (R, Long) = {
      val startTime = System.currentTimeMillis()
      val result = operation(rdd)
      val endTime = System.currentTimeMillis()
      (result, endTime - startTime)
    }

    def cacheStrategy[T](rdd: RDD[T], reusageCount: Int): RDD[T] = {
      if (reusageCount > 2) {
        rdd.cache()
      } else {
        rdd
      }
    }
  }
}
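
A small sketch showing how the RDD utilities above might be driven. The sample orders are invented and exist only to make the example self-contained:

// Illustrative use of the accumulator-based metrics and custom aggregation
object RDDOperationsExample {
  import RDDOperations._

  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("rdd-analytics")
    val processing = new AdvancedRDDProcessing(spark)

    val orders = spark.sparkContext.parallelize(Seq(
      Order("o-1", "c-1", Timestamp.valueOf("2024-01-15 10:30:00"), 120.0, "completed", "1 Main St", "card", 0.0),
      Order("o-2", "c-2", Timestamp.valueOf("2024-01-16 09:00:00"), 80.0, "pending", "2 Oak Ave", "paypal", 5.0)
    ))

    // Accumulator-based counters collected on the driver
    val metrics = processing.calculateMetricsWithAccumulators(orders)
    metrics.foreach { case (name, value) => println(s"$name = $value") }

    // Per-customer mean and standard deviation of order values
    val stats = processing.customAggregation(orders.map(o => o.customerId -> o.totalAmount))
    stats.collect().foreach(println)

    spark.stop()
  }
}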

Spark SQL and Advanced Querying

Complex SQL Operations

object SparkSQLOperations {

  class AdvancedSQLAnalytics(spark: SparkSession) {
    import spark.implicits._

    // Register temporary views for SQL access
    def setupTables(
      customers: DataFrame,
      orders: DataFrame,
      orderItems: DataFrame,
      products: DataFrame
    ): Unit = {
      customers.createOrReplaceTempView("customers")
      orders.createOrReplaceTempView("orders")
      orderItems.createOrReplaceTempView("order_items")
      products.createOrReplaceTempView("products")
    }

    // Complex analytical queries
    def customerSegmentationSQL(): DataFrame = {
      spark.sql("""
        WITH customer_metrics AS (
          SELECT 
            c.customerId,
            c.firstName,
            c.lastName,
            c.email,
            c.registrationDate,
            COUNT(o.orderId) as order_count,
            SUM(o.totalAmount) as total_spent,
            AVG(o.totalAmount) as avg_order_value,
            MAX(o.orderDate) as last_order_date,
            DATEDIFF(CURRENT_DATE(), MAX(o.orderDate)) as days_since_last_order,
            STDDEV(o.totalAmount) as order_value_stddev,
            COUNT(DISTINCT DATE_FORMAT(o.orderDate, 'yyyy-MM')) as active_months
          FROM customers c
          LEFT JOIN orders o ON c.customerId = o.customerId
          GROUP BY c.customerId, c.firstName, c.lastName, c.email, c.registrationDate
        ),
        rfm_scores AS (
          SELECT *,
            NTILE(5) OVER (ORDER BY days_since_last_order DESC NULLS FIRST) as recency_score,
            NTILE(5) OVER (ORDER BY order_count ASC) as frequency_score,
            NTILE(5) OVER (ORDER BY total_spent ASC) as monetary_score
          FROM customer_metrics
        )
        SELECT *,
          CONCAT(recency_score, frequency_score, monetary_score) as rfm_segment,
          CASE 
            WHEN recency_score >= 4 AND frequency_score >= 4 AND monetary_score >= 4 THEN 'Champions'
            WHEN recency_score >= 3 AND frequency_score >= 3 AND monetary_score >= 3 THEN 'Loyal Customers'
            WHEN recency_score >= 3 AND frequency_score <= 2 AND monetary_score >= 3 THEN 'Potential Loyalists'
            WHEN recency_score >= 4 AND frequency_score <= 2 THEN 'New Customers'
            WHEN recency_score >= 3 AND frequency_score >= 3 AND monetary_score <= 2 THEN 'Promising'
            WHEN recency_score <= 2 AND frequency_score >= 3 AND monetary_score >= 3 THEN 'Need Attention'
            WHEN recency_score <= 2 AND frequency_score >= 2 AND monetary_score >= 2 THEN 'About to Sleep'
            WHEN recency_score <= 2 AND frequency_score <= 2 AND monetary_score >= 3 THEN 'At Risk'
            WHEN recency_score <= 1 AND frequency_score >= 2 AND monetary_score <= 2 THEN 'Cannot Lose Them'
            ELSE 'Lost'
          END as customer_segment
        FROM rfm_scores
        ORDER BY total_spent DESC
      """)
    }

    def productPerformanceSQL(): DataFrame = {
      spark.sql("""
        WITH product_metrics AS (
          SELECT 
            p.productId,
            p.name as product_name,
            p.category,
            p.brand,
            p.price,
            p.costPrice,
            COUNT(DISTINCT oi.orderId) as orders_count,
            SUM(oi.quantity) as total_quantity_sold,
            SUM(oi.quantity * oi.unitPrice) as total_revenue,
            SUM(oi.quantity * (oi.unitPrice - p.costPrice)) as total_profit,
            AVG(oi.unitPrice) as avg_selling_price,
            COUNT(DISTINCT o.customerId) as unique_customers,
            MIN(o.orderDate) as first_sale_date,
            MAX(o.orderDate) as last_sale_date
          FROM products p
          LEFT JOIN order_items oi ON p.productId = oi.productId
          LEFT JOIN orders o ON oi.orderId = o.orderId
          GROUP BY p.productId, p.name, p.category, p.brand, p.price, p.costPrice
        ),
        category_totals AS (
          SELECT 
            category,
            SUM(total_revenue) as category_revenue,
            SUM(total_quantity_sold) as category_quantity
          FROM product_metrics
          GROUP BY category
        )
        SELECT 
          pm.*,
          ROUND((total_revenue / category_revenue) * 100, 2) as category_revenue_share,
          ROUND((total_quantity_sold / category_quantity) * 100, 2) as category_quantity_share,
          ROUND((total_profit / total_revenue) * 100, 2) as profit_margin_percent,
          DENSE_RANK() OVER (PARTITION BY category ORDER BY total_revenue DESC) as category_rank,
          CASE 
            WHEN total_revenue > 0 AND total_quantity_sold > 0 THEN 'Active'
            WHEN total_revenue = 0 THEN 'No Sales'
            ELSE 'Unknown'
          END as product_status,
          DATEDIFF(CURRENT_DATE(), last_sale_date) as days_since_last_sale
        FROM product_metrics pm
        JOIN category_totals ct ON pm.category = ct.category
        ORDER BY total_revenue DESC
      """)
    }

    def salesTrendSQL(): DataFrame = {
      spark.sql("""
        WITH daily_sales AS (
          SELECT 
            DATE(o.orderDate) as sale_date,
            p.category,
            COUNT(DISTINCT o.orderId) as daily_orders,
            COUNT(DISTINCT o.customerId) as daily_customers,
            SUM(oi.quantity * oi.unitPrice) as daily_revenue,
            SUM(oi.quantity) as daily_quantity,
            AVG(o.totalAmount) as avg_order_value
          FROM orders o
          JOIN order_items oi ON o.orderId = oi.orderId
          JOIN products p ON oi.productId = p.productId
          GROUP BY DATE(o.orderDate), p.category
        ),
        sales_with_trends AS (
          SELECT *,
            LAG(daily_revenue, 1) OVER (PARTITION BY category ORDER BY sale_date) as prev_day_revenue,
            LAG(daily_revenue, 7) OVER (PARTITION BY category ORDER BY sale_date) as prev_week_revenue,
            AVG(daily_revenue) OVER (
              PARTITION BY category 
              ORDER BY sale_date 
              ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) as revenue_7day_ma,
            AVG(daily_revenue) OVER (
              PARTITION BY category 
              ORDER BY sale_date 
              ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
            ) as revenue_30day_ma
          FROM daily_sales
        )
        SELECT *,
          CASE 
            WHEN prev_day_revenue > 0 THEN 
              ROUND(((daily_revenue - prev_day_revenue) / prev_day_revenue) * 100, 2)
            ELSE 0 
          END as day_over_day_growth,
          CASE 
            WHEN prev_week_revenue > 0 THEN 
              ROUND(((daily_revenue - prev_week_revenue) / prev_week_revenue) * 100, 2)
            ELSE 0 
          END as week_over_week_growth,
          CASE 
            WHEN revenue_30day_ma > 0 THEN 
              ROUND(((daily_revenue - revenue_30day_ma) / revenue_30day_ma) * 100, 2)
            ELSE 0 
          END as vs_30day_average
        FROM sales_with_trends
        ORDER BY category, sale_date
      """)
    }

    def cohortAnalysisSQL(): DataFrame = {
      spark.sql("""
        WITH customer_orders AS (
          SELECT 
            c.customerId,
            DATE_FORMAT(c.registrationDate, 'yyyy-MM') as cohort_month,
            o.orderId,
            o.orderDate,
            o.totalAmount,
            FLOOR(MONTHS_BETWEEN(o.orderDate, c.registrationDate)) as period_number
          FROM customers c
          JOIN orders o ON c.customerId = o.customerId
        ),
        cohort_data AS (
          SELECT 
            cohort_month,
            period_number,
            COUNT(DISTINCT customerId) as customers,
            COUNT(DISTINCT orderId) as orders,
            SUM(totalAmount) as revenue,
            AVG(totalAmount) as avg_order_value
          FROM customer_orders
          GROUP BY cohort_month, period_number
        ),
        cohort_sizes AS (
          SELECT 
            DATE_FORMAT(registrationDate, 'yyyy-MM') as cohort_month,
            COUNT(DISTINCT customerId) as cohort_size
          FROM customers
          GROUP BY DATE_FORMAT(registrationDate, 'yyyy-MM')
        )
        SELECT 
          cd.*,
          cs.cohort_size,
          ROUND((cd.customers / cs.cohort_size) * 100, 2) as retention_rate,
          ROUND(cd.revenue / cs.cohort_size, 2) as revenue_per_customer,
          ROUND(cd.orders / cs.cohort_size, 2) as orders_per_customer
        FROM cohort_data cd
        JOIN cohort_sizes cs ON cd.cohort_month = cs.cohort_month
        ORDER BY cohort_month, period_number
      """)
    }

    // Advanced window functions and analytics
    def advancedWindowFunctionsSQL(): DataFrame = {
      spark.sql("""
        WITH ranked_customers AS (
          SELECT 
            c.customerId,
            c.firstName,
            c.lastName,
            c.state,
            SUM(o.totalAmount) as total_spent,
            COUNT(o.orderId) as order_count,
            ROW_NUMBER() OVER (PARTITION BY c.state ORDER BY SUM(o.totalAmount) DESC) as state_rank,
            DENSE_RANK() OVER (ORDER BY SUM(o.totalAmount) DESC) as global_rank,
            PERCENT_RANK() OVER (ORDER BY SUM(o.totalAmount)) as percentile_rank,
            NTILE(10) OVER (ORDER BY SUM(o.totalAmount) DESC) as decile,
            LAG(SUM(o.totalAmount), 1) OVER (PARTITION BY c.state ORDER BY SUM(o.totalAmount) DESC) as next_higher_spent,
            LEAD(SUM(o.totalAmount), 1) OVER (PARTITION BY c.state ORDER BY SUM(o.totalAmount) DESC) as next_lower_spent
          FROM customers c
          JOIN orders o ON c.customerId = o.customerId
          GROUP BY c.customerId, c.firstName, c.lastName, c.state
        )
        SELECT *,
          CASE 
            WHEN state_rank <= 5 THEN 'Top 5 in State'
            WHEN state_rank <= 10 THEN 'Top 10 in State'
            WHEN global_rank <= 100 THEN 'Top 100 Global'
            WHEN percentile_rank >= 0.95 THEN 'Top 5% Global'
            WHEN percentile_rank >= 0.80 THEN 'Top 20% Global'
            ELSE 'Standard'
          END as customer_tier,
          COALESCE(next_higher_spent - total_spent, 0) as gap_to_next_rank,
          COALESCE(total_spent - next_lower_spent, 0) as gap_to_prev_rank
        FROM ranked_customers
        ORDER BY global_rank
      """)
    }

    // User-defined functions for complex logic
    def registerUDFs(): Unit = {

      // Customer lifetime value calculation
      spark.udf.register("calculate_clv", (
        totalSpent: Double,
        orderCount: Int,
        customerTenure: Int,
        avgOrderValue: Double
      ) => {
        if (customerTenure > 0) {
          val orderFrequency = orderCount.toDouble / (customerTenure / 365.0)
          val predictedLifetime = 2.0 // years
          avgOrderValue * orderFrequency * predictedLifetime
        } else {
          avgOrderValue * 2.0 // Default prediction for new customers
        }
      })

      // Churn probability calculation
      spark.udf.register("churn_probability", (
        daysSinceLastOrder: Int,
        orderFrequency: Double,
        totalSpent: Double
      ) => {
        val recencyScore = Math.min(daysSinceLastOrder / 90.0, 1.0)
        val frequencyScore = Math.max(1.0 - (orderFrequency / 12.0), 0.0)
        val monetaryScore = Math.max(1.0 - (totalSpent / 1000.0), 0.0)

        (recencyScore * 0.5 + frequencyScore * 0.3 + monetaryScore * 0.2) * 100
      })

      // Product recommendation score
      spark.udf.register("recommendation_score", (
        categoryPurchaseFreq: Int,
        productPrice: Double,
        customerAvgOrderValue: Double,
        productRating: Double
      ) => {
        val affinityScore = Math.min(categoryPurchaseFreq / 5.0, 1.0)
        val priceScore = if (customerAvgOrderValue > 0) {
          Math.max(1.0 - Math.abs(productPrice - customerAvgOrderValue) / customerAvgOrderValue, 0.0)
        } else 0.5
        val qualityScore = productRating / 5.0

        (affinityScore * 0.4 + priceScore * 0.3 + qualityScore * 0.3) * 100
      })
    }

    // Performance optimization hints
    def optimizedQuery(): DataFrame = {
      spark.sql("""
        SELECT /*+ BROADCAST(p), REPARTITION(100) */
          c.customerId,
          c.firstName,
          c.lastName,
          COUNT(DISTINCT o.orderId) as order_count,
          SUM(oi.quantity * oi.unitPrice) as total_revenue
        FROM customers c
        JOIN orders o ON c.customerId = o.customerId
        JOIN order_items oi ON o.orderId = oi.orderId
        JOIN products p ON oi.productId = p.productId
        GROUP BY c.customerId, c.firstName, c.lastName
        HAVING COUNT(DISTINCT o.orderId) > 5
        ORDER BY total_revenue DESC
        LIMIT 1000
      """)
    }
  }
}
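
A sketch of how the SQL analytics class might be invoked, reusing the loaders defined earlier. All paths are placeholders:

// Illustrative driver for the SQL analytics; table sources are placeholders
object SparkSQLOperationsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("sql-analytics")
    val loaders = new DataFrameOperations.ECommerceAnalytics(spark)
    val sql = new SparkSQLOperations.AdvancedSQLAnalytics(spark)

    sql.setupTables(
      loaders.loadCustomers("/data/customers.csv"),
      loaders.loadOrders("/data/orders.json"),
      loaders.loadOrderItems("/data/order_items"),
      loaders.loadProducts("/data/products")
    )
    sql.registerUDFs()

    sql.customerSegmentationSQL().show(20, truncate = false)
    sql.productPerformanceSQL().show(20, truncate = false)

    spark.stop()
  }
}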

Spark Streaming and Real-Time Processing

Structured Streaming Operations

import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit

object SparkStreamingOperations {

  class RealTimeAnalytics(spark: SparkSession) {
    import spark.implicits._

    // Kafka streaming setup
    def createKafkaStream(
      bootstrapServers: String,
      topics: String,
      startingOffsets: String = "latest"
    ): DataFrame = {
      spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("startingOffsets", startingOffsets)
        .option("failOnDataLoss", "false")
        .load()
    }

    // Real-time order processing
    def processOrderStream(kafkaDF: DataFrame): StreamingQuery = {

      val orderStream = kafkaDF
        .select(
          col("timestamp").as("kafka_timestamp"),
          from_json(col("value").cast("string"), orderSchema).as("order")
        )
        .select("kafka_timestamp", "order.*")
        .withColumn("processing_time", current_timestamp())
        .withColumn("hour", hour(col("order_date")))
        .withColumn("day_of_week", dayofweek(col("order_date")))

      // Real-time aggregations
      val orderMetrics = orderStream
        .withWatermark("order_date", "10 minutes")
        .groupBy(
          window(col("order_date"), "5 minutes"),
          col("hour"),
          col("day_of_week")
        )
        .agg(
          count("order_id").as("order_count"),
          sum("total_amount").as("total_revenue"),
          avg("total_amount").as("avg_order_value"),
          // Exact distinct counts are not supported in streaming aggregations,
          // so an approximate distinct count is used here
          approx_count_distinct("customer_id").as("unique_customers"),
          stddev("total_amount").as("order_value_stddev")
        )
        .withColumn("revenue_per_customer", col("total_revenue") / col("unique_customers"))

      orderMetrics
        .writeStream
        .outputMode("update")
        .format("console")
        .option("truncate", false)
        .trigger(Trigger.ProcessingTime("30 seconds"))
        .start()
    }

    // Real-time fraud detection
    def fraudDetectionStream(orderStream: DataFrame): StreamingQuery = {

      // Define fraud detection rules
      val suspiciousOrders = orderStream
        .withWatermark("order_date", "5 minutes")
        .groupBy(
          col("customer_id"),
          window(col("order_date"), "10 minutes")
        )
        .agg(
          count("order_id").as("orders_in_window"),
          sum("total_amount").as("total_spent_in_window"),
          max("total_amount").as("max_order_value"),
          approx_count_distinct("payment_method").as("payment_methods_used") // exact distinct counts are unsupported in streaming
        )
        .filter(
          col("orders_in_window") > 5 || 
          col("total_spent_in_window") > 5000 ||
          col("max_order_value") > 2000 ||
          col("payment_methods_used") > 2
        )
        .withColumn("fraud_score", 
          when(col("orders_in_window") > 10, 100)
            .when(col("total_spent_in_window") > 10000, 95)
            .when(col("max_order_value") > 5000, 90)
            .otherwise(
              (col("orders_in_window") * 10) +
              (col("total_spent_in_window") / 100) +
              (col("payment_methods_used") * 15)
            )
        )
        .withColumn("risk_level",
          when(col("fraud_score") >= 90, "HIGH")
            .when(col("fraud_score") >= 70, "MEDIUM")
            .when(col("fraud_score") >= 50, "LOW")
            .otherwise("NORMAL")
        )

      suspiciousOrders
        .writeStream
        .outputMode("append") // the Delta sink accepts append/complete; append works with the watermark above
        .format("delta")
        .option("path", "/data/fraud_alerts")
        .option("checkpointLocation", "/checkpoints/fraud_detection")
        .trigger(Trigger.ProcessingTime("1 minute"))
        .start()
    }

    // Customer behavior analytics
    def customerBehaviorStream(orderStream: DataFrame): StreamingQuery = {

      val customerMetrics = orderStream
        .withWatermark("order_date", "1 hour")
        .groupBy(
          col("customer_id"),
          window(col("order_date"), "1 hour", "15 minutes")
        )
        .agg(
          count("order_id").as("hourly_orders"),
          sum("total_amount").as("hourly_spent"),
          avg("total_amount").as("avg_order_value"),
          collect_list("product_category").as("categories_purchased"),
          min("order_date").as("first_order_in_window"),
          max("order_date").as("last_order_in_window")
        )
        .withColumn("session_duration_minutes",
          (unix_timestamp(col("last_order_in_window")) - 
           unix_timestamp(col("first_order_in_window"))) / 60
        )
        .withColumn("category_diversity", size(array_distinct(col("categories_purchased"))))
        .withColumn("shopping_intensity",
          when(col("session_duration_minutes") > 0,
            col("hourly_orders") / col("session_duration_minutes")
          ).otherwise(0)
        )
        .withColumn("customer_segment",
          when(col("hourly_spent") > 500 && col("shopping_intensity") > 0.1, "HIGH_VALUE_ACTIVE")
            .when(col("hourly_spent") > 200, "MEDIUM_VALUE")
            .when(col("shopping_intensity") > 0.2, "HIGH_ACTIVITY")
            .otherwise("REGULAR")
        )

      customerMetrics
        .writeStream
        .outputMode("append") // the Delta sink accepts append/complete; append works with the watermark above
        .format("delta")
        .option("path", "/data/customer_behavior")
        .option("checkpointLocation", "/checkpoints/customer_behavior")
        .trigger(Trigger.ProcessingTime("5 minutes"))
        .start()
    }

    // Product recommendation engine
    def recommendationStream(orderStream: DataFrame): StreamingQuery = {

      // Real-time collaborative filtering
      val productPairs = orderStream
        .withWatermark("order_date", "2 hours")
        .groupBy("order_id")
        .agg(collect_list("product_id").as("products"))
        .select(
          col("order_id"),
          explode(col("products")).as("product_a")
        )
        .join(
          orderStream.groupBy("order_id")
            .agg(collect_list("product_id").as("products"))
            .select(
              col("order_id"),
              explode(col("products")).as("product_b")
            ),
          "order_id"
        )
        .filter(col("product_a") < col("product_b")) // Avoid duplicates
        .groupBy("product_a", "product_b")
        // Exact distinct counts are unsupported in streaming, and dividing the raw
        // count by a distinct count of the same column is not a meaningful confidence,
        // so only the approximate co-occurrence count is kept
        .agg(approx_count_distinct("order_id").as("co_occurrence_count"))
        .filter(col("co_occurrence_count") > 5)

      productPairs
        .writeStream
        .outputMode("update")
        .foreachBatch { (batch: DataFrame, _: Long) =>
          // The Delta sink does not accept update mode directly, so each
          // micro-batch of updated co-occurrence counts is appended here
          batch.write.format("delta").mode("append").save("/data/product_recommendations")
        }
        .option("checkpointLocation", "/checkpoints/recommendations")
        .trigger(Trigger.ProcessingTime("10 minutes"))
        .start()
    }

    // Advanced streaming aggregations
    def advancedStreamingAggregations(orderStream: DataFrame): StreamingQuery = {

      val multiLevelAggregations = orderStream
        .withWatermark("order_date", "30 minutes")
        .groupBy(
          window(col("order_date"), "1 hour"),
          col("product_category"),
          col("customer_segment")
        )
        .agg(
          count("order_id").as("order_count"),
          sum("total_amount").as("revenue"),
          // Exact distinct counts are not supported on streaming DataFrames,
          // so an approximate count is used instead
          approx_count_distinct("customer_id").as("unique_customers"),
          avg("total_amount").as("avg_order_value"),
          stddev("total_amount").as("order_value_stddev"),
          percentile_approx(col("total_amount"), lit(0.5), lit(10000)).as("median_order_value"),
          percentile_approx(col("total_amount"), lit(0.95), lit(10000)).as("p95_order_value")
        )

      // Analytic window functions are not supported on streaming DataFrames, so the
      // per-window shares and ranks are derived per micro-batch in foreachBatch;
      // downstream readers should keep the latest row per window and key
      multiLevelAggregations
        .writeStream
        .outputMode("update")
        .foreachBatch { (batch: DataFrame, _: Long) =>
          val windowPartition = Window.partitionBy("window")
          batch
            .withColumn("revenue_share",
              col("revenue") / sum("revenue").over(windowPartition))
            .withColumn("customer_concentration",
              col("unique_customers") / sum("unique_customers").over(windowPartition))
            .withColumn("market_position",
              dense_rank().over(Window.partitionBy("window").orderBy(desc("revenue"))))
            .write
            .format("delta")
            .mode("append")
            .save("/data/streaming_analytics")
        }
        .option("checkpointLocation", "/checkpoints/streaming_analytics")
        .trigger(Trigger.ProcessingTime("2 minutes"))
        .start()

    // Error handling and monitoring
    def monitoringStream(kafkaDF: DataFrame): StreamingQuery = {

      val errorMetrics = kafkaDF
        .withColumn("value_str", col("value").cast("string")) // Kafka values arrive as bytes
        .select(
          col("timestamp"),
          col("partition"),
          col("offset"),
          when(col("value_str").isNull, 1).otherwise(0).as("null_messages"),
          when(length(col("value_str")) === 0, 1).otherwise(0).as("empty_messages"),
          when(col("value_str").rlike("\\{.*\\}"), 0).otherwise(1).as("invalid_json")
        )
        .withWatermark("timestamp", "5 minutes")
        .groupBy(
          window(col("timestamp"), "1 minute"),
          col("partition")
        )
        .agg(
          count("*").as("total_messages"),
          sum("null_messages").as("null_count"),
          sum("empty_messages").as("empty_count"),
          sum("invalid_json").as("invalid_json_count"),
          min("offset").as("min_offset"),
          max("offset").as("max_offset")
        )
        .withColumn("error_rate",
          (col("null_count") + col("empty_count") + col("invalid_json_count")) / col("total_messages")
        )
        .withColumn("throughput_per_second",
          (col("max_offset") - col("min_offset")) / 60.0
        )

      errorMetrics
        .writeStream
        .outputMode("update")
        .format("console")
        .option("truncate", false)
        .trigger(Trigger.ProcessingTime("30 seconds"))
        .start()
    }

    // Schema definition for order data
    private val orderSchema = StructType(Array(
      StructField("order_id", StringType, true),
      StructField("customer_id", StringType, true),
      StructField("order_date", TimestampType, true),
      StructField("total_amount", DoubleType, true),
      StructField("product_id", StringType, true),
      StructField("product_category", StringType, true),
      StructField("payment_method", StringType, true),
      StructField("customer_segment", StringType, true)
    ))
  }

  // Streaming utilities
  object StreamingUtils {

    def gracefulShutdown(queries: Seq[StreamingQuery]): Unit = {
      sys.addShutdownHook {
        println("Shutting down streaming queries...")
        queries.foreach { query =>
          try {
            query.stop()
            println(s"Stopped query: ${query.name}")
          } catch {
            case e: Exception => println(s"Error stopping query ${query.name}: ${e.getMessage}")
          }
        }
      }
    }

    def monitorQueryProgress(query: StreamingQuery): Unit = {
      new Thread(() => {
        while (query.isActive) {
          Thread.sleep(10000) // Check every 10 seconds
          val progress = query.lastProgress
          if (progress != null) {
            println(s"""Query Progress:
                       |Batch ID: ${progress.batchId}
                       |Input Rows: ${progress.numInputRows}
                       |Input Rate: ${progress.inputRowsPerSecond} rows/sec
                       |Processing Rate: ${progress.processedRowsPerSecond} rows/sec
                       |Trigger Duration: ${progress.durationMs.get("triggerExecution")} ms
                       |""".stripMargin)
          }
        }
      }).start()
    }

    def createCheckpointCleanup(checkpointPath: String, retentionHours: Int = 24): Unit = {
      // Cleanup old checkpoint files to prevent disk space issues
      val cleanupInterval = java.time.Duration.ofHours(retentionHours)
      val scheduler = java.util.concurrent.Executors.newScheduledThreadPool(1)

      scheduler.scheduleAtFixedRate(() => {
        try {
          val fs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
          val path = new org.apache.hadoop.fs.Path(checkpointPath)
          if (fs.exists(path)) {
            val cutoffTime = System.currentTimeMillis() - cleanupInterval.toMillis
            // Implementation would clean up old checkpoint files
            println(s"Checkpoint cleanup completed for path: $checkpointPath")
          }
        } catch {
          case e: Exception => println(s"Checkpoint cleanup failed: ${e.getMessage}")
        }
      }, 1, 24, TimeUnit.HOURS)
    }
  }
}
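
A minimal sketch wiring the streaming pieces together. The broker addresses and topic name are placeholders, and the spark-sql-kafka and Delta packages are assumed to be available in the deployment:

// Illustrative streaming driver; broker addresses and topic names are placeholders
object StreamingExample {
  import SparkStreamingOperations._

  def main(args: Array[String]): Unit = {
    val spark = SparkConfiguration.createSparkSession("streaming-analytics")
    val analytics = new RealTimeAnalytics(spark)

    val kafkaStream = analytics.createKafkaStream("broker-1:9092,broker-2:9092", "orders")
    val orderQuery = analytics.processOrderStream(kafkaStream)
    val monitorQuery = analytics.monitoringStream(kafkaStream)

    // Register shutdown handling and progress logging for the running queries
    StreamingUtils.gracefulShutdown(Seq(orderQuery, monitorQuery))
    StreamingUtils.monitorQueryProgress(orderQuery)

    spark.streams.awaitAnyTermination()
  }
}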

Conclusion

Apache Spark with Scala provides a comprehensive platform for big data processing and analytics. Key concepts include:

Core Architecture:

  • SparkSession as the entry point for all operations
  • RDDs for low-level distributed computing
  • DataFrames and Datasets for structured data processing
  • Catalyst optimizer for query optimization

Data Processing Patterns:

  • Batch processing for historical data analysis
  • Streaming processing for real-time analytics
  • Complex transformations and aggregations
  • Machine learning pipelines with MLlib

Performance Optimization:

  • Partitioning strategies for data distribution
  • Caching and persistence for repeated operations
  • Broadcast variables for efficient joins
  • Accumulator variables for distributed counters

Advanced Analytics:

  • Customer segmentation and lifetime value
  • Cohort analysis and retention metrics
  • Real-time fraud detection
  • Product recommendation systems

Best Practices:

  • Proper resource configuration for cluster environments
  • Error handling and monitoring for production systems
  • Checkpoint management for streaming applications
  • Data quality validation and cleansing

Integration Patterns:

  • Kafka for real-time data streaming
  • Delta Lake for reliable data storage
  • Hive for metadata management
  • External databases for operational integration

Monitoring and Operations:

  • Query execution plan analysis
  • Performance metrics collection
  • Resource utilization monitoring
  • Error tracking and alerting

Spark's unified analytics engine enables organizations to build sophisticated data processing pipelines that scale from development to production, handling petabytes of data across distributed clusters while maintaining high performance and reliability.