Machine Learning in Scala: Spark MLlib and Deep Learning

Scala's strong type system, functional programming paradigms, and excellent performance make it an ideal language for machine learning applications. In this lesson, we'll explore machine learning in Scala using Apache Spark MLlib for distributed machine learning and various deep learning frameworks.

Overview of ML in Scala

Why Scala for Machine Learning?

  • Type Safety: Catch errors at compile time
  • Functional Programming: Immutable data structures and pure functions
  • JVM Performance: High-performance execution
  • Spark Integration: Native integration with Apache Spark
  • Concurrent Processing: Built-in support for parallel computation

Key Libraries and Frameworks

  • Apache Spark MLlib: Distributed machine learning
  • Smile: Statistical Machine Intelligence and Learning Engine
  • Breeze: Numerical processing library (a short example follows this list)
  • BigDL: Distributed deep learning for Apache Spark
  • DJL (Deep Java Library): Deep learning framework
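
Breeze is declared in the build below but not otherwise shown in this lesson, so here is a minimal, hedged sketch of its core API (dense vectors and matrices plus simple statistics), assuming the 2.x artifact from the dependencies:

import breeze.linalg.{DenseMatrix, DenseVector}
import breeze.stats.mean

// Vectors and matrices with natural arithmetic operators
val v = DenseVector(1.0, 2.0, 3.0)
val m = DenseMatrix((1.0, 0.0, 0.0), (0.0, 2.0, 0.0), (0.0, 0.0, 3.0))

val scaled = m * v          // matrix-vector product: DenseVector(1.0, 4.0, 9.0)
val centered = v - mean(v)  // subtract the scalar mean element-wise
println(scaled)
println(centered)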

Setting Up ML Environment

Dependencies

// build.sbt
val SparkVersion = "3.5.0"
val BreezeVersion = "2.1.0"

libraryDependencies ++= Seq(
  // Spark MLlib
  "org.apache.spark" %% "spark-core" % SparkVersion,
  "org.apache.spark" %% "spark-sql" % SparkVersion,
  "org.apache.spark" %% "spark-mllib" % SparkVersion,

  // Numerical computing
  "org.scalanlp" %% "breeze" % BreezeVersion,
  "org.scalanlp" %% "breeze-natives" % BreezeVersion,

  // ML Libraries
  "com.github.haifengl" %% "smile-scala" % "3.0.2",
  // NOTE: the BigDL artifact targets a specific Spark line ("bigdl-SPARK_3.1" is built
  // against Spark 3.1.x), so keep it consistent with SparkVersion above.
  "com.intel.analytics.bigdl" % "bigdl-SPARK_3.1" % "2.1.0",

  // Configuration
  "com.typesafe" % "config" % "1.4.2"
)

Basic Setup

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification._
import org.apache.spark.ml.regression._
import org.apache.spark.ml.clustering._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Initialize Spark Session
val spark = SparkSession.builder()
  .appName("ML with Scala")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .getOrCreate()

import spark.implicits._

Data Preparation and Feature Engineering

Loading and Exploring Data

// Load data from various sources
def loadCSVData(path: String): DataFrame = {
  spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(path)
}

def loadParquetData(path: String): DataFrame = {
  spark.read.parquet(path)
}

// Example: Load and explore dataset
val rawData = loadCSVData("data/house_prices.csv")

// Basic exploration
rawData.printSchema()
rawData.show(10)

// Statistical summary
rawData.describe().show()

// Check for missing values (nulls; the empty-string check matters only for string columns)
val missingCounts = rawData.columns.map { colName =>
  val missingCount = rawData.filter(rawData(colName).isNull || rawData(colName) === "").count()
  (colName, missingCount)
}

missingCounts.foreach { case (colName, count) =>
  println(s"Column $colName has $count missing values")
}

// Data profiling function
def profileDataFrame(df: DataFrame): Unit = {
  println(s"Dataset shape: ${df.count()} rows, ${df.columns.length} columns")

  // Numeric columns statistics
  val numericCols = df.dtypes.filter { case (_, dt) => dt == "DoubleType" || dt == "IntegerType" }.map(_._1)
  if (numericCols.nonEmpty) {
    df.select(numericCols.map(col): _*).describe().show()
  }

  // Categorical columns cardinality
  val categoricalCols = df.dtypes.filter(_._2 == "StringType").map(_._1)
  categoricalCols.foreach { colName =>
    val distinctCount = df.select(colName).distinct().count()
    println(s"Column $colName has $distinctCount distinct values")
  }
}

profileDataFrame(rawData)

Feature Engineering Pipeline

// Comprehensive feature engineering pipeline
class FeatureEngineeringPipeline {

  def buildFeaturePipeline(
    categoricalCols: Array[String],
    numericCols: Array[String],
    targetCol: String
  ): Pipeline = {

    val stages = scala.collection.mutable.ArrayBuffer[org.apache.spark.ml.PipelineStage]()

    // Handle missing values
    val numericImputer = new Imputer()
      .setInputCols(numericCols)
      .setOutputCols(numericCols.map(_ + "_imputed"))
      .setStrategy("mean")
    stages += numericImputer

    // Scale numeric features
    val assemblerForScaling = new VectorAssembler()
      .setInputCols(numericCols.map(_ + "_imputed"))
      .setOutputCol("numeric_features_raw")
    stages += assemblerForScaling

    val scaler = new StandardScaler()
      .setInputCol("numeric_features_raw")
      .setOutputCol("scaled_numeric_features")
      .setWithStd(true)
      .setWithMean(true)
    stages += scaler

    // Handle categorical features
    val stringIndexers = categoricalCols.map { colName =>
      new StringIndexer()
        .setInputCol(colName)
        .setOutputCol(s"${colName}_indexed")
        .setHandleInvalid("keep")
    }
    stages ++= stringIndexers

    val oneHotEncoders = categoricalCols.map { colName =>
      new OneHotEncoder()
        .setInputCol(s"${colName}_indexed")
        .setOutputCol(s"${colName}_encoded")
    }
    stages ++= oneHotEncoders

    // Combine all features
    val featureAssembler = new VectorAssembler()
      .setInputCols(Array("scaled_numeric_features") ++ categoricalCols.map(_ + "_encoded"))
      .setOutputCol("features")
    stages += featureAssembler

    new Pipeline().setStages(stages.toArray)
  }

  // Feature selection using correlation
  def selectFeaturesCorrelation(df: DataFrame, threshold: Double = 0.8): Array[String] = {
    val assembler = new VectorAssembler()
      .setInputCols(df.columns.filter(_ != "label"))
      .setOutputCol("temp_features")

    val assembled = assembler.transform(df)
    val correlationMatrix = org.apache.spark.ml.stat.Correlation.corr(assembled, "temp_features")

    // Extract highly correlated features (simplified stub: returns all feature columns).
    // A fuller correlation-pruning sketch appears after the usage example below.
    df.columns.filter(_ != "label")
  }

  // Feature selection using chi-square test
  def selectFeaturesChiSquare(df: DataFrame, k: Int = 10): ChiSqSelector = {
    new ChiSqSelector()
      .setNumTopFeatures(k)
      .setFeaturesCol("features")
      .setLabelCol("label")
      .setOutputCol("selected_features")
  }
}

// Usage
val featurePipeline = new FeatureEngineeringPipeline()
val pipeline = featurePipeline.buildFeaturePipeline(
  categoricalCols = Array("neighborhood", "property_type"),
  numericCols = Array("area", "bedrooms", "bathrooms", "age"),
  targetCol = "price"
)
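
The selectFeaturesCorrelation method above is only a stub. Below is a fuller, hedged sketch of correlation-based pruning; it assumes all listed feature columns are numeric and greedily drops the second column of each highly correlated pair:

import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation

def dropHighlyCorrelated(df: DataFrame, featureCols: Array[String], threshold: Double = 0.8): Array[String] = {
  // Assemble the candidate columns into a single vector for Correlation.corr
  val assembled = new VectorAssembler()
    .setInputCols(featureCols)
    .setOutputCol("corr_features")
    .transform(df)

  val corr = Correlation.corr(assembled, "corr_features").head.getAs[Matrix](0)

  // Greedily mark the second member of any pair whose |correlation| exceeds the threshold
  val toDrop = scala.collection.mutable.Set.empty[Int]
  for (i <- featureCols.indices; j <- (i + 1) until featureCols.length) {
    if (!toDrop(i) && !toDrop(j) && math.abs(corr(i, j)) > threshold) toDrop += j
  }

  featureCols.zipWithIndex.collect { case (name, idx) if !toDrop(idx) => name }
}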

Classification Algorithms

Logistic Regression

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Prepare binary classification dataset
val binaryData = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/titanic.csv")

// Feature preparation
val featureColumns = Array("pclass", "age", "sibsp", "parch", "fare")
val assembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("features")
  .setHandleInvalid("skip") // drop rows with missing values (e.g. Titanic's missing ages)

val processedData = assembler.transform(binaryData)
  .select("features", "survived")
  .withColumnRenamed("survived", "label")

// Split data
val Array(trainingData, testData) = processedData.randomSplit(Array(0.8, 0.2), seed = 42)

// Create and train logistic regression model
val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.01)
  .setElasticNetParam(0.8)

val lrModel = lr.fit(trainingData)

// Make predictions
val predictions = lrModel.transform(testData)

// Evaluate model
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")

val auc = evaluator.evaluate(predictions)
println(s"Area Under ROC: $auc")

// Print model coefficients
println(s"Coefficients: ${lrModel.coefficients}")
println(s"Intercept: ${lrModel.intercept}")

// Rank features by coefficient magnitude (a rough proxy for influence;
// magnitudes are only comparable if features are on similar scales, so standardize first for a fair ranking)
val coefficients = lrModel.coefficients.toArray
val featureInfluence = featureColumns.zip(coefficients).sortBy { case (_, c) => -math.abs(c) }
println("Feature Influence (|coefficient|):")
featureInfluence.foreach { case (feature, coeff) =>
  println(f"$feature: $coeff%.4f")
}

Random Forest Classification

import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Multi-class classification with Random Forest
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(100)
  .setMaxDepth(10)
  .setMinInstancesPerNode(1)
  .setSubsamplingRate(0.8)
  .setFeatureSubsetStrategy("sqrt")

val rfModel = rf.fit(trainingData)

val rfPredictions = rfModel.transform(testData)

// Evaluate with multiple metrics
val multiclassEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

val accuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(rfPredictions)
val f1Score = multiclassEvaluator.setMetricName("f1").evaluate(rfPredictions)
val weightedPrecision = multiclassEvaluator.setMetricName("weightedPrecision").evaluate(rfPredictions)
val weightedRecall = multiclassEvaluator.setMetricName("weightedRecall").evaluate(rfPredictions)

println(f"Random Forest Results:")
println(f"Accuracy: $accuracy%.4f")
println(f"F1 Score: $f1Score%.4f")
println(f"Weighted Precision: $weightedPrecision%.4f")
println(f"Weighted Recall: $weightedRecall%.4f")

// Feature importance
val importances = rfModel.featureImportances.toArray
val featureImportance = featureColumns.zip(importances).sortBy(-_._2)
println("\nFeature Importance (Random Forest):")
featureImportance.foreach { case (feature, importance) =>
  println(f"$feature: $importance%.4f")
}

Gradient Boosted Trees

import org.apache.spark.ml.classification.GBTClassifier

val gbt = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(100)
  .setMaxDepth(6)
  .setStepSize(0.1)
  .setSubsamplingRate(0.8)

val gbtModel = gbt.fit(trainingData)
val gbtPredictions = gbtModel.transform(testData)

val gbtAccuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(gbtPredictions)
println(f"GBT Accuracy: $gbtAccuracy%.4f")

// Feature importance for GBT
val gbtImportances = gbtModel.featureImportances.toArray
val gbtFeatureImportance = featureColumns.zip(gbtImportances).sortBy(-_._2)
println("\nFeature Importance (GBT):")
gbtFeatureImportance.foreach { case (feature, importance) =>
  println(f"$feature: $importance%.4f")
}

Regression Algorithms

Linear Regression

import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Prepare regression dataset
val regressionData = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/boston_housing.csv")

val regressionFeatures = Array("crim", "zn", "indus", "chas", "nox", "rm", "age", "dis", "rad", "tax", "ptratio", "b", "lstat")
val regressionAssembler = new VectorAssembler()
  .setInputCols(regressionFeatures)
  .setOutputCol("features")

val regressionProcessed = regressionAssembler.transform(regressionData)
  .select("features", "medv")
  .withColumnRenamed("medv", "label")

val Array(trainRegression, testRegression) = regressionProcessed.randomSplit(Array(0.8, 0.2), seed = 42)

// Linear regression with regularization
val linearRegression = new LinearRegression()
  .setMaxIter(100)
  .setRegParam(0.01)
  .setElasticNetParam(0.8)

val lrModelRegression = linearRegression.fit(trainRegression)
val lrPredictionsRegression = lrModelRegression.transform(testRegression)

// Evaluate regression model
val regressionEvaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

val rmse = regressionEvaluator.setMetricName("rmse").evaluate(lrPredictionsRegression)
val mae = regressionEvaluator.setMetricName("mae").evaluate(lrPredictionsRegression)
val r2 = regressionEvaluator.setMetricName("r2").evaluate(lrPredictionsRegression)

println(f"Linear Regression Results:")
println(f"RMSE: $rmse%.4f")
println(f"MAE: $mae%.4f")
println(f"R²: $r2%.4f")

// Model summary
val trainingSummary = lrModelRegression.summary
println(s"Training RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"Training R²: ${trainingSummary.r2}")

Random Forest Regression

import org.apache.spark.ml.regression.RandomForestRegressor

val rfRegressor = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(100)
  .setMaxDepth(10)
  .setSubsamplingRate(0.8)

val rfRegressionModel = rfRegressor.fit(trainRegression)
val rfRegressionPredictions = rfRegressionModel.transform(testRegression)

val rfRmse = regressionEvaluator.setMetricName("rmse").evaluate(rfRegressionPredictions)
val rfR2 = regressionEvaluator.setMetricName("r2").evaluate(rfRegressionPredictions)

println(f"Random Forest Regression Results:")
println(f"RMSE: $rfRmse%.4f")
println(f"R²: $rfR2%.4f")

Clustering Algorithms

K-Means Clustering

import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Prepare clustering dataset
val clusteringData = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/customer_data.csv")

val clusteringFeatures = Array("annual_spending", "age", "income")
val clusteringAssembler = new VectorAssembler()
  .setInputCols(clusteringFeatures)
  .setOutputCol("features")

val clusteringProcessed = clusteringAssembler.transform(clusteringData)

// Scale features for clustering
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

val scalerModel = scaler.fit(clusteringProcessed)
val scaledData = scalerModel.transform(clusteringProcessed)

// Find optimal number of clusters using elbow method
def findOptimalK(data: DataFrame, maxK: Int = 10): Unit = {
  val costs = (2 to maxK).map { k =>
    val kmeans = new KMeans()
      .setK(k)
      .setFeaturesCol("scaledFeatures")
      .setSeed(42)

    val model = kmeans.fit(data)
    // KMeansModel.computeCost was removed in Spark 3.x; use the training cost (WSSSE) from the summary
    val cost = model.summary.trainingCost
    (k, cost)
  }

  println("K-Means Cost for different K values:")
  costs.foreach { case (k, cost) =>
    println(f"K = $k, Cost = $cost%.2f")
  }
}

findOptimalK(scaledData)

// Train K-Means with optimal K
val kmeans = new KMeans()
  .setK(4)
  .setFeaturesCol("scaledFeatures")
  .setPredictionCol("cluster")
  .setSeed(42)

val kmeansModel = kmeans.fit(scaledData)
val clusterPredictions = kmeansModel.transform(scaledData)

// Evaluate clustering
val clusteringEvaluator = new ClusteringEvaluator()
  .setFeaturesCol("scaledFeatures")
  .setPredictionCol("cluster")

val silhouette = clusteringEvaluator.evaluate(clusterPredictions)
println(f"Silhouette Score: $silhouette%.4f")

// Analyze clusters
clusterPredictions.groupBy("cluster").agg(
  count("*").as("count"),
  avg("annual_spending").as("avg_spending"),
  avg("age").as("avg_age"),
  avg("income").as("avg_income")
).show()

// Print cluster centers
println("Cluster Centers:")
kmeansModel.clusterCenters.zipWithIndex.foreach { case (center, index) =>
  println(s"Cluster $index: ${center.toArray.mkString(", ")}")
}

Gaussian Mixture Model

import org.apache.spark.ml.clustering.GaussianMixture

val gmm = new GaussianMixture()
  .setK(4)
  .setFeaturesCol("scaledFeatures")
  .setPredictionCol("cluster")
  .setProbabilityCol("probability")
  .setSeed(42)

val gmmModel = gmm.fit(scaledData)
val gmmPredictions = gmmModel.transform(scaledData)

// Analyze GMM results
println("GMM Cluster Analysis:")
gmmPredictions.groupBy("cluster").agg(
  count("*").as("count"),
  avg("annual_spending").as("avg_spending"),
  avg("age").as("avg_age"),
  avg("income").as("avg_income")
).show()

// Show probability distributions
gmmPredictions.select("cluster", "probability").show(10, truncate = false)
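
Unlike K-Means, a Gaussian mixture also exposes its learned component parameters, so the mixture weights and means can be inspected directly:

// Inspect the learned mixture components
gmmModel.weights.zip(gmmModel.gaussians).zipWithIndex.foreach { case ((weight, gaussian), i) =>
  println(f"Component $i: weight = $weight%.3f, mean = ${gaussian.mean}")
}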

Model Selection and Hyperparameter Tuning

Cross-Validation and Grid Search

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Set up parameter grid for Random Forest
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(50, 100, 200))
  .addGrid(rf.maxDepth, Array(5, 10, 15))
  .addGrid(rf.minInstancesPerNode, Array(1, 5, 10))
  .build()

// Set up cross-validator
val cv = new CrossValidator()
  .setEstimator(rf)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)
  .setParallelism(4)

// Train model with cross-validation
val cvModel = cv.fit(trainingData)

// Get best model
val bestModel = cvModel.bestModel.asInstanceOf[RandomForestClassificationModel]

println("Best model parameters:")
println(s"Number of trees: ${bestModel.getNumTrees}")
println(s"Max depth: ${bestModel.getMaxDepth}")
println(s"Min instances per node: ${bestModel.getMinInstancesPerNode}")

// Evaluate best model
val bestPredictions = cvModel.transform(testData)
val bestAccuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(bestPredictions)
println(f"Best model accuracy: $bestAccuracy%.4f")

Train-Validation Split

import org.apache.spark.ml.tuning.TrainValidationSplit

val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(rf)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)
  .setParallelism(4)

val tvModel = trainValidationSplit.fit(trainingData)
val tvPredictions = tvModel.transform(testData)
val tvAccuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(tvPredictions)
println(f"Train-Validation Split accuracy: $tvAccuracy%.4f")

Deep Learning with BigDL

Setting up BigDL

// NOTE: BigDL's package layout differs between releases; the imports below assume the BigDL 2.x "dllib" packages.
import com.intel.analytics.bigdl.dllib.nn.{Sequential, Linear, ReLU, LogSoftMax, ClassNLLCriterion}
import com.intel.analytics.bigdl.dllib.optim._
import com.intel.analytics.bigdl.dllib.tensor.Tensor
import com.intel.analytics.bigdl.dllib.feature.dataset.Sample
import com.intel.analytics.bigdl.dllib.utils.Engine
import org.apache.spark.rdd.RDD

// Initialize BigDL engine
Engine.init

// Create a simple neural network
def createMLP(inputSize: Int, hiddenSize: Int, outputSize: Int): Sequential[Float] = {
  Sequential[Float]()
    .add(Linear(inputSize, hiddenSize))
    .add(ReLU())
    .add(Linear(hiddenSize, hiddenSize))
    .add(ReLU())
    .add(Linear(hiddenSize, outputSize))
    .add(LogSoftMax())
}

// Prepare data for BigDL
def prepareDataForBigDL(df: DataFrame): RDD[Sample[Float]] = {
  df.rdd.map { row =>
    val features = row.getAs[org.apache.spark.ml.linalg.Vector]("features").toArray.map(_.toFloat)
    val label = row.getAs[Double]("label").toFloat + 1 // BigDL labels start from 1

    Sample[Float](
      featureTensor = Tensor(features, Array(features.length)),
      labelTensor = Tensor(Array(label), Array(1))
    )
  }
}

val trainingSamples = prepareDataForBigDL(trainingData)
val testSamples = prepareDataForBigDL(testData)

// Create model
val model = createMLP(inputSize = featureColumns.length, hiddenSize = 64, outputSize = 2)

// Configure optimizer
val optimizer = Optimizer(
  model = model,
  sampleRDD = trainingSamples,
  criterion = ClassNLLCriterion[Float](),
  batchSize = 32
)

optimizer
  .setOptimMethod(new Adam[Float](learningRate = 0.001))
  .setEndWhen(Trigger.maxEpoch(50))
  .setValidation(
    Trigger.everyEpoch,
    testSamples,
    Array(new Top1Accuracy[Float]()),
    32 // the RDD-based setValidation overload also takes a validation batch size
  )

// Train the model
val trainedModel = optimizer.optimize()

// Make predictions (distributed inference over the test samples)
val dlPredictions = trainedModel.predict(testSamples)

Advanced ML Techniques

Ensemble Methods

// Custom ensemble of multiple models
class ModelEnsemble {

  def createEnsemble(trainingData: DataFrame): EnsembleModel = {
    // Train multiple models
    val rf = new RandomForestClassifier()
      .setNumTrees(100)
      .setMaxDepth(10)
      .fit(trainingData)

    val gbt = new GBTClassifier()
      .setMaxIter(50)
      .setMaxDepth(6)
      .fit(trainingData)

    val lr = new LogisticRegression()
      .setMaxIter(100)
      .setRegParam(0.01)
      .fit(trainingData)

    new EnsembleModel(Array(rf, gbt, lr))
  }
}

class EnsembleModel(models: Array[org.apache.spark.ml.Transformer]) {

  def transform(dataset: DataFrame): DataFrame = {
    // Apply each model in turn, renaming its prediction column and dropping its other
    // output columns so the next model can add its own
    val combined = models.zipWithIndex.foldLeft(dataset) { case (df, (model, index)) =>
      model.transform(df)
        .withColumnRenamed("prediction", s"prediction_$index")
        .drop("rawPrediction", "probability")
    }

    // Majority voting over the individual predictions (assumes binary 0.0 / 1.0 labels)
    val predictionCols = models.indices.map(i => col(s"prediction_$i"))
    val majorityVote = when(predictionCols.reduce(_ + _) > models.length / 2.0, 1.0).otherwise(0.0)

    combined.withColumn("ensemble_prediction", majorityVote)
  }
}
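
A brief usage sketch, assuming the binary Titanic DataFrames prepared earlier in this lesson:

val ensemble = new ModelEnsemble().createEnsemble(trainingData)
val ensemblePredictions = ensemble.transform(testData)
ensemblePredictions
  .select("label", "prediction_0", "prediction_1", "prediction_2", "ensemble_prediction")
  .show(10)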

Feature Selection and Engineering

// Advanced feature engineering
class AdvancedFeatureEngineering {

  def createPolynomialFeatures(df: DataFrame, inputCol: String, degree: Int): DataFrame = {
    val polyExpansion = new PolynomialExpansion()
      .setInputCol(inputCol)
      .setOutputCol(s"${inputCol}_poly")
      .setDegree(degree)

    polyExpansion.transform(df)
  }

  def createInteractionFeatures(df: DataFrame, cols: Array[String]): DataFrame = {
    // Interaction takes the input columns directly (numeric or vector) and outputs
    // the element-wise products of all their combinations as a single vector column
    val interaction = new Interaction()
      .setInputCols(cols)
      .setOutputCol("interaction_features")

    interaction.transform(df)
  }

  def selectTopFeaturesUnivariate(df: DataFrame, k: Int): DataFrame = {
    // Univariate selection of the k top features (ANOVA F-test for continuous features
    // against a categorical label); note this is not true recursive feature elimination
    val selector = new UnivariateFeatureSelector()
      .setFeatureType("continuous")
      .setLabelType("categorical")
      .setSelectionMode("numTopFeatures")
      .setSelectionThreshold(k)
      .setFeaturesCol("features")
      .setOutputCol("selected_features")

    selector.fit(df).transform(df)
  }
}
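
For example, the polynomial expansion can be applied to the assembled Titanic features from earlier (a sketch; degree 2 is an arbitrary choice):

val advancedFE = new AdvancedFeatureEngineering()
val withPoly = advancedFE.createPolynomialFeatures(processedData, inputCol = "features", degree = 2)
withPoly.select("features", "features_poly").show(5, truncate = false)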

Model Persistence and Deployment

Saving and Loading Models

// Save trained models
val modelPath = "models/random_forest_model"
rfModel.write.overwrite().save(modelPath)

// Save a fitted feature pipeline (fit it on the raw data it was built for)
val pipelinePath = "models/feature_pipeline"
val fittedPipeline = pipeline.fit(rawData)
fittedPipeline.write.overwrite().save(pipelinePath)

// Load models
val loadedRFModel = RandomForestClassificationModel.load(modelPath)
val loadedPipeline = PipelineModel.load(pipelinePath)

// Create prediction service
class MLPredictionService {

  val model = RandomForestClassificationModel.load(modelPath)
  val featurePipeline = PipelineModel.load(pipelinePath)

  def predict(inputData: DataFrame): DataFrame = {
    val features = featurePipeline.transform(inputData)
    model.transform(features)
  }

  def predictSingle(featureValues: Map[String, Double]): Double = {
    // Build a one-row DataFrame; assumes all raw input columns are numeric
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

    val fields = featureValues.keys.toSeq
    val schema = StructType(fields.map(StructField(_, DoubleType, nullable = true)))
    val row = Row.fromSeq(fields.map(featureValues))
    val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(row)), schema)

    predict(df).select("prediction").head().getDouble(0)
  }
}

val predictionService = new MLPredictionService()

Model Monitoring and Evaluation

// Comprehensive model evaluation
class ModelEvaluator {

  def evaluateClassificationModel(
    predictions: DataFrame, 
    labelCol: String = "label",
    predictionCol: String = "prediction"
  ): Map[String, Double] = {

    val binaryEvaluator = new BinaryClassificationEvaluator()
      .setLabelCol(labelCol)
      .setRawPredictionCol("rawPrediction")

    val multiclassEvaluator = new MulticlassClassificationEvaluator()
      .setLabelCol(labelCol)
      .setPredictionCol(predictionCol)

    Map(
      "auc" -> binaryEvaluator.setMetricName("areaUnderROC").evaluate(predictions),
      "pr_auc" -> binaryEvaluator.setMetricName("areaUnderPR").evaluate(predictions),
      "accuracy" -> multiclassEvaluator.setMetricName("accuracy").evaluate(predictions),
      "precision" -> multiclassEvaluator.setMetricName("weightedPrecision").evaluate(predictions),
      "recall" -> multiclassEvaluator.setMetricName("weightedRecall").evaluate(predictions),
      "f1" -> multiclassEvaluator.setMetricName("f1").evaluate(predictions)
    )
  }

  def evaluateRegressionModel(
    predictions: DataFrame,
    labelCol: String = "label",
    predictionCol: String = "prediction"
  ): Map[String, Double] = {

    val evaluator = new RegressionEvaluator()
      .setLabelCol(labelCol)
      .setPredictionCol(predictionCol)

    Map(
      "rmse" -> evaluator.setMetricName("rmse").evaluate(predictions),
      "mae" -> evaluator.setMetricName("mae").evaluate(predictions),
      "r2" -> evaluator.setMetricName("r2").evaluate(predictions),
      "var" -> evaluator.setMetricName("var").evaluate(predictions)
    )
  }

  def calculateConfusionMatrix(predictions: DataFrame): DataFrame = {
    predictions.groupBy("label", "prediction").count()
      .orderBy("label", "prediction")
  }
}

val modelEvaluator = new ModelEvaluator()
// Evaluate the logistic regression predictions from the classification section
val metrics = modelEvaluator.evaluateClassificationModel(predictions)
metrics.foreach { case (metric, value) =>
  println(f"$metric: $value%.4f")
}
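
The confusion matrix helper applies to the same predictions:

// Label vs. prediction counts
modelEvaluator.calculateConfusionMatrix(predictions).show()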

Distributed Computing Best Practices

Optimizing Spark ML Performance

// Performance optimization tips
object SparkMLOptimization {

  def optimizeSparkSession(): SparkSession = {
    SparkSession.builder()
      .appName("OptimizedML")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.execution.arrow.pyspark.enabled", "true")
      .config("spark.sql.parquet.compression.codec", "snappy")
      .getOrCreate()
  }

  def optimizeDataframe(df: DataFrame, numPartitions: Int = 200): DataFrame = {
    // Repartition first, then cache the repartitioned result so that is what gets reused
    df.repartition(numPartitions).cache()
  }

  def parallelModelTraining(data: DataFrame, models: Array[Any]): Array[Any] = {
    // Sketch only: fit several estimators concurrently on the same cached DataFrame.
    // On Scala 2.13, `.par` requires the scala-parallel-collections module and
    // `import scala.collection.parallel.CollectionConverters._`.
    models.par.map { model =>
      // model.asInstanceOf[Estimator[_]].fit(data)  // pseudo-code: replace with real fitting
      model
    }.toArray
  }
}

Conclusion

Machine learning in Scala offers powerful capabilities for building scalable, type-safe ML applications. Key advantages include:

Scala Strengths for ML:

  • Type safety prevents runtime errors
  • Functional programming paradigms promote clean, maintainable code
  • Excellent performance on the JVM
  • Native integration with Apache Spark for distributed computing

MLlib Capabilities:

  • Comprehensive suite of ML algorithms
  • Built-in feature engineering and selection tools
  • Scalable to large datasets
  • Pipeline API for workflow management

Deep Learning Options:

  • BigDL for distributed deep learning on Spark
  • Integration with popular frameworks
  • GPU acceleration support

Best Practices:

  • Use appropriate data structures and caching
  • Implement proper cross-validation and model selection
  • Monitor model performance and drift (a minimal drift-check sketch follows this list)
  • Optimize Spark configurations for ML workloads
  • Design reusable, composable ML pipelines
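
As referenced in the list above, here is a minimal, hedged sketch of prediction-side drift monitoring: compare the positive-prediction rate of a new scoring batch against a stored baseline (the tolerance is an assumption, not a rule):

def checkPredictionDrift(newPredictions: DataFrame, baselineRate: Double, tolerance: Double = 0.05): Unit = {
  // Mean of a 0/1 prediction column is the positive-prediction rate
  val currentRate = newPredictions.agg(avg("prediction")).head().getDouble(0)
  if (math.abs(currentRate - baselineRate) > tolerance)
    println(f"Possible drift: positive rate $currentRate%.3f vs. baseline $baselineRate%.3f")
  else
    println(f"No drift detected: positive rate $currentRate%.3f")
}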

Production Considerations:

  • Model versioning and persistence
  • Monitoring and logging
  • A/B testing frameworks
  • Real-time prediction services
  • Batch vs. streaming prediction patterns (a streaming sketch follows this list)
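
As a sketch of the streaming pattern referenced above, the saved Random Forest model can score files as they arrive; the input directory and schema here are assumptions for illustration:

import org.apache.spark.sql.types._

// Schema of incoming raw records (mirrors the Titanic feature columns used above)
val inputSchema = StructType(Seq(
  StructField("pclass", DoubleType),
  StructField("age", DoubleType),
  StructField("sibsp", DoubleType),
  StructField("parch", DoubleType),
  StructField("fare", DoubleType)
))

// Structured Streaming source: new CSV files landing in a directory
val incoming = spark.readStream
  .schema(inputSchema)
  .option("header", "true")
  .csv("data/incoming/")

// Assemble features and score them with the previously loaded model
val incomingFeatures = new VectorAssembler()
  .setInputCols(inputSchema.fieldNames)
  .setOutputCol("features")
  .setHandleInvalid("skip")
  .transform(incoming)

loadedRFModel.transform(incomingFeatures)
  .select("prediction")
  .writeStream
  .format("console")
  .outputMode("append")
  .start()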

Scala's combination of type safety, performance, and distributed computing capabilities makes it an excellent choice for enterprise-scale machine learning applications, especially when working with large datasets and requiring high-performance, reliable ML systems.