Graph Processing with GraphX: Network Analysis and Algorithms
Graph processing is essential for analyzing relationships and connections in data, from social networks to recommendation systems. Apache Spark's GraphX library provides a distributed graph processing framework that enables analysis of large-scale graphs. In this lesson, we'll explore comprehensive graph processing techniques using GraphX in Scala.
Understanding Graph Theory and GraphX
Graph Fundamentals
- Vertices (Nodes): Entities in the graph with associated data
- Edges: Connections between vertices with optional weights/properties
- Directed vs Undirected: Whether edges have direction
- Weighted vs Unweighted: Whether edges have associated values
- Multigraph: Graphs with multiple edges between same vertices
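To ground these terms in GraphX's API, here is a minimal sketch (vertex IDs and weights are hypothetical): a directed, weighted edge is simply an Edge[Double], and a multigraph arises when two Edge values share the same endpoints.
import org.apache.spark.graphx.Edge
// A directed, weighted edge: srcId -> dstId carrying a Double weight
val e1 = Edge(srcId = 1L, dstId = 2L, attr = 0.75)
// A second edge between the same pair of vertices makes the graph a multigraph
val e2 = Edge(srcId = 1L, dstId = 2L, attr = 0.25)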
GraphX Architecture
- Property Graphs: Vertices and edges can have associated properties
- Distributed Storage: Graphs are partitioned across cluster
- Immutable: Graph operations create new graphs
- RDD-Based: Built on top of Spark RDDs for scalability
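These properties are visible directly in the types: a Graph[VD, ED] pairs vertex attributes of type VD with edge attributes of type ED, is backed by vertex and edge RDDs, and every transformation returns a new graph. A minimal sketch (assuming the SparkContext sc created in the setup section below):
import org.apache.spark.graphx.{Edge, Graph}
val v = sc.parallelize(Seq((1L, "a"), (2L, "b")))  // vertices: RDD[(VertexId, VD)]
val e = sc.parallelize(Seq(Edge(1L, 2L, 7)))       // edges: RDD[Edge[ED]]
val g: Graph[String, Int] = Graph(v, e)            // property graph
val g2 = g.mapVertices((_, s) => s.toUpperCase)    // immutable: g itself is unchanged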
Setting Up GraphX Environment
Dependencies and Setup
// build.sbt
val SparkVersion = "3.5.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % SparkVersion,
"org.apache.spark" %% "spark-sql" % SparkVersion,
"org.apache.spark" %% "spark-graphx" % SparkVersion,
"org.apache.spark" %% "spark-mllib" % SparkVersion,
// Graph visualization (optional)
"org.graphstream" % "gs-core" % "2.0",
"org.graphstream" % "gs-ui-swing" % "2.0",
// Network analysis libraries
"org.jgrapht" % "jgrapht-core" % "1.5.1",
"org.gephi" % "gephi-toolkit" % "0.9.7"
)
Basic GraphX Setup
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
// Initialize Spark
val spark = SparkSession.builder()
.appName("GraphX Processing")
.master("local[*]")
.config("spark.sql.adaptive.enabled", "true")
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
Creating and Building Graphs
Basic Graph Construction
// Define vertex and edge attribute types (the VertexId = Long alias is
// already provided by the org.apache.spark.graphx._ import)
case class User(name: String, age: Int, occupation: String)
case class Relationship(relationship: String, since: String)
// Create vertices RDD
val users: RDD[(VertexId, User)] = sc.parallelize(Array(
(1L, User("Alice", 28, "Engineer")),
(2L, User("Bob", 32, "Designer")),
(3L, User("Charlie", 25, "Analyst")),
(4L, User("Diana", 30, "Manager")),
(5L, User("Eve", 27, "Developer"))
))
// Create edges RDD
val relationships: RDD[Edge[Relationship]] = sc.parallelize(Array(
Edge(1L, 2L, Relationship("friend", "2020-01-15")),
Edge(2L, 3L, Relationship("colleague", "2019-06-01")),
Edge(3L, 4L, Relationship("reports_to", "2021-03-10")),
Edge(4L, 1L, Relationship("friend", "2018-12-05")),
Edge(1L, 5L, Relationship("mentor", "2020-08-20")),
Edge(5L, 3L, Relationship("collaborator", "2021-01-15"))
))
// Default user for missing vertices
val defaultUser = User("Unknown", 0, "Unknown")
// Create graph
val graph = Graph(users, relationships, defaultUser)
// Basic graph properties
println(s"Number of vertices: ${graph.vertices.count()}")
println(s"Number of edges: ${graph.edges.count()}")
println(s"Number of triplets: ${graph.triplets.count()}")
// Display graph structure (collect to the driver so println output is visible locally)
graph.triplets.collect().foreach { triplet =>
  println(s"${triplet.srcAttr.name} -> ${triplet.dstAttr.name} (${triplet.attr.relationship})")
}
Loading Graphs from Data Sources
// Load from CSV files
def loadGraphFromCSV(verticesPath: String, edgesPath: String): Graph[User, Relationship] = {
val verticesDF = spark.read
.option("header", "true")
.csv(verticesPath)
val edgesDF = spark.read
.option("header", "true")
.csv(edgesPath)
// Convert to RDDs
val vertices = verticesDF.rdd.map { row =>
(row.getString(0).toLong, User(row.getString(1), row.getString(2).toInt, row.getString(3)))
}
val edges = edgesDF.rdd.map { row =>
Edge(row.getString(0).toLong, row.getString(1).toLong,
Relationship(row.getString(2), row.getString(3)))
}
Graph(vertices, edges, defaultUser)
}
// Load from JSON (expects columns "vertices" and "edges" holding arrays of structs)
def loadGraphFromJSON(path: String): Graph[String, Double] = {
  val df = spark.read.json(path)
  // Spark exposes array columns as Seq[Row], not Array[Row]
  val vertices = df.select("vertices").rdd.flatMap { row =>
    row.getAs[Seq[Row]]("vertices").map { vertex =>
      (vertex.getLong(0), vertex.getString(1))
    }
  }
  val edges = df.select("edges").rdd.flatMap { row =>
    row.getAs[Seq[Row]]("edges").map { edge =>
      Edge(edge.getLong(0), edge.getLong(1), edge.getDouble(2))
    }
  }
  Graph(vertices, edges, "Unknown")
}
// Generate synthetic graphs for testing
def generateRandomGraph(numVertices: Int, numEdges: Int): Graph[Int, Double] = {
val vertices = sc.parallelize((1L to numVertices).map(id => (id, id.toInt)))
  val edges = sc.parallelize(
    (1 to numEdges).map { _ =>
      Edge(
        scala.util.Random.nextInt(numVertices) + 1L,
        scala.util.Random.nextInt(numVertices) + 1L,
        scala.util.Random.nextDouble()
      )
    }
  ).keyBy(e => (e.srcId, e.dstId)).reduceByKey((e, _) => e).values // drop duplicate (src, dst) pairs
Graph(vertices, edges, 0)
}
// Scale-free network (Barabási-Albert model)
def generateScaleFreeGraph(n: Int, m: Int): Graph[Int, Double] = {
import scala.collection.mutable
val vertices = mutable.ArrayBuffer[(VertexId, Int)]()
val edges = mutable.ArrayBuffer[Edge[Double]]()
// Start with small complete graph
for (i <- 1 to m) {
vertices += ((i.toLong, i))
for (j <- 1 until i) {
edges += Edge(i.toLong, j.toLong, 1.0)
}
}
// Add vertices with preferential attachment
for (i <- (m + 1) to n) {
vertices += ((i.toLong, i))
    val degrees = edges.flatMap(e => Seq(e.srcId, e.dstId)).groupBy(identity).map { case (v, occ) => (v, occ.size) }
    val totalDegree = degrees.values.sum
    val targets = mutable.Set[Long]()
    while (targets.size < m) {
      // Draw one target per random sample, with probability proportional to degree
      val rand = scala.util.Random.nextDouble() * totalDegree
      var cumulative = 0.0
      var chosen: Option[Long] = None
      for ((vertex, degree) <- degrees if chosen.isEmpty) {
        cumulative += degree
        if (cumulative >= rand && vertex != i && !targets.contains(vertex)) {
          chosen = Some(vertex)
        }
      }
      chosen.foreach(targets += _)
    }
targets.foreach(target => edges += Edge(i.toLong, target, 1.0))
}
Graph(sc.parallelize(vertices), sc.parallelize(edges), 0)
}
val randomGraph = generateRandomGraph(1000, 5000)
val scaleFreeGraph = generateScaleFreeGraph(1000, 3)
Graph Operations and Transformations
Basic Graph Operations
// Vertex operations
val userNames = graph.vertices.map { case (id, user) => (id, user.name) }
val engineeringUsers = graph.vertices.filter { case (id, user) =>
user.occupation == "Engineer"
}
// Edge operations
val friendships = graph.edges.filter(_.attr.relationship == "friend")
val edgeWeights = graph.edges.map(edge => (edge.srcId, edge.dstId, 1.0))
// Structural operations
val canonicalGraph = graph.convertToCanonicalEdges() // orients each edge so srcId < dstId and merges duplicates
val reverseGraph = graph.reverse
// Subgraph extraction: keep only engineers (edges whose endpoints are
// filtered out by vpred are dropped automatically, so no epred is needed)
val engineerSubgraph = graph.subgraph(
  vpred = (id, user) => user.occupation == "Engineer"
)
// Map operations
val graphWithAges = graph.mapVertices { (id, user) =>
(user.name, user.age)
}
val graphWithWeights = graph.mapEdges { edge =>
edge.attr.relationship match {
case "friend" => 1.0
case "colleague" => 0.8
case "mentor" => 0.9
case "reports_to" => 0.7
case _ => 0.5
}
}
// Join with external data
val promotions: RDD[(VertexId, String)] = sc.parallelize(Array(
(1L, "Senior Engineer"),
(3L, "Senior Analyst")
))
val updatedGraph = graph.outerJoinVertices(promotions) { (id, user, promotion) =>
promotion match {
case Some(newTitle) => user.copy(occupation = newTitle)
case None => user
}
}
Advanced Graph Transformations
// Aggregate neighboring information: attach (inDegree, outDegree) to each vertex
def computeNeighborStats[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[(VD, (Int, Int)), ED] = {
  val degreeMessages = graph.aggregateMessages[(Int, Int)](
    sendMsg = triplet => {
      triplet.sendToSrc((0, 1)) // the source gains an out-neighbor
      triplet.sendToDst((1, 0)) // the destination gains an in-neighbor
    },
    mergeMsg = (a, b) => (a._1 + b._1, a._2 + b._2)
  )
  graph.outerJoinVertices(degreeMessages) { (id, vertex, degreesOpt) =>
    (vertex, degreesOpt.getOrElse((0, 0)))
  }
}
// Compute vertex degrees
val vertexDegrees = graph.degrees.collect().toMap
val inDegrees = graph.inDegrees.collect().toMap
val outDegrees = graph.outDegrees.collect().toMap
println("Vertex degrees:")
vertexDegrees.foreach { case (id, degree) =>
val user = graph.vertices.lookup(id).headOption
println(s"User ${user.map(_.name).getOrElse("Unknown")} (ID: $id): $degree connections")
}
// Find neighbors
def findNeighbors[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], vertexId: VertexId): Array[VertexId] = {
graph.collectNeighborIds(EdgeDirection.Either).lookup(vertexId).headOption.getOrElse(Array.empty)
}
val aliceNeighbors = findNeighbors(graph, 1L)
println(s"Alice's neighbors: ${aliceNeighbors.mkString(", ")}")
// Connected components
val connectedComponents = graph.connectedComponents()
connectedComponents.vertices.collect().foreach { case (id, component) =>
println(s"Vertex $id belongs to component $component")
}
// Strongly connected components (for directed graphs)
val stronglyConnectedComponents = graph.stronglyConnectedComponents(numIter = 10)
stronglyConnectedComponents.vertices.collect().foreach { case (id, component) =>
println(s"Vertex $id belongs to strongly connected component $component")
}
Graph Algorithms
PageRank Algorithm
// Standard PageRank
val pageRankGraph = graph.pageRank(tol = 0.0001)
val pageRankVertices = pageRankGraph.vertices
println("PageRank scores:")
pageRankVertices.join(graph.vertices).sortBy(_._2._1, ascending = false).collect().foreach {
case (id, (rank, user)) =>
println(f"${user.name}: $rank%.4f")
}
// Personalized PageRank (from specific source)
val personalizedPageRank = graph.personalizedPageRank(1L, tol = 0.0001)
println("\nPersonalized PageRank from Alice:")
personalizedPageRank.vertices.join(graph.vertices).sortBy(_._2._1, ascending = false).collect().foreach {
case (id, (rank, user)) =>
println(f"${user.name}: $rank%.4f")
}
// Custom PageRank implementation (static, fixed number of iterations)
def customPageRank[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED],
  numIter: Int = 10,
  alpha: Double = 0.15
): Graph[Double, Double] = {
  // Each vertex's rank must be split across its out-edges, so first place
  // 1/outDegree on every edge
  val linkGraph = graph
    .outerJoinVertices(graph.outDegrees)((_, _, degOpt) => degOpt.getOrElse(0))
    .mapTriplets(t => 1.0 / math.max(t.srcAttr, 1))
  val initialGraph = linkGraph.mapVertices((_, _) => 1.0)
  def iteration(g: Graph[Double, Double]): Graph[Double, Double] = {
    val ranks = g.aggregateMessages[Double](
      sendMsg = triplet => triplet.sendToDst(triplet.srcAttr * triplet.attr),
      mergeMsg = _ + _
    )
    g.outerJoinVertices(ranks) { (id, oldRank, msgOpt) =>
      alpha + (1.0 - alpha) * msgOpt.getOrElse(0.0)
    }
  }
  (1 to numIter).foldLeft(initialGraph)((g, _) => iteration(g))
}
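A usage sketch running the custom implementation on the social graph and printing scores in descending order:
val customRanks = customPageRank(graph, numIter = 20)
customRanks.vertices.join(graph.vertices).collect().sortBy(-_._2._1).foreach {
  case (_, (rank, user)) => println(f"${user.name}: $rank%.4f")
}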
Shortest Path Algorithms
// Single Source Shortest Path (SSSP): shortestPaths lives in the graphx lib
// package, not on Graph itself, and returns hop counts to the given landmarks
import org.apache.spark.graphx.lib.ShortestPaths
val shortestPaths = ShortestPaths.run(graph, Seq(1L, 4L))
val names = graph.vertices.mapValues(_.name).collect().toMap
shortestPaths.vertices.collect().foreach { case (id, pathMap) =>
  println(s"Shortest paths from ${names.getOrElse(id, "Unknown")}:")
  pathMap.foreach { case (target, distance) =>
    println(s"  to ${names.getOrElse(target, "Unknown")}: $distance hops")
  }
}
// All pairs shortest paths (for small graphs)
def allPairsShortestPaths[VD: ClassTag](graph: Graph[VD, Double]): Graph[Map[VertexId, Double], Double] = {
val landmarks = graph.vertices.map(_._1).collect()
val initialGraph = graph.mapVertices { (id, _) =>
Map(id -> 0.0) ++ landmarks.filter(_ != id).map(_ -> Double.PositiveInfinity).toMap
}
def iteration(g: Graph[Map[VertexId, Double], Double]): Graph[Map[VertexId, Double], Double] = {
val messages = g.aggregateMessages[Map[VertexId, Double]](
sendMsg = triplet => {
val updates = triplet.srcAttr.map { case (landmark, dist) =>
landmark -> (dist + triplet.attr)
}
triplet.sendToDst(updates)
},
mergeMsg = (map1, map2) => {
(map1.keySet ++ map2.keySet).map { landmark =>
landmark -> math.min(map1.getOrElse(landmark, Double.PositiveInfinity),
map2.getOrElse(landmark, Double.PositiveInfinity))
}.toMap
}
)
g.outerJoinVertices(messages) { (id, oldMap, msgOpt) =>
msgOpt match {
case Some(newMap) =>
(oldMap.keySet ++ newMap.keySet).map { landmark =>
landmark -> math.min(oldMap.getOrElse(landmark, Double.PositiveInfinity),
newMap.getOrElse(landmark, Double.PositiveInfinity))
}.toMap
case None => oldMap
}
}
}
  // Iterate |V| times: a safe upper bound on the number of edges in any shortest path
var currentGraph = initialGraph
for (_ <- 1 to landmarks.length) {
currentGraph = iteration(currentGraph)
}
currentGraph
}
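A usage sketch on the small weighted graph built earlier (graphWithWeights); with every vertex as a landmark this is only practical for small graphs:
val apsp = allPairsShortestPaths(graphWithWeights)
apsp.vertices.collect().foreach { case (id, distances) =>
  println(s"Vertex $id: $distances")
}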
// Bellman-Ford algorithm for weighted shortest paths
def bellmanFord[VD: ClassTag](
graph: Graph[VD, Double],
sourceId: VertexId
): Graph[Double, Double] = {
val initialGraph = graph.mapVertices { (id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity
}
def iteration(g: Graph[Double, Double]): Graph[Double, Double] = {
val messages = g.aggregateMessages[Double](
sendMsg = triplet => {
if (triplet.srcAttr != Double.PositiveInfinity) {
triplet.sendToDst(triplet.srcAttr + triplet.attr)
}
},
mergeMsg = math.min
)
g.outerJoinVertices(messages) { (id, oldDist, msgOpt) =>
msgOpt match {
case Some(newDist) => math.min(oldDist, newDist)
case None => oldDist
}
}
}
val numVertices = graph.vertices.count().toInt
(1 to numVertices - 1).foldLeft(initialGraph)((g, _) => iteration(g))
}
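A usage sketch computing weighted distances from Alice (vertex 1) on graphWithWeights:
val distancesFromAlice = bellmanFord(graphWithWeights, sourceId = 1L)
distancesFromAlice.vertices.collect().sortBy(_._1).foreach { case (id, dist) =>
  println(s"Distance from 1 to $id: $dist")
}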
Clustering and Community Detection
// Label Propagation Algorithm (LPA), from the graphx lib package
import org.apache.spark.graphx.lib.LabelPropagation
val communities = LabelPropagation.run(graph, maxSteps = 10)
communities.vertices.join(graph.vertices).collect().foreach { case (id, (community, user)) =>
  println(s"${user.name} belongs to community $community")
}
// Custom community detection: a greedy label-adoption heuristic inspired by
// Louvain (it does not perform full modularity optimization)
def louvainCommunityDetection[VD: ClassTag](
  graph: Graph[VD, Double],
  maxIterations: Int = 10
): Graph[Long, Double] = {
  // Initialize each vertex as its own community
  var currentGraph = graph.mapVertices((id, _) => id)
  var improved = true
  var iteration = 0
  while (improved && iteration < maxIterations) {
    iteration += 1
    // Each vertex hears the community of its most strongly connected neighbor
    val messages = currentGraph.aggregateMessages[(Long, Double)](
      sendMsg = triplet => {
        triplet.sendToSrc((triplet.dstAttr, triplet.attr))
        triplet.sendToDst((triplet.srcAttr, triplet.attr))
      },
      mergeMsg = (a, b) => if (a._2 > b._2) a else b
    )
    val newCommunities = currentGraph.outerJoinVertices(messages) { (id, currentCommunity, msgOpt) =>
      msgOpt match {
        case Some((suggestedCommunity, weight)) if suggestedCommunity != currentCommunity && weight > 0.5 =>
          suggestedCommunity
        case _ => currentCommunity
      }
    }
    // Detect changes with an explicit comparison: mutating a driver-side flag
    // inside the join closure would silently fail on a cluster
    improved = currentGraph.vertices.join(newCommunities.vertices)
      .filter { case (_, (oldC, newC)) => oldC != newC }
      .count() > 0
    currentGraph = newCommunities
  }
  currentGraph
}
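A usage sketch on graphWithWeights; community labels are vertex IDs, so members of the same community share a label:
val detectedCommunities = louvainCommunityDetection(graphWithWeights, maxIterations = 5)
detectedCommunities.vertices.collect().sortBy(_._2).foreach { case (id, community) =>
  println(s"Vertex $id -> community $community")
}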
// Triangle counting for clustering coefficient
val triangleCount = graph.triangleCount()
triangleCount.vertices.join(graph.vertices).collect().foreach { case (id, (triangles, user)) =>
println(s"${user.name} participates in $triangles triangles")
}
// Clustering coefficient calculation
def clusteringCoefficient[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Double, ED] = {
  val triangles = graph.triangleCount().vertices
  val degrees = graph.degrees
  val coefficients = triangles.join(degrees).mapValues { case (triangleCount, degree) =>
    if (degree >= 2) {
      2.0 * triangleCount / (degree * (degree - 1))
    } else {
      0.0
    }
  }
  // Rebuild a Graph (a plain RDD join would lose the graph structure)
  graph.outerJoinVertices(coefficients)((_, _, coeffOpt) => coeffOpt.getOrElse(0.0))
}
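A short usage sketch, printing each user's local clustering coefficient:
val coefficients = clusteringCoefficient(graph)
coefficients.vertices.join(graph.vertices).collect().foreach {
  case (_, (coefficient, user)) => println(f"${user.name}: $coefficient%.3f")
}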
Centrality Measures
// Betweenness centrality (rough landmark-based approximation for large graphs,
// not exact Brandes-style betweenness)
def betweennessCentrality[VD: ClassTag](
  graph: Graph[VD, Double],
  landmarks: Array[VertexId]
): Graph[Double, Double] = {
  // For each landmark, run SSSP and credit every vertex it can reach
  val centralityContributions = landmarks.map { landmark =>
    val distances = bellmanFord(graph, landmark)
    // Materialize the count on the driver: calling count() inside the map
    // below would nest RDD actions and fail at runtime
    val numVertices = distances.vertices.count()
    distances.vertices.map { case (id, dist) =>
      if (dist != Double.PositiveInfinity && id != landmark) (id, 1.0 / numVertices)
      else (id, 0.0)
    }
  }
  // Aggregate contributions across landmarks
  val betweenness = centralityContributions.reduce { (a, b) =>
    a.union(b).reduceByKey(_ + _)
  }
  graph.outerJoinVertices(betweenness) { (id, attr, centralityOpt) =>
    centralityOpt.getOrElse(0.0)
  }
}
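A usage sketch approximating betweenness on the weighted graph; the landmark choice here is arbitrary:
val landmarkIds = Array(1L, 3L, 5L)
val betweennessGraph = betweennessCentrality(graphWithWeights, landmarkIds)
betweennessGraph.vertices.collect().sortBy(-_._2).foreach { case (id, score) =>
  println(f"Vertex $id: $score%.4f")
}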
// Closeness centrality
def closenessCentrality[VD: ClassTag](graph: Graph[VD, Double]): Graph[Double, Double] = {
  val shortestPaths = allPairsShortestPaths(graph.mapVertices((_, _) => ()))
  shortestPaths.mapVertices { (id, pathMap) =>
    val totalDistance = pathMap.values.filter(_ != Double.PositiveInfinity).sum
    if (totalDistance > 0) {
      (pathMap.size - 1).toDouble / totalDistance
    } else {
      0.0
    }
  }
}
// Eigenvector centrality (simplified Power Iteration)
def eigenvectorCentrality[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
numIter: Int = 20
): Graph[Double, ED] = {
val initialGraph = graph.mapVertices((id, _) => 1.0)
def iteration(g: Graph[Double, ED]): Graph[Double, ED] = {
val messages = g.aggregateMessages[Double](
sendMsg = triplet => {
triplet.sendToSrc(triplet.dstAttr)
triplet.sendToDst(triplet.srcAttr)
},
mergeMsg = _ + _
)
val newScores = g.outerJoinVertices(messages) { (id, oldScore, msgOpt) =>
msgOpt.getOrElse(0.0)
}
// Normalize
val norm = math.sqrt(newScores.vertices.map(_._2).map(x => x * x).reduce(_ + _))
newScores.mapVertices((id, score) => score / norm)
}
(1 to numIter).foldLeft(initialGraph)((g, _) => iteration(g))
}
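A usage sketch ranking users by eigenvector centrality:
val eigenScores = eigenvectorCentrality(graph, numIter = 30)
eigenScores.vertices.join(graph.vertices).collect().sortBy(-_._2._1).foreach {
  case (_, (score, user)) => println(f"${user.name}: $score%.4f")
}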
Real-World Applications
Social Network Analysis
// Social network metrics
class SocialNetworkAnalyzer[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
def analyzeNetwork(): Map[String, Any] = {
val numVertices = graph.vertices.count()
val numEdges = graph.edges.count()
val density = (2.0 * numEdges) / (numVertices * (numVertices - 1))
val degreeStats = graph.degrees.map(_._2).stats()
val components = graph.connectedComponents().vertices.map(_._2).distinct().count()
Map(
"vertices" -> numVertices,
"edges" -> numEdges,
"density" -> density,
"avgDegree" -> degreeStats.mean,
"maxDegree" -> degreeStats.max,
"components" -> components
)
}
def findInfluencers(topK: Int = 10): Array[(VertexId, Double)] = {
val pageRank = graph.pageRank(0.0001)
pageRank.vertices.top(topK)(Ordering.by(_._2))
}
  def detectCommunities(): Graph[Long, ED] = {
    LabelPropagation.run(graph, maxSteps = 20)
  }
  def analyzeInformationDiffusion(seedSet: Set[VertexId], threshold: Double = 0.5): Graph[Boolean, ED] = {
    // Store each vertex's degree alongside its adoption flag: looking degrees
    // up inside a vertex program would nest RDD actions and fail at runtime
    var currentGraph = graph
      .mapVertices((id, _) => seedSet.contains(id))
      .outerJoinVertices(graph.degrees)((_, adopted, degOpt) => (adopted, degOpt.getOrElse(0)))
    var changed = true
    while (changed) {
      val messages = currentGraph.aggregateMessages[Double](
        sendMsg = triplet => {
          if (triplet.srcAttr._1) {
            triplet.sendToDst(1.0)
          }
        },
        mergeMsg = _ + _
      )
      val updated = currentGraph.outerJoinVertices(messages) { (id, attr, msgOpt) =>
        val (adopted, degree) = attr
        val newlyAdopted = !adopted && degree > 0 && msgOpt.exists(_ / degree >= threshold)
        (adopted || newlyAdopted, degree)
      }
      // Detect convergence with an explicit comparison; mutating a driver-side
      // flag inside the join closure would not work on a cluster
      changed = currentGraph.vertices.join(updated.vertices)
        .filter { case (_, (before, after)) => before._1 != after._1 }
        .count() > 0
      currentGraph = updated
    }
    currentGraph.mapVertices((_, attr) => attr._1)
  }
}
// Usage example
val socialNetwork = graph // Assume this is a social network
val analyzer = new SocialNetworkAnalyzer(socialNetwork)
val networkStats = analyzer.analyzeNetwork()
println(s"Network statistics: $networkStats")
val influencers = analyzer.findInfluencers(5)
val influencerNames = graph.vertices.mapValues(_.name).collect().toMap
println("Top influencers:")
influencers.foreach { case (id, score) =>
  println(f"${influencerNames.getOrElse(id, "Unknown")}: $score%.4f")
}
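Since analyzeInformationDiffusion returns an adoption flag per vertex, here is a sketch of a diffusion run seeded from Alice and Bob (the threshold is chosen arbitrarily):
val adopted = analyzer.analyzeInformationDiffusion(Set(1L, 2L), threshold = 0.5)
adopted.vertices.join(graph.vertices).collect().foreach {
  case (_, (isAdopted, user)) => println(s"${user.name} adopted: $isAdopted")
}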
Recommendation System
// Collaborative filtering using graph-based approach
class GraphBasedRecommendation {
case class Item(id: String, category: String, rating: Double)
case class UserItemRating(rating: Double, timestamp: Long)
def buildBipartiteGraph(
userItems: RDD[(VertexId, VertexId, Double)]
): Graph[String, Double] = {
    val userVertices = userItems.map(_._1).distinct().map(id => (id, s"user_$id"))
    val itemVertices = userItems.map(_._2).distinct().map(id => (id + 1000000L, s"item_$id"))
val vertices = userVertices.union(itemVertices)
val edges = userItems.map { case (userId, itemId, rating) =>
Edge(userId, itemId + 1000000L, rating)
}
Graph(vertices, edges, "unknown")
}
  def computeItemSimilarity(graph: Graph[String, Double]): RDD[(VertexId, Map[VertexId, Double])] = {
    // Collect each item's user ratings once and broadcast them: filtering
    // graph.edges inside another RDD transformation would nest actions and fail
    val ratingsByItem = graph.edges
      .map(e => (e.dstId, (e.srcId, e.attr)))
      .groupByKey()
      .mapValues(_.toMap)
      .collectAsMap()
    val ratingsB = sc.broadcast(ratingsByItem)
    val itemVertices = graph.vertices.filter(_._2.startsWith("item_"))
    itemVertices.cartesian(itemVertices)
      .filter { case ((id1, _), (id2, _)) => id1 < id2 }
      .flatMap { case ((id1, _), (id2, _)) =>
        val item1Users = ratingsB.value.getOrElse(id1, Map.empty[VertexId, Double])
        val item2Users = ratingsB.value.getOrElse(id2, Map.empty[VertexId, Double])
        val commonUsers = item1Users.keySet.intersect(item2Users.keySet)
        if (commonUsers.nonEmpty) {
          val similarity = commonUsers.map { userId =>
            item1Users(userId) * item2Users(userId)
          }.sum / (commonUsers.size * 5.0) // Normalize by max rating
          // Emit the similarity in both directions
          Seq((id1, Map(id2 -> similarity)), (id2, Map(id1 -> similarity)))
        } else {
          Seq.empty
        }
      }
      .reduceByKey(_ ++ _)
  }
  def recommendItems(
    userId: VertexId,
    graph: Graph[String, Double],
    topK: Int = 10
  ): Array[(VertexId, Double)] = {
    val userRatings = graph.edges.filter(_.srcId == userId).map(e => (e.dstId, e.attr)).collect().toMap
    val itemSimilarity = computeItemSimilarity(graph)
    itemSimilarity
      .filter { case (itemId, _) => !userRatings.contains(itemId) }
      .map { case (itemId, similarities) =>
        val score = similarities.map { case (similarItemId, similarity) =>
          userRatings.get(similarItemId).map(_ * similarity).getOrElse(0.0)
        }.sum
        (itemId, score)
      }
      .filter(_._2 > 0)
      .top(topK)(Ordering.by(_._2))
  }
}
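A usage sketch with hypothetical (user, item, rating) triples; item IDs are offset by 1,000,000 inside buildBipartiteGraph to keep the two ID spaces disjoint:
val ratings: RDD[(VertexId, VertexId, Double)] = sc.parallelize(Seq(
  (1L, 10L, 5.0), (1L, 11L, 3.0),
  (2L, 10L, 4.0), (2L, 12L, 2.0),
  (3L, 11L, 5.0), (3L, 12L, 4.0)
))
val recommender = new GraphBasedRecommendation
val bipartite = recommender.buildBipartiteGraph(ratings)
val suggestions = recommender.recommendItems(1L, bipartite, topK = 3)
println(s"Recommendations for user 1: ${suggestions.mkString(", ")}")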
Fraud Detection
// Graph-based fraud detection
class FraudDetectionSystem {
  // Note: for real Spark jobs, define these case classes at the top level so
  // closures don't capture the enclosing (non-serializable) class
  case class Transaction(amount: Double, timestamp: Long, transactionType: String)
  case class Account(accountType: String, creationDate: Long, riskScore: Double)
def detectFraudulentCommunities(
transactionGraph: Graph[Account, Transaction]
): Graph[Boolean, Transaction] = {
// Detect suspicious patterns
val suspiciousAccounts = transactionGraph.vertices.filter { case (id, account) =>
account.riskScore > 0.8 ||
(System.currentTimeMillis() - account.creationDate) < 86400000L // Less than 1 day old
}.map(_._1).collect().toSet
    // Find communities, then score each community's share of suspicious accounts.
    // Community sizes are aggregated up front: counting inside a transformation
    // would nest RDD actions and fail at runtime
    val communities = LabelPropagation.run(transactionGraph, maxSteps = 10)
    val communitySizes = communities.vertices
      .map { case (_, communityId) => (communityId, 1L) }
      .reduceByKey(_ + _)
    val suspiciousCounts = communities.vertices
      .map { case (accountId, communityId) =>
        (communityId, if (suspiciousAccounts.contains(accountId)) 1L else 0L)
      }
      .reduceByKey(_ + _)
    val communityRisk = suspiciousCounts.join(communitySizes)
      .map { case (communityId, (suspicious, total)) =>
        (communityId, suspicious.toDouble / total > 0.3) // Mark community as fraudulent if >30% suspicious
      }
      .collectAsMap()
    // Mark accounts in fraudulent communities
    transactionGraph.outerJoinVertices(communities.vertices) { (_, _, communityOpt) =>
      communityOpt.exists(c => communityRisk.getOrElse(c, false))
    }
  }
def findSuspiciousPatterns(
graph: Graph[Account, Transaction]
): RDD[(VertexId, String, Double)] = {
// Pattern 1: Rapid money movement (velocity)
val rapidMovement = graph.aggregateMessages[(Double, Int)](
sendMsg = triplet => {
val hoursDiff = (System.currentTimeMillis() - triplet.attr.timestamp) / 3600000.0
if (hoursDiff <= 24) { // Within 24 hours
triplet.sendToSrc((triplet.attr.amount, 1))
triplet.sendToDst((triplet.attr.amount, 1))
}
},
mergeMsg = (a, b) => (a._1 + b._1, a._2 + b._2)
).filter { case (id, (totalAmount, transactionCount)) =>
totalAmount > 100000 && transactionCount > 50 // High volume and frequency
}.map { case (id, (amount, count)) =>
(id, "rapid_movement", amount / count)
}
    // Pattern 2: Circular transactions (an edge A -> B matched with B -> A)
    val forward = graph.edges.map(e => ((e.srcId, e.dstId), ()))
    val reversed = graph.edges.map(e => ((e.dstId, e.srcId), ()))
    val circularTransactions = forward.join(reversed)
      .map { case ((src, _), _) => (src, "circular", 1.0) }
      .distinct()
    rapidMovement.union(circularTransactions)
}
}
Performance Optimization and Best Practices
Graph Partitioning Strategies
// Custom partitioning for better performance
import org.apache.spark.graphx.PartitionStrategy
// Choose partitioning strategy based on graph characteristics
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 8)
// Custom partitioner for domain-specific optimization
class CommunityPartitioner(partitions: Int, communityAssignment: Map[VertexId, Int])
    extends org.apache.spark.Partitioner {
  // The constructor parameter must not share the method's name, or the
  // override would recurse into itself
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case vertexId: VertexId =>
      math.abs(communityAssignment.getOrElse(vertexId, vertexId.hashCode()) % partitions)
    case _ => math.abs(key.hashCode() % partitions)
  }
}
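A usage sketch applying the partitioner to the vertex RDD with a hypothetical community assignment:
val communityAssignment: Map[VertexId, Int] = Map(1L -> 0, 2L -> 0, 3L -> 1, 4L -> 1, 5L -> 0)
val partitionedVertices = graph.vertices.partitionBy(new CommunityPartitioner(2, communityAssignment))
println(s"Vertex partitions: ${partitionedVertices.getNumPartitions}")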
// Graph-specific caching strategies
def optimizeGraphPerformance[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED]
): Graph[VD, ED] = {
// Cache frequently accessed components
graph.vertices.cache()
graph.edges.cache()
// Materialize the graph to avoid recomputation
graph.vertices.count()
graph.edges.count()
graph
}
// Memory-efficient processing for large graphs
def processLargeGraph[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
batchSize: Int = 1000
): Unit = {
  val vertexIds = graph.vertices.map(_._1).collect()
  vertexIds.grouped(batchSize).foreach { batch =>
    val batchSet = batch.toSet // constant-time membership inside the predicate
    val subgraph = graph.subgraph(vpred = (id, _) => batchSet.contains(id))
// Process subgraph
val localResults = processSubgraph(subgraph)
// Save or aggregate results
saveResults(localResults)
// Clean up
subgraph.unpersist()
}
}
def processSubgraph[VD: ClassTag, ED: ClassTag](subgraph: Graph[VD, ED]): Array[(VertexId, Double)] = {
// Implement specific processing logic
subgraph.vertices.map { case (id, _) => (id, 1.0) }.collect()
}
def saveResults(results: Array[(VertexId, Double)]): Unit = {
// Save results to storage
println(s"Processed ${results.length} vertices")
}
Monitoring and Debugging
// Graph statistics and monitoring
class GraphMonitor[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
def collectStatistics(): Map[String, Any] = {
val vertexCount = graph.vertices.count()
val edgeCount = graph.edges.count()
val tripletCount = graph.triplets.count()
val degreeStats = graph.degrees.map(_._2).stats()
Map(
"vertices" -> vertexCount,
"edges" -> edgeCount,
"triplets" -> tripletCount,
"avgDegree" -> degreeStats.mean,
"maxDegree" -> degreeStats.max,
"minDegree" -> degreeStats.min,
"densityRatio" -> (2.0 * edgeCount) / (vertexCount * (vertexCount - 1))
)
}
def validateGraph(): Array[String] = {
val issues = scala.collection.mutable.ArrayBuffer[String]()
// Check for orphaned vertices
val vertexIds = graph.vertices.map(_._1).collect().toSet
val edgeVertexIds = graph.edges.flatMap(e => Array(e.srcId, e.dstId)).distinct().collect().toSet
val orphanedVertices = vertexIds.diff(edgeVertexIds)
if (orphanedVertices.nonEmpty) {
issues += s"Found ${orphanedVertices.size} orphaned vertices"
}
// Check for self-loops
val selfLoops = graph.edges.filter(e => e.srcId == e.dstId).count()
if (selfLoops > 0) {
issues += s"Found $selfLoops self-loops"
}
// Check for duplicate edges
val totalEdges = graph.edges.count()
val uniqueEdges = graph.edges.map(e => (e.srcId, e.dstId)).distinct().count()
if (totalEdges != uniqueEdges) {
issues += s"Found ${totalEdges - uniqueEdges} duplicate edges"
}
issues.toArray
}
def profilePerformance(operation: () => Unit): Map[String, Long] = {
val startTime = System.currentTimeMillis()
val startMemory = Runtime.getRuntime.totalMemory() - Runtime.getRuntime.freeMemory()
operation()
val endTime = System.currentTimeMillis()
val endMemory = Runtime.getRuntime.totalMemory() - Runtime.getRuntime.freeMemory()
Map(
"executionTime" -> (endTime - startTime),
"memoryUsed" -> (endMemory - startMemory)
)
}
}
// Usage
val monitor = new GraphMonitor(graph)
val stats = monitor.collectStatistics()
println(s"Graph statistics: $stats")
val issues = monitor.validateGraph()
if (issues.nonEmpty) {
println(s"Graph validation issues: ${issues.mkString(", ")}")
}
val performance = monitor.profilePerformance(() => {
graph.pageRank(0.0001).vertices.count()
})
println(s"PageRank performance: $performance")
Conclusion
GraphX provides a powerful framework for large-scale graph processing and network analysis. Key capabilities include:
Core Features:
- Distributed graph storage and processing
- Rich set of graph algorithms and transformations
- Integration with Spark ecosystem
- Property graphs with vertex and edge attributes
Algorithm Support:
- Centrality measures (PageRank, Betweenness, Closeness)
- Community detection and clustering
- Shortest path algorithms
- Triangle counting and graph metrics
Real-World Applications:
- Social network analysis and influence detection
- Recommendation systems and collaborative filtering
- Fraud detection and risk analysis
- Network topology analysis
Performance Optimizations:
- Strategic graph partitioning
- Efficient caching and persistence
- Memory management for large graphs
- Batch processing for scalability
Best Practices:
- Choose appropriate partitioning strategies
- Cache frequently accessed graph components
- Monitor graph statistics and validate data quality
- Use appropriate algorithms for specific use cases
- Consider graph characteristics when optimizing
GraphX excels in scenarios requiring analysis of relationships and connectivity patterns, making it invaluable for applications in social media, finance, telecommunications, and any domain where understanding network structure and dynamics is crucial.