Machine Learning in Scala: Spark MLlib and Deep Learning
Scala's strong type system, functional programming style, and JVM performance make it a good fit for machine learning applications. In this lesson, we'll explore machine learning in Scala using Apache Spark MLlib for distributed machine learning and BigDL for distributed deep learning.
Overview of ML in Scala
Why Scala for Machine Learning?
- Type Safety: Catch errors at compile time
- Functional Programming: Immutable data structures and pure functions
- JVM Performance: High-performance execution
- Spark Integration: Native integration with Apache Spark
- Concurrent Processing: Built-in support for parallel computation
Key Libraries and Frameworks
- Apache Spark MLlib: Distributed machine learning
- Smile: Statistical Machine Intelligence and Learning Engine
- Breeze: Numerical processing library
- BigDL: Distributed deep learning for Apache Spark
- DJL (Deep Java Library): Deep learning framework
Setting Up ML Environment
Dependencies
// build.sbt
val SparkVersion = "3.5.0"
val BreezeVersion = "2.1.0"
libraryDependencies ++= Seq(
// Spark MLlib
"org.apache.spark" %% "spark-core" % SparkVersion,
"org.apache.spark" %% "spark-sql" % SparkVersion,
"org.apache.spark" %% "spark-mllib" % SparkVersion,
// Numerical computing
"org.scalanlp" %% "breeze" % BreezeVersion,
"org.scalanlp" %% "breeze-natives" % BreezeVersion,
// ML libraries
"com.github.haifengl" %% "smile-scala" % "3.0.2",
// Deep learning: BigDL 2.x "dllib" (the imports later in this lesson use the dllib packages);
// check Maven Central for the artifact whose Spark suffix matches your Spark line
"com.intel.analytics.bigdl" % "bigdl-dllib-spark_3.1.3" % "2.1.0",
// Configuration
"com.typesafe" % "config" % "1.4.2"
)
Basic Setup
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification._
import org.apache.spark.ml.regression._
import org.apache.spark.ml.clustering._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
// Initialize Spark Session
val spark = SparkSession.builder()
.appName("ML with Scala")
.master("local[*]")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.getOrCreate()
import spark.implicits._
Data Preparation and Feature Engineering
Loading and Exploring Data
// Load data from various sources
def loadCSVData(path: String): DataFrame = {
spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(path)
}
def loadParquetData(path: String): DataFrame = {
spark.read.parquet(path)
}
// Example: Load and explore dataset
val rawData = loadCSVData("data/house_prices.csv")
// Basic exploration
rawData.printSchema()
rawData.show(10)
// Statistical summary
rawData.describe().show()
// Check for missing values
val missingCounts = rawData.columns.map { col =>
val missingCount = rawData.filter(rawData(col).isNull || rawData(col) === "").count()
(col, missingCount)
}
missingCounts.foreach { case (col, count) =>
println(s"Column $col has $count missing values")
}
// Data profiling function
def profileDataFrame(df: DataFrame): Unit = {
println(s"Dataset shape: ${df.count()} rows, ${df.columns.length} columns")
// Numeric columns statistics
val numericCols = df.dtypes.filter { case (_, dt) => dt == "DoubleType" || dt == "IntegerType" }.map(_._1)
if (numericCols.nonEmpty) {
df.select(numericCols.map(col): _*).describe().show()
}
// Categorical columns cardinality
val categoricalCols = df.dtypes.filter { case (_, dt) => dt == "StringType" }.map(_._1)
categoricalCols.foreach { colName =>
val distinctCount = df.select(colName).distinct().count()
println(s"Column $colName has $distinctCount distinct values")
}
}
profileDataFrame(rawData)
Feature Engineering Pipeline
// Comprehensive feature engineering pipeline
class FeatureEngineeringPipeline {
def buildFeaturePipeline(
categoricalCols: Array[String],
numericCols: Array[String],
targetCol: String
): Pipeline = {
val stages = scala.collection.mutable.ArrayBuffer[org.apache.spark.ml.PipelineStage]()
// Handle missing values
val numericImputer = new Imputer()
.setInputCols(numericCols)
.setOutputCols(numericCols.map(_ + "_imputed"))
.setStrategy("mean")
stages += numericImputer
// Scale numeric features
val assemblerForScaling = new VectorAssembler()
.setInputCols(numericCols.map(_ + "_imputed"))
.setOutputCol("numeric_features_raw")
stages += assemblerForScaling
val scaler = new StandardScaler()
.setInputCol("numeric_features_raw")
.setOutputCol("scaled_numeric_features")
.setWithStd(true)
.setWithMean(true)
stages += scaler
// Handle categorical features
val stringIndexers = categoricalCols.map { colName =>
new StringIndexer()
.setInputCol(colName)
.setOutputCol(s"${colName}_indexed")
.setHandleInvalid("keep")
}
stages ++= stringIndexers
val oneHotEncoders = categoricalCols.map { colName =>
new OneHotEncoder()
.setInputCol(s"${colName}_indexed")
.setOutputCol(s"${colName}_encoded")
}
stages ++= oneHotEncoders
// Combine all features
val featureAssembler = new VectorAssembler()
.setInputCols(Array("scaled_numeric_features") ++ categoricalCols.map(_ + "_encoded"))
.setOutputCol("features")
stages += featureAssembler
new Pipeline().setStages(stages.toArray)
}
// Feature selection using correlation
def selectFeaturesCorrelation(df: DataFrame, threshold: Double = 0.8): Array[String] = {
val assembler = new VectorAssembler()
.setInputCols(df.columns.filter(_ != "label"))
.setOutputCol("temp_features")
val assembled = assembler.transform(df)
val correlationMatrix = org.apache.spark.ml.stat.Correlation.corr(assembled, "temp_features")
// Placeholder: a full implementation would read correlationMatrix and drop one feature
// from each pair whose absolute correlation exceeds the threshold
// (see the sketch after the pipeline usage example below)
df.columns.filter(_ != "label")
}
// Feature selection using chi-square test
def selectFeaturesChiSquare(df: DataFrame, k: Int = 10): ChiSqSelector = {
new ChiSqSelector()
.setNumTopFeatures(k)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selected_features")
}
}
// Usage
val featurePipeline = new FeatureEngineeringPipeline()
val pipeline = featurePipeline.buildFeaturePipeline(
categoricalCols = Array("neighborhood", "property_type"),
numericCols = Array("area", "bedrooms", "bathrooms", "age"),
targetCol = "price"
)
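The correlation-based selector above leaves the actual analysis as a placeholder. Here is a minimal sketch of that missing step, assuming a DataFrame whose columns (apart from `label`) are all numeric; it pulls the Pearson correlation matrix and lists feature pairs above the threshold:
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

// List feature pairs whose absolute Pearson correlation exceeds the threshold
def highlyCorrelatedPairs(df: DataFrame, threshold: Double = 0.8): Seq[(String, String, Double)] = {
  val featureCols = df.columns.filter(_ != "label")
  val assembled = new VectorAssembler()
    .setInputCols(featureCols)
    .setOutputCol("temp_features")
    .transform(df)
  val Row(corr: Matrix) = Correlation.corr(assembled, "temp_features").head
  for {
    i <- 0 until corr.numRows
    j <- (i + 1) until corr.numCols
    if math.abs(corr(i, j)) > threshold
  } yield (featureCols(i), featureCols(j), corr(i, j))
}
From each reported pair you would typically keep the feature more correlated with the target and drop the other.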
Classification Algorithms
Logistic Regression
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// Prepare binary classification dataset
val binaryData = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data/titanic.csv")
// Feature preparation
val featureColumns = Array("pclass", "age", "sibsp", "parch", "fare")
val assembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("features")
  .setHandleInvalid("skip") // drop rows with nulls in the selected feature columns (e.g. missing age)
val processedData = assembler.transform(binaryData)
.select("features", "survived")
.withColumnRenamed("survived", "label")
// Split data
val Array(trainingData, testData) = processedData.randomSplit(Array(0.8, 0.2), seed = 42)
// Create and train logistic regression model
val lr = new LogisticRegression()
.setMaxIter(100)
.setRegParam(0.01)
.setElasticNetParam(0.8)
val lrModel = lr.fit(trainingData)
// Make predictions
val predictions = lrModel.transform(testData)
// Evaluate model
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction")
.setMetricName("areaUnderROC")
val auc = evaluator.evaluate(predictions)
println(s"Area Under ROC: $auc")
// Print model coefficients
println(s"Coefficients: ${lrModel.coefficients}")
println(s"Intercept: ${lrModel.intercept}")
// Coefficient magnitudes as a rough proxy for feature influence
// (features are unscaled here, so treat this ranking as indicative only)
val coefficients = lrModel.coefficients.toArray
val featureImportance = featureColumns.zip(coefficients).sortBy { case (_, c) => -math.abs(c) }
println("Features ranked by |coefficient|:")
featureImportance.foreach { case (feature, coeff) =>
println(f"$feature: $coeff%.4f")
}
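The fitted model also carries a training summary with per-iteration loss and the training AUC, which is handy for checking convergence. A short sketch, assuming the binary label used above:
// Inspect the binary training summary (available because the label is binary)
val binarySummary = lrModel.binarySummary
println(f"Training AUC: ${binarySummary.areaUnderROC}%.4f")
println(s"Iterations run: ${binarySummary.totalIterations}")
// Loss value recorded at each iteration of the optimizer
binarySummary.objectiveHistory.zipWithIndex.foreach { case (loss, iter) =>
  println(f"iteration $iter: objective = $loss%.6f")
}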
Random Forest Classification
import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// Random Forest classification (the binary labels above also work with the multiclass evaluator below)
val rf = new RandomForestClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setNumTrees(100)
.setMaxDepth(10)
.setMinInstancesPerNode(1)
.setSubsamplingRate(0.8)
.setFeatureSubsetStrategy("sqrt")
val rfModel = rf.fit(trainingData)
val rfPredictions = rfModel.transform(testData)
// Evaluate with multiple metrics
val multiclassEvaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
val accuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(rfPredictions)
val f1Score = multiclassEvaluator.setMetricName("f1").evaluate(rfPredictions)
val weightedPrecision = multiclassEvaluator.setMetricName("weightedPrecision").evaluate(rfPredictions)
val weightedRecall = multiclassEvaluator.setMetricName("weightedRecall").evaluate(rfPredictions)
println(f"Random Forest Results:")
println(f"Accuracy: $accuracy%.4f")
println(f"F1 Score: $f1Score%.4f")
println(f"Weighted Precision: $weightedPrecision%.4f")
println(f"Weighted Recall: $weightedRecall%.4f")
// Feature importance
val importances = rfModel.featureImportances.toArray
val rfFeatureImportance = featureColumns.zip(importances).sortBy(-_._2)
println("\nFeature Importance (Random Forest):")
rfFeatureImportance.foreach { case (feature, importance) =>
println(f"$feature: $importance%.4f")
}
Gradient Boosted Trees
import org.apache.spark.ml.classification.GBTClassifier
val gbt = new GBTClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(100)
.setMaxDepth(6)
.setStepSize(0.1)
.setSubsamplingRate(0.8)
val gbtModel = gbt.fit(trainingData)
val gbtPredictions = gbtModel.transform(testData)
val gbtAccuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(gbtPredictions)
println(f"GBT Accuracy: $gbtAccuracy%.4f")
// Feature importance for GBT
val gbtImportances = gbtModel.featureImportances.toArray
val gbtFeatureImportance = featureColumns.zip(gbtImportances).sortBy(-_._2)
println("\nFeature Importance (GBT):")
gbtFeatureImportance.foreach { case (feature, importance) =>
println(f"$feature: $importance%.4f")
}
Regression Algorithms
Linear Regression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Prepare regression dataset
val regressionData = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data/boston_housing.csv")
val regressionFeatures = Array("crim", "zn", "indus", "chas", "nox", "rm", "age", "dis", "rad", "tax", "ptratio", "b", "lstat")
val regressionAssembler = new VectorAssembler()
.setInputCols(regressionFeatures)
.setOutputCol("features")
val regressionProcessed = regressionAssembler.transform(regressionData)
.select("features", "medv")
.withColumnRenamed("medv", "label")
val Array(trainRegression, testRegression) = regressionProcessed.randomSplit(Array(0.8, 0.2), seed = 42)
// Linear regression with regularization
val linearRegression = new LinearRegression()
.setMaxIter(100)
.setRegParam(0.01)
.setElasticNetParam(0.8)
val lrModelRegression = linearRegression.fit(trainRegression)
val lrPredictionsRegression = lrModelRegression.transform(testRegression)
// Evaluate regression model
val regressionEvaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
val rmse = regressionEvaluator.setMetricName("rmse").evaluate(lrPredictionsRegression)
val mae = regressionEvaluator.setMetricName("mae").evaluate(lrPredictionsRegression)
val r2 = regressionEvaluator.setMetricName("r2").evaluate(lrPredictionsRegression)
println(f"Linear Regression Results:")
println(f"RMSE: $rmse%.4f")
println(f"MAE: $mae%.4f")
println(f"R²: $r2%.4f")
// Model summary
val trainingSummary = lrModelRegression.summary
println(s"Training RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"Training R²: ${trainingSummary.r2}")
Random Forest Regression
import org.apache.spark.ml.regression.RandomForestRegressor
val rfRegressor = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setNumTrees(100)
.setMaxDepth(10)
.setSubsamplingRate(0.8)
val rfRegressionModel = rfRegressor.fit(trainRegression)
val rfRegressionPredictions = rfRegressionModel.transform(testRegression)
val rfRmse = regressionEvaluator.setMetricName("rmse").evaluate(rfRegressionPredictions)
val rfR2 = regressionEvaluator.setMetricName("r2").evaluate(rfRegressionPredictions)
println(f"Random Forest Regression Results:")
println(f"RMSE: $rfRmse%.4f")
println(f"R²: $rfR2%.4f")
Clustering Algorithms
K-Means Clustering
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Prepare clustering dataset
val clusteringData = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data/customer_data.csv")
val clusteringFeatures = Array("annual_spending", "age", "income")
val clusteringAssembler = new VectorAssembler()
.setInputCols(clusteringFeatures)
.setOutputCol("features")
val clusteringProcessed = clusteringAssembler.transform(clusteringData)
// Scale features for clustering
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(true)
val scalerModel = scaler.fit(clusteringProcessed)
val scaledData = scalerModel.transform(clusteringProcessed)
// Find optimal number of clusters using elbow method
def findOptimalK(data: DataFrame, maxK: Int = 10): Unit = {
val costs = (2 to maxK).map { k =>
val kmeans = new KMeans()
.setK(k)
.setFeaturesCol("scaledFeatures")
.setSeed(42)
val model = kmeans.fit(data)
// KMeansModel.computeCost was removed in Spark 3.x; the training cost (WSSSE) lives on the summary
val cost = model.summary.trainingCost
(k, cost)
}
println("K-Means Cost for different K values:")
costs.foreach { case (k, cost) =>
println(f"K = $k, Cost = $cost%.2f")
}
}
findOptimalK(scaledData)
// Train K-Means with optimal K
val kmeans = new KMeans()
.setK(4)
.setFeaturesCol("scaledFeatures")
.setPredictionCol("cluster")
.setSeed(42)
val kmeansModel = kmeans.fit(scaledData)
val clusterPredictions = kmeansModel.transform(scaledData)
// Evaluate clustering
val clusteringEvaluator = new ClusteringEvaluator()
.setFeaturesCol("scaledFeatures")
.setPredictionCol("cluster")
val silhouette = clusteringEvaluator.evaluate(clusterPredictions)
println(f"Silhouette Score: $silhouette%.4f")
// Analyze clusters
clusterPredictions.groupBy("cluster").agg(
count("*").as("count"),
avg("annual_spending").as("avg_spending"),
avg("age").as("avg_age"),
avg("income").as("avg_income")
).show()
// Print cluster centers
println("Cluster Centers:")
kmeansModel.clusterCenters.zipWithIndex.foreach { case (center, index) =>
println(s"Cluster $index: ${center.toArray.mkString(", ")}")
}
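Because WSSSE always shrinks as K grows, it helps to cross-check the elbow with the silhouette score. A sketch that reuses ClusteringEvaluator over a range of candidate K values:
// Cross-check the elbow with silhouette scores (higher is better)
val silhouetteByK = (2 to 8).map { k =>
  val candidate = new KMeans()
    .setK(k)
    .setFeaturesCol("scaledFeatures")
    .setPredictionCol("cluster")
    .setSeed(42)
    .fit(scaledData)
  val score = new ClusteringEvaluator()
    .setFeaturesCol("scaledFeatures")
    .setPredictionCol("cluster")
    .evaluate(candidate.transform(scaledData))
  (k, score)
}
silhouetteByK.foreach { case (k, s) => println(f"K = $k, silhouette = $s%.4f") }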
Gaussian Mixture Model
import org.apache.spark.ml.clustering.GaussianMixture
val gmm = new GaussianMixture()
.setK(4)
.setFeaturesCol("scaledFeatures")
.setPredictionCol("cluster")
.setProbabilityCol("probability")
.setSeed(42)
val gmmModel = gmm.fit(scaledData)
val gmmPredictions = gmmModel.transform(scaledData)
// Analyze GMM results
println("GMM Cluster Analysis:")
gmmPredictions.groupBy("cluster").agg(
count("*").as("count"),
avg("annual_spending").as("avg_spending"),
avg("age").as("avg_age"),
avg("income").as("avg_income")
).show()
// Show probability distributions
gmmPredictions.select("cluster", "probability").show(10, truncate = false)
Model Selection and Hyperparameter Tuning
Cross-Validation and Grid Search
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
// Set up parameter grid for Random Forest
val rf = new RandomForestClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(50, 100, 200))
.addGrid(rf.maxDepth, Array(5, 10, 15))
.addGrid(rf.minInstancesPerNode, Array(1, 5, 10))
.build()
// Set up cross-validator
val cv = new CrossValidator()
.setEstimator(rf)
.setEvaluator(new BinaryClassificationEvaluator())
.setEstimatorParamMaps(paramGrid)
.setNumFolds(5)
.setParallelism(4)
// Train model with cross-validation
val cvModel = cv.fit(trainingData)
// Get best model
val bestModel = cvModel.bestModel.asInstanceOf[RandomForestClassificationModel]
println("Best model parameters:")
println(s"Number of trees: ${bestModel.getNumTrees}")
println(s"Max depth: ${bestModel.getMaxDepth}")
println(s"Min instances per node: ${bestModel.getMinInstancesPerNode}")
// Evaluate best model
val bestPredictions = cvModel.transform(testData)
val bestAccuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(bestPredictions)
println(f"Best model accuracy: $bestAccuracy%.4f")
Train-Validation Split
import org.apache.spark.ml.tuning.TrainValidationSplit
val trainValidationSplit = new TrainValidationSplit()
.setEstimator(rf)
.setEvaluator(new BinaryClassificationEvaluator())
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.8)
.setParallelism(4)
val tvModel = trainValidationSplit.fit(trainingData)
val tvPredictions = tvModel.transform(testData)
val tvAccuracy = multiclassEvaluator.setMetricName("accuracy").evaluate(tvPredictions)
println(f"Train-Validation Split accuracy: $tvAccuracy%.4f")
Deep Learning with BigDL
Setting up BigDL
import com.intel.analytics.bigdl.dllib.feature.dataset.Sample
import com.intel.analytics.bigdl.dllib.nn.{ClassNLLCriterion, Linear, LogSoftMax, ReLU, Sequential}
import com.intel.analytics.bigdl.dllib.optim._
import com.intel.analytics.bigdl.dllib.tensor.Tensor
import com.intel.analytics.bigdl.dllib.utils.Engine
import org.apache.spark.rdd.RDD
// Initialize BigDL engine
Engine.init
// Create a simple neural network
def createMLP(inputSize: Int, hiddenSize: Int, outputSize: Int): Sequential[Float] = {
Sequential[Float]()
.add(Linear(inputSize, hiddenSize))
.add(ReLU())
.add(Linear(hiddenSize, hiddenSize))
.add(ReLU())
.add(Linear(hiddenSize, outputSize))
.add(LogSoftMax())
}
// Prepare data for BigDL
def prepareDataForBigDL(df: DataFrame): RDD[Sample[Float]] = {
df.rdd.map { row =>
val features = row.getAs[org.apache.spark.ml.linalg.Vector]("features").toArray.map(_.toFloat)
val label = row.getAs[Double]("label").toFloat + 1 // BigDL labels start from 1
Sample[Float](
featureTensor = Tensor(features, Array(features.length)),
labelTensor = Tensor(Array(label), Array(1))
)
}
}
val trainingSamples = prepareDataForBigDL(trainingData)
val testSamples = prepareDataForBigDL(testData)
// Create model
val model = createMLP(inputSize = featureColumns.length, hiddenSize = 64, outputSize = 2)
// Configure optimizer
val optimizer = Optimizer(
model = model,
sampleRDD = trainingSamples,
criterion = ClassNLLCriterion[Float](),
batchSize = 32
)
optimizer
.setOptimMethod(new Adam[Float](learningRate = 0.001))
.setEndWhen(Trigger.maxEpoch(50))
// Validate on the test samples after every epoch (the RDD[Sample] overload also takes a batch size)
.setValidation(Trigger.everyEpoch, testSamples, Array(new Top1Accuracy[Float]()), 32)
// Train the model
val trainedModel = optimizer.optimize()
// Make predictions on the held-out samples (returns an RDD of raw model outputs)
val dlPredictions = trainedModel.predict(testSamples)
Advanced ML Techniques
Ensemble Methods
// Custom ensemble of multiple models
class ModelEnsemble {
def createEnsemble(trainingData: DataFrame): EnsembleModel = {
// Train multiple models
val rf = new RandomForestClassifier()
.setNumTrees(100)
.setMaxDepth(10)
.fit(trainingData)
val gbt = new GBTClassifier()
.setMaxIter(50)
.setMaxDepth(6)
.fit(trainingData)
val lr = new LogisticRegression()
.setMaxIter(100)
.setRegParam(0.01)
.fit(trainingData)
new EnsembleModel(Array(rf, gbt, lr))
}
}
class EnsembleModel(models: Array[org.apache.spark.ml.classification.ClassificationModel[_, _]]) {
  def transform(dataset: DataFrame): DataFrame = {
    // Tag each row with an id so the per-model predictions can be joined back together
    val withId = dataset.withColumn("row_id", monotonically_increasing_id())
    // Get predictions from all models, keeping only the id and the renamed prediction column
    val predictionsById = models.zipWithIndex.map { case (model, index) =>
      model.transform(withId).select(col("row_id"), col("prediction").as(s"prediction_$index"))
    }
    val combined = predictionsById.foldLeft(withId)((acc, preds) => acc.join(preds, Seq("row_id")))
    // Majority voting (assumes binary 0/1 classifiers)
    val predictionCols = models.indices.map(i => col(s"prediction_$i"))
    val majorityVote = when(predictionCols.reduce(_ + _) > models.length / 2.0, 1.0).otherwise(0.0)
    combined.withColumn("ensemble_prediction", majorityVote)
  }
}
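A usage sketch for the ensemble, assuming the binary training/test split from the classification section:
// Build the ensemble and score it with plain accuracy on the held-out set
val ensemble = new ModelEnsemble().createEnsemble(trainingData)
val ensembleScored = ensemble.transform(testData)
val ensembleAccuracy = ensembleScored
  .filter(col("ensemble_prediction") === col("label"))
  .count().toDouble / ensembleScored.count()
println(f"Ensemble accuracy: $ensembleAccuracy%.4f")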
Feature Selection and Engineering
// Advanced feature engineering
class AdvancedFeatureEngineering {
def createPolynomialFeatures(df: DataFrame, inputCol: String, degree: Int): DataFrame = {
  // inputCol must already be a Vector column (e.g. the output of a VectorAssembler)
  val polyExpansion = new PolynomialExpansion()
.setInputCol(inputCol)
.setOutputCol(s"${inputCol}_poly")
.setDegree(degree)
polyExpansion.transform(df)
}
def createInteractionFeatures(df: DataFrame, cols: Array[String]): DataFrame = {
  // Interaction takes the numeric/vector columns directly and outputs their element-wise products
  val interaction = new Interaction()
    .setInputCols(cols)
    .setOutputCol("interaction_features")
  interaction.transform(df)
}
def selectFeaturesUnivariate(df: DataFrame, k: Int): DataFrame = {
  // Univariate selection (ANOVA F-test for continuous features vs. a categorical label);
  // for a true recursive-elimination loop, see the sketch after this class
val selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(k)
.setFeaturesCol("features")
.setOutputCol("selected_features")
selector.fit(df).transform(df)
}
}
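For reference, a genuinely recursive elimination loop can be built on top of tree-based feature importances. A rough sketch, assuming the candidate feature columns are numeric and the DataFrame has a classification label column named "label":
// Rough recursive elimination: repeatedly drop the least important feature until k remain
def recursiveFeatureElimination(df: DataFrame, featureCols: Array[String], k: Int): Array[String] = {
  var remaining = featureCols
  while (remaining.length > k) {
    val assembled = new VectorAssembler()
      .setInputCols(remaining)
      .setOutputCol("rfe_features")
      .transform(df)
    val model = new RandomForestClassifier()
      .setFeaturesCol("rfe_features")
      .setLabelCol("label")
      .setNumTrees(50)
      .fit(assembled)
    // Drop the feature with the smallest importance and refit on the rest
    val weakest = remaining.zip(model.featureImportances.toArray).minBy(_._2)._1
    remaining = remaining.filterNot(_ == weakest)
  }
  remaining
}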
Model Persistence and Deployment
Saving and Loading Models
// Save trained models
val modelPath = "models/random_forest_model"
rfModel.write.overwrite().save(modelPath)
// Save pipeline
val pipelinePath = "models/feature_pipeline"
val fittedPipeline = pipeline.fit(trainingData)
fittedPipeline.write.overwrite().save(pipelinePath)
// Load models
val loadedRFModel = RandomForestClassificationModel.load(modelPath)
val loadedPipeline = PipelineModel.load(pipelinePath)
// Create prediction service
class MLPredictionService {
val model = RandomForestClassificationModel.load(modelPath)
val featurePipeline = PipelineModel.load(pipelinePath)
def predict(inputData: DataFrame): DataFrame = {
val features = featurePipeline.transform(inputData)
model.transform(features)
}
def predictSingle(features: Map[String, Any]): Double = {
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
  // Build a one-row DataFrame from the map (assumes every feature value is numeric)
  val names = features.keys.toSeq
  val schema = StructType(names.map(n => StructField(n, DoubleType, nullable = true)))
  val row = Row.fromSeq(names.map(n => features(n).toString.toDouble))
  val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(row)), schema)
  predict(df).select("prediction").head().getDouble(0)
}
}
val predictionService = new MLPredictionService()
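A usage sketch with hypothetical field names; in practice the map's keys must match the input columns the persisted feature pipeline expects, and predictSingle as written assumes they are all numeric:
// Hypothetical single-record call; the keys below are illustrative only
val sampleInput = Map(
  "area" -> 120.0,
  "bedrooms" -> 3.0,
  "bathrooms" -> 2.0,
  "age" -> 15.0
)
val singlePrediction = predictionService.predictSingle(sampleInput)
println(s"Prediction: $singlePrediction")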
Model Monitoring and Evaluation
// Comprehensive model evaluation
class ModelEvaluator {
def evaluateClassificationModel(
predictions: DataFrame,
labelCol: String = "label",
predictionCol: String = "prediction"
): Map[String, Double] = {
val binaryEvaluator = new BinaryClassificationEvaluator()
.setLabelCol(labelCol)
.setRawPredictionCol("rawPrediction")
val multiclassEvaluator = new MulticlassClassificationEvaluator()
.setLabelCol(labelCol)
.setPredictionCol(predictionCol)
Map(
"auc" -> binaryEvaluator.setMetricName("areaUnderROC").evaluate(predictions),
"pr_auc" -> binaryEvaluator.setMetricName("areaUnderPR").evaluate(predictions),
"accuracy" -> multiclassEvaluator.setMetricName("accuracy").evaluate(predictions),
"precision" -> multiclassEvaluator.setMetricName("weightedPrecision").evaluate(predictions),
"recall" -> multiclassEvaluator.setMetricName("weightedRecall").evaluate(predictions),
"f1" -> multiclassEvaluator.setMetricName("f1").evaluate(predictions)
)
}
def evaluateRegressionModel(
predictions: DataFrame,
labelCol: String = "label",
predictionCol: String = "prediction"
): Map[String, Double] = {
val evaluator = new RegressionEvaluator()
.setLabelCol(labelCol)
.setPredictionCol(predictionCol)
Map(
"rmse" -> evaluator.setMetricName("rmse").evaluate(predictions),
"mae" -> evaluator.setMetricName("mae").evaluate(predictions),
"r2" -> evaluator.setMetricName("r2").evaluate(predictions),
"var" -> evaluator.setMetricName("var").evaluate(predictions)
)
}
def calculateConfusionMatrix(predictions: DataFrame): DataFrame = {
predictions.groupBy("label", "prediction").count()
.orderBy("label", "prediction")
}
}
val modelEvaluator = new ModelEvaluator()
val metrics = modelEvaluator.evaluateClassificationModel(predictions)
metrics.foreach { case (metric, value) =>
println(f"$metric: $value%.4f")
}
Distributed Computing Best Practices
Optimizing Spark ML Performance
// Performance optimization tips
object SparkMLOptimization {
def optimizeSparkSession(): SparkSession = {
SparkSession.builder()
.appName("OptimizedML")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.sql.adaptive.skewJoin.enabled", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.execution.arrow.pyspark.enabled", "true")
.config("spark.sql.parquet.compression.codec", "snappy")
.getOrCreate()
}
def optimizeDataframe(df: DataFrame, partitions: Int = 200): DataFrame = {
  // Repartition first, then cache the repartitioned result for repeated access
  df.repartition(partitions).cache()
}
def parallelModelTraining(data: DataFrame, estimators: Seq[org.apache.spark.ml.Estimator[_]]): Seq[Any] = {
  // Spark jobs submitted from separate driver threads run concurrently on one SparkSession;
  // note that .par requires the scala-parallel-collections module on Scala 2.13+
  estimators.par.map(_.fit(data)).seq
}
}
Conclusion
Machine learning in Scala offers powerful capabilities for building scalable, type-safe ML applications. Key advantages include:
Scala Strengths for ML:
- Type safety prevents runtime errors
- Functional programming paradigms promote clean, maintainable code
- Excellent performance on the JVM
- Native integration with Apache Spark for distributed computing
MLlib Capabilities:
- Comprehensive suite of ML algorithms
- Built-in feature engineering and selection tools
- Scalable to large datasets
- Pipeline API for workflow management
Deep Learning Options:
- BigDL for distributed deep learning on Spark
- Integration with popular frameworks
- GPU acceleration support
Best Practices:
- Use appropriate data structures and caching
- Implement proper cross-validation and model selection
- Monitor model performance and drift
- Optimize Spark configurations for ML workloads
- Design reusable, composable ML pipelines
Production Considerations:
- Model versioning and persistence
- Monitoring and logging
- A/B testing frameworks
- Real-time prediction services
- Batch vs. streaming prediction patterns
Scala's combination of type safety, performance, and distributed computing capabilities makes it an excellent choice for enterprise-scale machine learning applications, especially when working with large datasets and requiring high-performance, reliable ML systems.