Graph Processing with GraphX: Network Analysis and Algorithms

Graph processing is essential for analyzing relationships and connections in data, from social networks to recommendation systems. Apache Spark's GraphX library provides a distributed graph processing framework that enables analysis of large-scale graphs. In this lesson, we'll explore comprehensive graph processing techniques using GraphX in Scala.

Understanding Graph Theory and GraphX

Graph Fundamentals

  • Vertices (Nodes): Entities in the graph with associated data
  • Edges: Connections between vertices with optional weights/properties
  • Directed vs Undirected: Whether edges have direction
  • Weighted vs Unweighted: Whether edges have associated values
  • Multigraph: Graphs with multiple edges between the same pair of vertices (see the sketch below)
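
To ground these terms, here is a minimal sketch using GraphX's Edge type (covered properly in the setup section). GraphX property graphs are directed multigraphs, and the edge attribute is where a weight lives:

import org.apache.spark.graphx.Edge

val exampleEdges = Seq(
  Edge(1L, 2L, 0.5), // directed: 1 -> 2, with the weight 0.5 as the edge attribute
  Edge(2L, 1L, 0.7), // the reverse direction is a distinct edge
  Edge(1L, 2L, 0.9)  // a second 1 -> 2 edge makes this a multigraph
)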

GraphX Architecture

  • Property Graphs: Vertices and edges can have associated properties
  • Distributed Storage: Graphs are partitioned across cluster
  • Immutable: Graph operations return new graphs rather than modifying the original (see the sketch below)
  • RDD-Based: Built on top of Spark RDDs for scalability
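
A minimal sketch of how these properties surface in the API (types from org.apache.spark.graphx; the runnable setup follows in the next section):

import org.apache.spark.graphx._

// A property graph pairs a vertex RDD with an edge RDD; every transformation
// (mapVertices, subgraph, ...) returns a new Graph and leaves the input untouched
def describe[VD, ED](graph: Graph[VD, ED]): Unit = {
  val vertices: VertexRDD[VD] = graph.vertices // distributed (VertexId, VD) pairs
  val edges: EdgeRDD[ED] = graph.edges         // distributed Edge(srcId, dstId, attr) records
  println(s"${vertices.count()} vertices, ${edges.count()} edges, ${graph.triplets.count()} triplets")
}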

Setting Up GraphX Environment

Dependencies and Setup

// build.sbt
val SparkVersion = "3.5.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % SparkVersion,
  "org.apache.spark" %% "spark-sql" % SparkVersion,
  "org.apache.spark" %% "spark-graphx" % SparkVersion,
  "org.apache.spark" %% "spark-mllib" % SparkVersion,

  // Graph visualization (optional)
  "org.graphstream" % "gs-core" % "2.0",
  "org.graphstream" % "gs-ui-swing" % "2.0",

  // Network analysis libraries
  "org.jgrapht" % "jgrapht-core" % "1.5.1",
  "org.gephi" % "gephi-toolkit" % "0.9.7"
)

Basic GraphX Setup

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.{LabelPropagation, ShortestPaths}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Initialize Spark
val spark = SparkSession.builder()
  .appName("GraphX Processing")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

val sc = spark.sparkContext
import spark.implicits._

Creating and Building Graphs

Basic Graph Construction

// Define vertex and edge attribute types
// (VertexId is GraphX's alias for Long, already in scope via the graphx import)
case class User(name: String, age: Int, occupation: String)
case class Relationship(relationship: String, since: String)

// Create vertices RDD
val users: RDD[(VertexId, User)] = sc.parallelize(Array(
  (1L, User("Alice", 28, "Engineer")),
  (2L, User("Bob", 32, "Designer")),
  (3L, User("Charlie", 25, "Analyst")),
  (4L, User("Diana", 30, "Manager")),
  (5L, User("Eve", 27, "Developer"))
))

// Create edges RDD
val relationships: RDD[Edge[Relationship]] = sc.parallelize(Array(
  Edge(1L, 2L, Relationship("friend", "2020-01-15")),
  Edge(2L, 3L, Relationship("colleague", "2019-06-01")),
  Edge(3L, 4L, Relationship("reports_to", "2021-03-10")),
  Edge(4L, 1L, Relationship("friend", "2018-12-05")),
  Edge(1L, 5L, Relationship("mentor", "2020-08-20")),
  Edge(5L, 3L, Relationship("collaborator", "2021-01-15"))
))

// Default user for missing vertices
val defaultUser = User("Unknown", 0, "Unknown")

// Create graph
val graph = Graph(users, relationships, defaultUser)

// Basic graph properties
println(s"Number of vertices: ${graph.vertices.count()}")
println(s"Number of edges: ${graph.edges.count()}")
println(s"Number of triplets: ${graph.triplets.count()}")

// Display graph structure (collect to the driver so the println output is visible locally;
// inside foreach on an RDD, println runs on the executors)
graph.triplets.collect().foreach { triplet =>
  println(s"${triplet.srcAttr.name} -> ${triplet.dstAttr.name} (${triplet.attr.relationship})")
}

Loading Graphs from Data Sources

// Load from CSV files
def loadGraphFromCSV(verticesPath: String, edgesPath: String): Graph[User, Relationship] = {
  val verticesDF = spark.read
    .option("header", "true")
    .csv(verticesPath)

  val edgesDF = spark.read
    .option("header", "true")
    .csv(edgesPath)

  // Convert to RDDs
  val vertices = verticesDF.rdd.map { row =>
    (row.getString(0).toLong, User(row.getString(1), row.getString(2).toInt, row.getString(3)))
  }

  val edges = edgesDF.rdd.map { row =>
    Edge(row.getString(0).toLong, row.getString(1).toLong, 
         Relationship(row.getString(2), row.getString(3)))
  }

  Graph(vertices, edges, defaultUser)
}

// Load from JSON (expects one document with `vertices` and `edges` arrays)
import org.apache.spark.sql.Row

def loadGraphFromJSON(path: String): Graph[String, Double] = {
  val df = spark.read.json(path)

  // Spark exposes JSON arrays of structs as Seq[Row], not Array[Row]
  val vertices = df.select("vertices").rdd.flatMap { row =>
    row.getAs[Seq[Row]]("vertices").map { vertex =>
      (vertex.getLong(0), vertex.getString(1))
    }
  }

  val edges = df.select("edges").rdd.flatMap { row =>
    row.getAs[Seq[Row]]("edges").map { edge =>
      Edge(edge.getLong(0), edge.getLong(1), edge.getDouble(2))
    }
  }

  Graph(vertices, edges, "Unknown")
}

// Generate synthetic graphs for testing
def generateRandomGraph(numVertices: Int, numEdges: Int): Graph[Int, Double] = {
  val vertices = sc.parallelize((1L to numVertices).map(id => (id, id.toInt)))
  val edges = sc.parallelize(
    (1 to numEdges).map { _ =>
      Edge(
        scala.util.Random.nextInt(numVertices) + 1L,
        scala.util.Random.nextInt(numVertices) + 1L,
        scala.util.Random.nextDouble()
      )
    }
  ).map(e => ((e.srcId, e.dstId), e.attr))
   .reduceByKey((a, _) => a) // deduplicate by endpoints (weights are random, so Edge equality would miss these)
   .map { case ((src, dst), weight) => Edge(src, dst, weight) }

  Graph(vertices, edges, 0)
}

// Scale-free network (Barabási-Albert model)
def generateScaleFreeGraph(n: Int, m: Int): Graph[Int, Double] = {
  import scala.collection.mutable

  val vertices = mutable.ArrayBuffer[(VertexId, Int)]()
  val edges = mutable.ArrayBuffer[Edge[Double]]()

  // Start with small complete graph
  for (i <- 1 to m) {
    vertices += ((i.toLong, i))
    for (j <- 1 until i) {
      edges += Edge(i.toLong, j.toLong, 1.0)
    }
  }

  // Add vertices one at a time with preferential attachment
  for (i <- (m + 1) to n) {
    vertices += ((i.toLong, i))
    val degrees = edges.flatMap(e => Seq(e.srcId, e.dstId))
      .groupBy(identity)
      .map { case (vertex, occurrences) => vertex -> occurrences.size }
    val totalDegree = degrees.values.sum

    // Each random draw selects at most one target, with probability proportional to degree
    val targets = mutable.Set[Long]()
    while (targets.size < m) {
      val rand = scala.util.Random.nextDouble() * totalDegree
      var cumulative = 0.0
      var chosen = false
      for ((vertex, degree) <- degrees if !chosen) {
        cumulative += degree
        if (rand <= cumulative && vertex != i && !targets.contains(vertex)) {
          targets += vertex
          chosen = true
        }
      }
    }

    targets.foreach(target => edges += Edge(i.toLong, target, 1.0))
  }

  Graph(sc.parallelize(vertices), sc.parallelize(edges), 0)
}

val randomGraph = generateRandomGraph(1000, 5000)
val scaleFreeGraph = generateScaleFreeGraph(1000, 3)

Graph Operations and Transformations

Basic Graph Operations

// Vertex operations
val userNames = graph.vertices.map { case (id, user) => (id, user.name) }
val engineeringUsers = graph.vertices.filter { case (id, user) => 
  user.occupation == "Engineer" 
}

// Edge operations
val friendships = graph.edges.filter(_.attr.relationship == "friend")
val edgeWeights = graph.edges.map(edge => (edge.srcId, edge.dstId, 1.0))

// Structural operations
// convertToCanonicalEdges orients each edge so that srcId < dstId and merges
// duplicates -- a common first step when treating a directed graph as undirected
val canonicalGraph = graph.convertToCanonicalEdges()
val reverseGraph = graph.reverse

// Subgraph extraction: vpred keeps only Engineer vertices, and edges with a
// dropped endpoint are removed automatically, so no epred is needed here
val engineerSubgraph = graph.subgraph(
  vpred = (id, user) => user.occupation == "Engineer"
)

// Map operations
val graphWithAges = graph.mapVertices { (id, user) =>
  (user.name, user.age)
}

val graphWithWeights = graph.mapEdges { edge =>
  edge.attr.relationship match {
    case "friend" => 1.0
    case "colleague" => 0.8
    case "mentor" => 0.9
    case "reports_to" => 0.7
    case _ => 0.5
  }
}

// Join with external data
val promotions: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Senior Engineer"),
  (3L, "Senior Analyst")
))

val updatedGraph = graph.outerJoinVertices(promotions) { (id, user, promotion) =>
  promotion match {
    case Some(newTitle) => user.copy(occupation = newTitle)
    case None => user
  }
}

Advanced Graph Transformations

// Aggregate neighbor counts onto each vertex as (out-degree, in-degree)
def computeNeighborStats[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[(VD, (Int, Int)), ED] = {
  val degrees = graph.aggregateMessages[(Int, Int)](
    sendMsg = triplet => {
      triplet.sendToSrc((1, 0)) // the source end gains an out-neighbor
      triplet.sendToDst((0, 1)) // the destination end gains an in-neighbor
    },
    mergeMsg = (a, b) => (a._1 + b._1, a._2 + b._2)
  )

  // Keep vertices with no edges, defaulting their degrees to (0, 0)
  graph.outerJoinVertices(degrees) { (id, attr, degreesOpt) =>
    (attr, degreesOpt.getOrElse((0, 0)))
  }
}

// Compute vertex degrees
val vertexDegrees = graph.degrees.collect().toMap
val inDegrees = graph.inDegrees.collect().toMap
val outDegrees = graph.outDegrees.collect().toMap

println("Vertex degrees:")
vertexDegrees.foreach { case (id, degree) =>
  val user = graph.vertices.lookup(id).headOption
  println(s"User ${user.map(_.name).getOrElse("Unknown")} (ID: $id): $degree connections")
}

// Find neighbors
def findNeighbors[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], vertexId: VertexId): Array[VertexId] = {
  graph.collectNeighborIds(EdgeDirection.Either).lookup(vertexId).headOption.getOrElse(Array.empty)
}

val aliceNeighbors = findNeighbors(graph, 1L)
println(s"Alice's neighbors: ${aliceNeighbors.mkString(", ")}")

// Connected components
val connectedComponents = graph.connectedComponents()
connectedComponents.vertices.collect().foreach { case (id, component) =>
  println(s"Vertex $id belongs to component $component")
}

// Strongly connected components (for directed graphs)
val stronglyConnectedComponents = graph.stronglyConnectedComponents(numIter = 10)
stronglyConnectedComponents.vertices.collect().foreach { case (id, component) =>
  println(s"Vertex $id belongs to strongly connected component $component")
}

Graph Algorithms

PageRank Algorithm

// Standard PageRank
val pageRankGraph = graph.pageRank(tol = 0.0001)
val pageRankVertices = pageRankGraph.vertices

println("PageRank scores:")
pageRankVertices.join(graph.vertices).sortBy(_._2._1, ascending = false).collect().foreach {
  case (id, (rank, user)) =>
    println(f"${user.name}: $rank%.4f")
}

// Personalized PageRank (from specific source)
val personalizedPageRank = graph.personalizedPageRank(1L, tol = 0.0001)
println("\nPersonalized PageRank from Alice:")
personalizedPageRank.vertices.join(graph.vertices).sortBy(_._2._1, ascending = false).collect().foreach {
  case (id, (rank, user)) =>
    println(f"${user.name}: $rank%.4f")
}

// Custom PageRank implementation (each vertex distributes its rank evenly
// across its out-edges, as in the standard formulation)
def customPageRank[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED], 
  numIter: Int = 10, 
  alpha: Double = 0.15
): Graph[Double, Double] = {

  // Store 1/outDegree on each edge so a triplet can compute its rank contribution
  val initialGraph = graph
    .outerJoinVertices(graph.outDegrees) { (_, _, degOpt) => degOpt.getOrElse(0) }
    .mapTriplets(triplet => 1.0 / triplet.srcAttr)
    .mapVertices((_, _) => 1.0)

  def iteration(g: Graph[Double, Double]): Graph[Double, Double] = {
    val ranks = g.aggregateMessages[Double](
      sendMsg = triplet => triplet.sendToDst(triplet.srcAttr * triplet.attr), // rank / outDegree
      mergeMsg = _ + _
    )

    g.outerJoinVertices(ranks) { (id, oldRank, msgOpt) =>
      alpha + (1.0 - alpha) * msgOpt.getOrElse(0.0)
    }
  }

  (1 to numIter).foldLeft(initialGraph)((g, _) => iteration(g))
}
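
As a quick sanity check on the small social graph (a sketch; exact scores depend on numIter and alpha), the ordering should roughly match the built-in pageRank above:

val customRanks = customPageRank(graph, numIter = 20)
customRanks.vertices.join(graph.vertices)
  .sortBy(_._2._1, ascending = false)
  .collect()
  .foreach { case (_, (rank, user)) => println(f"${user.name}: $rank%.4f") }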

Shortest Path Algorithms

// Single Source Shortest Path: hop counts from every vertex to a set of landmarks
// (ShortestPaths lives in graphx.lib, imported in the setup above)
val shortestPaths = ShortestPaths.run(graph, Seq(1L, 4L))
val namesById = graph.vertices.mapValues(_.name).collect().toMap // one collect instead of per-vertex lookups
shortestPaths.vertices.collect().foreach { case (id, pathMap) =>
  println(s"Shortest paths from ${namesById(id)}:")
  pathMap.foreach { case (target, distance) =>
    println(s"  to ${namesById(target)}: $distance hops")
  }
}

// All pairs shortest paths (for small graphs)
def allPairsShortestPaths[VD: ClassTag](graph: Graph[VD, Double]): Graph[Map[VertexId, Double], Double] = {
  val landmarks = graph.vertices.map(_._1).collect()
  val initialGraph = graph.mapVertices { (id, _) =>
    Map(id -> 0.0) ++ landmarks.filter(_ != id).map(_ -> Double.PositiveInfinity).toMap
  }

  def iteration(g: Graph[Map[VertexId, Double], Double]): Graph[Map[VertexId, Double], Double] = {
    val messages = g.aggregateMessages[Map[VertexId, Double]](
      sendMsg = triplet => {
        val updates = triplet.srcAttr.map { case (landmark, dist) =>
          landmark -> (dist + triplet.attr)
        }
        triplet.sendToDst(updates)
      },
      mergeMsg = (map1, map2) => {
        (map1.keySet ++ map2.keySet).map { landmark =>
          landmark -> math.min(map1.getOrElse(landmark, Double.PositiveInfinity),
                              map2.getOrElse(landmark, Double.PositiveInfinity))
        }.toMap
      }
    )

    g.outerJoinVertices(messages) { (id, oldMap, msgOpt) =>
      msgOpt match {
        case Some(newMap) =>
          (oldMap.keySet ++ newMap.keySet).map { landmark =>
            landmark -> math.min(oldMap.getOrElse(landmark, Double.PositiveInfinity),
                                newMap.getOrElse(landmark, Double.PositiveInfinity))
          }.toMap
        case None => oldMap
      }
    }
  }

  // Run until convergence
  var currentGraph = initialGraph
  for (_ <- 1 to landmarks.length) {
    currentGraph = iteration(currentGraph)
  }
  currentGraph
}
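
For the small weighted graph built earlier this is affordable to run directly (a sketch; the per-vertex distance maps grow with the vertex count, so keep this to small graphs):

val apsp = allPairsShortestPaths(graphWithWeights)
apsp.vertices.collect().sortBy(_._1).foreach { case (id, distances) =>
  println(s"Vertex $id: $distances")
}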

// Bellman-Ford algorithm for weighted shortest paths (|V| - 1 relaxation rounds; no negative-cycle detection)
def bellmanFord[VD: ClassTag](
  graph: Graph[VD, Double], 
  sourceId: VertexId
): Graph[Double, Double] = {

  val initialGraph = graph.mapVertices { (id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity
  }

  def iteration(g: Graph[Double, Double]): Graph[Double, Double] = {
    val messages = g.aggregateMessages[Double](
      sendMsg = triplet => {
        if (triplet.srcAttr != Double.PositiveInfinity) {
          triplet.sendToDst(triplet.srcAttr + triplet.attr)
        }
      },
      mergeMsg = math.min
    )

    g.outerJoinVertices(messages) { (id, oldDist, msgOpt) =>
      msgOpt match {
        case Some(newDist) => math.min(oldDist, newDist)
        case None => oldDist
      }
    }
  }

  val numVertices = graph.vertices.count().toInt
  (1 to numVertices - 1).foldLeft(initialGraph)((g, _) => iteration(g))
}
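
Applied to the weighted relationship graph from earlier (a sketch; vertex 1L is Alice):

val distancesFromAlice = bellmanFord(graphWithWeights, 1L)
distancesFromAlice.vertices.collect().sortBy(_._1).foreach { case (id, distance) =>
  println(s"Vertex $id: distance $distance")
}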

Clustering and Community Detection

// Label Propagation Algorithm (LPA), from graphx.lib
val communities = LabelPropagation.run(graph, maxSteps = 10)
communities.vertices.join(graph.vertices).collect().foreach { case (id, (community, user)) =>
  println(s"${user.name} belongs to community $community")
}

// Custom community detection: a greedy label-move heuristic inspired by Louvain
// (true Louvain optimizes modularity gain; this simply adopts the heaviest neighboring label)
def louvainCommunityDetection[VD: ClassTag](
  graph: Graph[VD, Double], 
  maxIterations: Int = 10
): Graph[Long, Double] = {

  // Initialize each vertex as its own community
  var currentGraph = graph.mapVertices((id, _) => id)
  var changed = 1L
  var iteration = 0

  while (changed > 0 && iteration < maxIterations) {
    iteration += 1

    // Each vertex receives the community label carried by its heaviest incident edge
    val messages = currentGraph.aggregateMessages[(Long, Double)](
      sendMsg = triplet => {
        triplet.sendToSrc((triplet.dstAttr, triplet.attr))
        triplet.sendToDst((triplet.srcAttr, triplet.attr))
      },
      mergeMsg = (a, b) => if (a._2 > b._2) a else b
    )

    val newGraph = currentGraph.outerJoinVertices(messages) { (id, currentCommunity, msgOpt) =>
      msgOpt match {
        case Some((suggestedCommunity, weight)) if suggestedCommunity != currentCommunity && weight > 0.5 =>
          suggestedCommunity
        case _ => currentCommunity
      }
    }

    // Count label changes with an RDD action; vertex programs run on executors,
    // so a mutable driver-side flag set inside them would never be updated
    changed = newGraph.vertices.join(currentGraph.vertices)
      .filter { case (_, (newCommunity, oldCommunity)) => newCommunity != oldCommunity }
      .count()

    currentGraph = newGraph
  }

  currentGraph
}
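
Running it against the weighted graph from earlier (a sketch; results depend on the 0.5 weight cutoff used above):

val weightedCommunities = louvainCommunityDetection(graphWithWeights)
weightedCommunities.vertices.collect().sortBy(_._1).foreach { case (id, community) =>
  println(s"Vertex $id -> community $community")
}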

// Triangle counting for clustering coefficient
val triangleCount = graph.triangleCount()
triangleCount.vertices.join(graph.vertices).collect().foreach { case (id, (triangles, user)) =>
  println(s"${user.name} participates in $triangles triangles")
}

// Clustering coefficient: the fraction of a vertex's neighbor pairs that are themselves connected
def clusteringCoefficient[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Double, ED] = {
  val triangles = graph.triangleCount().vertices
  val degrees = graph.degrees

  val coefficients = triangles.join(degrees).mapValues { case (triangleCount, degree) =>
    if (degree >= 2) 2.0 * triangleCount / (degree * (degree - 1)) else 0.0
  }

  // outerJoinVertices keeps zero-degree vertices (absent from `degrees`) at 0.0
  graph.outerJoinVertices(coefficients) { (id, _, coeffOpt) => coeffOpt.getOrElse(0.0) }
}
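
Applied to the social graph (a sketch):

val coefficients = clusteringCoefficient(graph)
coefficients.vertices.join(graph.vertices).collect().foreach { case (_, (coefficient, user)) =>
  println(f"${user.name}: $coefficient%.3f")
}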

Centrality Measures

// Betweenness centrality (a crude landmark-based approximation, not full Brandes:
// every vertex reachable from a landmark receives a uniform contribution)
def betweennessCentrality[VD: ClassTag](
  graph: Graph[VD, Double], 
  landmarks: Array[VertexId]
): Graph[Double, Double] = {

  // Compute the vertex count on the driver first; RDD actions cannot be nested
  // inside another RDD's closure
  val numVertices = graph.vertices.count()

  // For each landmark, run SSSP and credit the reachable vertices
  val centralityContributions = landmarks.map { landmark =>
    val distances = bellmanFord(graph, landmark)

    distances.vertices.map { case (id, dist) =>
      if (dist != Double.PositiveInfinity && id != landmark) {
        (id, 1.0 / numVertices)
      } else {
        (id, 0.0)
      }
    }
  }

  // Aggregate contributions across landmarks
  val betweenness = centralityContributions.reduce { (a, b) =>
    a.union(b).reduceByKey(_ + _)
  }

  graph.outerJoinVertices(betweenness) { (id, attr, centralityOpt) =>
    centralityOpt.getOrElse(0.0)
  }
}

// Closeness centrality: (reachable vertices - 1) / total distance to them
def closenessCentrality[VD: ClassTag](graph: Graph[VD, Double]): Graph[Double, Double] = {
  val shortestPaths = allPairsShortestPaths(graph.mapVertices((_, _) => ()))

  shortestPaths.mapVertices { (id, pathMap) =>
    val reachable = pathMap.values.filter(_ != Double.PositiveInfinity)
    val totalDistance = reachable.sum
    if (totalDistance > 0) {
      (reachable.size - 1).toDouble / totalDistance
    } else {
      0.0
    }
  }
}

// Eigenvector centrality (simplified Power Iteration)
def eigenvectorCentrality[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED], 
  numIter: Int = 20
): Graph[Double, ED] = {

  val initialGraph = graph.mapVertices((id, _) => 1.0)

  def iteration(g: Graph[Double, ED]): Graph[Double, ED] = {
    val messages = g.aggregateMessages[Double](
      sendMsg = triplet => {
        triplet.sendToSrc(triplet.dstAttr)
        triplet.sendToDst(triplet.srcAttr)
      },
      mergeMsg = _ + _
    )

    val newScores = g.outerJoinVertices(messages) { (id, oldScore, msgOpt) =>
      msgOpt.getOrElse(0.0)
    }

    // Normalize
    val norm = math.sqrt(newScores.vertices.map(_._2).map(x => x * x).reduce(_ + _))
    newScores.mapVertices((id, score) => score / norm)
  }

  (1 to numIter).foldLeft(initialGraph)((g, _) => iteration(g))
}
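
Usage on the social graph (a sketch; scores are normalized every iteration, so only the relative ordering is meaningful):

val centrality = eigenvectorCentrality(graph)
centrality.vertices.join(graph.vertices).collect()
  .sortBy(-_._2._1)
  .foreach { case (_, (score, user)) => println(f"${user.name}: $score%.4f") }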

Real-World Applications

Social Network Analysis

// Social network metrics
class SocialNetworkAnalyzer[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {

  def analyzeNetwork(): Map[String, Any] = {
    val numVertices = graph.vertices.count()
    val numEdges = graph.edges.count()
    val density = (2.0 * numEdges) / (numVertices * (numVertices - 1)) // undirected density

    val degreeStats = graph.degrees.map(_._2).stats()
    val components = graph.connectedComponents().vertices.map(_._2).distinct().count()

    Map(
      "vertices" -> numVertices,
      "edges" -> numEdges,
      "density" -> density,
      "avgDegree" -> degreeStats.mean,
      "maxDegree" -> degreeStats.max,
      "components" -> components
    )
  }

  def findInfluencers(topK: Int = 10): Array[(VertexId, Double)] = {
    val pageRank = graph.pageRank(0.0001)
    pageRank.vertices.top(topK)(Ordering.by(_._2))
  }

  def detectCommunities(): Graph[Long, ED] = {
    LabelPropagation.run(graph, maxSteps = 20)
  }

  def analyzeInformationDiffusion(seedSet: Set[VertexId], threshold: Double = 0.5): Graph[Boolean, ED] = {
    // Precompute degrees once; RDD actions must not be called inside vertex programs
    var currentGraph = graph
      .outerJoinVertices(graph.degrees) { (_, _, degOpt) => degOpt.getOrElse(0) }
      .mapVertices((id, degree) => (seedSet.contains(id), degree))

    var newAdoptions = 1L
    while (newAdoptions > 0) {
      val messages = currentGraph.aggregateMessages[Double](
        sendMsg = triplet => if (triplet.srcAttr._1) triplet.sendToDst(1.0),
        mergeMsg = _ + _
      )

      val next = currentGraph.outerJoinVertices(messages) { (id, attr, msgOpt) =>
        val (adopted, degree) = attr
        val adoptNow = !adopted && degree > 0 && msgOpt.exists(_ / degree >= threshold)
        (adopted || adoptNow, degree)
      }

      // Count changes with an RDD action; a driver-side flag mutated on executors never updates
      newAdoptions = next.vertices.join(currentGraph.vertices)
        .filter { case (_, ((newState, _), (oldState, _))) => newState != oldState }
        .count()

      currentGraph = next
    }

    currentGraph.mapVertices((id, attr) => attr._1)
  }
}

// Usage example
val socialNetwork = graph // Assume this is a social network
val analyzer = new SocialNetworkAnalyzer(socialNetwork)

val networkStats = analyzer.analyzeNetwork()
println(s"Network statistics: $networkStats")

val influencers = analyzer.findInfluencers(5)
println("Top influencers:")
influencers.foreach { case (id, score) =>
  val user = graph.vertices.lookup(id).headOption
  println(f"${user.map(_.name).getOrElse("Unknown")}: $score%.4f")
}

Recommendation System

// Collaborative filtering using graph-based approach
class GraphBasedRecommendation {

  case class Item(id: String, category: String, rating: Double)
  case class UserItemRating(rating: Double, timestamp: Long)

  def buildBipartiteGraph(
    userItems: RDD[(VertexId, VertexId, Double)]
  ): Graph[String, Double] = {

    val userVertices = userItems.map(_._1).distinct().map(id => (id, s"user_$id"))
    // Offset item ids so they cannot collide with user ids in the shared vertex-id space
    val itemVertices = userItems.map(_._2).distinct().map(id => (id + 1000000L, s"item_$id"))
    val vertices = userVertices.union(itemVertices)

    val edges = userItems.map { case (userId, itemId, rating) =>
      Edge(userId, itemId + 1000000L, rating)
    }

    Graph(vertices, edges, "unknown")
  }

  def computeItemSimilarity(graph: Graph[String, Double]): RDD[(VertexId, Map[VertexId, Double])] = {
    // Item-item similarity from co-ratings. An RDD cannot be referenced inside
    // another RDD's closure, so co-rated pairs are found with a self-join on the user key.
    val ratingsByUser = graph.edges.map(e => (e.srcId, (e.dstId, e.attr)))

    ratingsByUser.join(ratingsByUser)
      .filter { case (_, ((item1, _), (item2, _))) => item1 != item2 }
      .map { case (_, ((item1, rating1), (item2, rating2))) =>
        ((item1, item2), (rating1 * rating2, 1))
      }
      .reduceByKey { case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) }
      .map { case ((item1, item2), (dotProduct, commonUsers)) =>
        (item1, Map(item2 -> dotProduct / (commonUsers * 5.0))) // normalize by max rating
      }
      .reduceByKey(_ ++ _)
  }

  def recommendItems(
    userId: VertexId, 
    graph: Graph[String, Double], 
    topK: Int = 10
  ): Array[(VertexId, Double)] = {

    val userRatings = graph.edges.filter(_.srcId == userId).map(e => (e.dstId, e.attr)).collect().toMap
    val itemSimilarity = computeItemSimilarity(graph)

    itemSimilarity
      .filter { case (itemId, _) => !userRatings.contains(itemId) }
      .map { case (itemId, similarities) =>
        // Score = similarity-weighted sum of the user's existing ratings
        val score = similarities.map { case (ratedItemId, similarity) =>
          userRatings.get(ratedItemId).map(_ * similarity).getOrElse(0.0)
        }.sum

        (itemId, score)
      }
      .filter(_._2 > 0)
      .top(topK)(Ordering.by(_._2))
  }
}
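
A quick end-to-end sketch with made-up ratings (user ids 1-3, raw item ids 10-12; buildBipartiteGraph applies the 1000000L offset to keep the two id spaces disjoint):

val recommender = new GraphBasedRecommendation

val ratings: RDD[(VertexId, VertexId, Double)] = sc.parallelize(Seq(
  (1L, 10L, 5.0), (1L, 11L, 3.0),
  (2L, 10L, 4.0), (2L, 12L, 5.0),
  (3L, 11L, 2.0), (3L, 12L, 4.0)
))

val ratingGraph = recommender.buildBipartiteGraph(ratings)
recommender.recommendItems(1L, ratingGraph, topK = 3).foreach { case (offsetItemId, score) =>
  println(f"Item ${offsetItemId - 1000000L}: $score%.3f")
}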

Fraud Detection

// Graph-based fraud detection
// (Serializable so instances of the inner case classes, which hold a reference
// to the enclosing class, can be shipped inside RDDs)
class FraudDetectionSystem extends Serializable {

  case class Transaction(amount: Double, timestamp: Long, transactionType: String)
  case class Account(accountType: String, creationDate: Long, riskScore: Double)

  def detectFraudulentCommunities(
    transactionGraph: Graph[Account, Transaction]
  ): Graph[Boolean, Transaction] = {

    // Flag individually suspicious accounts: high risk score or less than one day old
    val suspiciousAccounts = transactionGraph.vertices.filter { case (id, account) =>
      account.riskScore > 0.8 || 
      (System.currentTimeMillis() - account.creationDate) < 86400000L
    }.map(_._1).collect().toSet

    // Find communities with a high concentration of suspicious accounts
    val communities = LabelPropagation.run(transactionGraph, maxSteps = 10)

    // Precompute community sizes on the driver; counting inside the map below
    // would nest one RDD action inside another, which Spark does not allow
    val communitySizes = communities.vertices.map(_._2).countByValue()

    val communityRisk = communities.vertices
      .map { case (accountId, communityId) =>
        (communityId, if (suspiciousAccounts.contains(accountId)) 1 else 0)
      }
      .reduceByKey(_ + _)
      .collect()
      .map { case (communityId, suspiciousCount) =>
        val riskRatio = suspiciousCount.toDouble / communitySizes(communityId)
        (communityId, riskRatio > 0.3) // mark community as fraudulent if >30% suspicious
      }
      .toMap

    // Mark every account in a fraudulent community
    transactionGraph.outerJoinVertices(
      communities.vertices.mapValues(communityId => communityRisk.getOrElse(communityId, false))
    ) { (_, _, fraudOpt) => fraudOpt.getOrElse(false) }
  }

  def findSuspiciousPatterns(
    graph: Graph[Account, Transaction]
  ): RDD[(VertexId, String, Double)] = {

    // Pattern 1: Rapid money movement (high volume and frequency within 24 hours)
    val rapidMovement = graph.aggregateMessages[(Double, Int)](
      sendMsg = triplet => {
        val hoursDiff = (System.currentTimeMillis() - triplet.attr.timestamp) / 3600000.0
        if (hoursDiff <= 24) {
          triplet.sendToSrc((triplet.attr.amount, 1))
          triplet.sendToDst((triplet.attr.amount, 1))
        }
      },
      mergeMsg = (a, b) => (a._1 + b._1, a._2 + b._2)
    ).filter { case (id, (totalAmount, transactionCount)) =>
      totalAmount > 100000 && transactionCount > 50 // High volume and frequency
    }.map { case (id, (amount, count)) =>
      (id, "rapid_movement", amount / count)
    }

    // Pattern 2: Circular transactions -- key edges by their (src, dst) pair so
    // that an A -> B edge joins against a B -> A edge
    val forwardEdges = graph.edges.map(e => ((e.srcId, e.dstId), e.attr.amount))
    val backwardEdges = graph.edges.map(e => ((e.dstId, e.srcId), e.attr.amount))
    val circularTransactions = forwardEdges.join(backwardEdges)
      .map { case ((src, _), (outAmount, _)) => (src, "circular", outAmount) }
      .distinct()

    rapidMovement.union(circularTransactions)
  }
}
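
A small end-to-end sketch with made-up accounts and transfers (all ids, amounts, and timestamps are hypothetical; the `extends Serializable` above is what lets the inner case class instances travel inside RDDs):

val detector = new FraudDetectionSystem
val now = System.currentTimeMillis()

val accounts = sc.parallelize(Seq(
  (1L, detector.Account("checking", now - 90L * 86400000L, 0.1)),
  (2L, detector.Account("checking", now - 3600000L, 0.9)), // brand new, high-risk account
  (3L, detector.Account("savings", now - 30L * 86400000L, 0.2))
))

val transfers = sc.parallelize(Seq(
  Edge(1L, 2L, detector.Transaction(60000.0, now - 7200000L, "wire")),
  Edge(2L, 1L, detector.Transaction(59000.0, now - 3600000L, "wire")) // money flows straight back
))

val transactionGraph = Graph(accounts, transfers, detector.Account("unknown", now, 0.0))

val flagged = detector.detectFraudulentCommunities(transactionGraph)
flagged.vertices.collect().foreach { case (id, isFraudulent) =>
  println(s"Account $id flagged: $isFraudulent")
}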

Performance Optimization and Best Practices

Graph Partitioning Strategies

// Custom partitioning for better performance
import org.apache.spark.graphx.PartitionStrategy

// Choose partitioning strategy based on graph characteristics
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 8)

// Custom partitioner for domain-specific optimization
class CommunityPartitioner(partitions: Int, communityAssignment: Map[VertexId, Int]) 
  extends org.apache.spark.Partitioner {

  // A distinct parameter name is required: `override def numPartitions = numPartitions`
  // would recurse on itself
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val raw = key match {
      case vertexId: VertexId => 
        communityAssignment.getOrElse(vertexId, vertexId.hashCode())
      case _ => key.hashCode()
    }
    ((raw % partitions) + partitions) % partitions // hashCode can be negative
  }
}
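
For example, with a hypothetical community assignment for the five users from earlier, the vertex RDD can be co-partitioned by community:

// Hypothetical assignment: users 1-2 in community 0, users 3-5 in community 1
val communityMap: Map[VertexId, Int] = Map(1L -> 0, 2L -> 0, 3L -> 1, 4L -> 1, 5L -> 1)
val communityPartitionedUsers = users.partitionBy(new CommunityPartitioner(2, communityMap))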

// Graph-specific caching strategies
def optimizeGraphPerformance[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED]
): Graph[VD, ED] = {

  // Cache frequently accessed components
  graph.vertices.cache()
  graph.edges.cache()

  // Materialize the graph to avoid recomputation
  graph.vertices.count()
  graph.edges.count()

  graph
}

// Memory-efficient processing for large graphs
def processLargeGraph[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED],
  batchSize: Int = 1000
): Unit = {

  val vertexIds = graph.vertices.map(_._1).collect()

  vertexIds.grouped(batchSize).foreach { batch =>
    val batchSet = batch.toSet // Set membership is O(1); Array.contains is a linear scan
    val subgraph = graph.subgraph(vpred = (id, _) => batchSet.contains(id))

    // Process subgraph
    val localResults = processSubgraph(subgraph)

    // Save or aggregate results
    saveResults(localResults)

    // Clean up
    subgraph.unpersist()
  }
}

def processSubgraph[VD: ClassTag, ED: ClassTag](subgraph: Graph[VD, ED]): Array[(VertexId, Double)] = {
  // Implement specific processing logic
  subgraph.vertices.map { case (id, _) => (id, 1.0) }.collect()
}

def saveResults(results: Array[(VertexId, Double)]): Unit = {
  // Save results to storage
  println(s"Processed ${results.length} vertices")
}

Monitoring and Debugging

// Graph statistics and monitoring
class GraphMonitor[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {

  def collectStatistics(): Map[String, Any] = {
    val vertexCount = graph.vertices.count()
    val edgeCount = graph.edges.count()
    val tripletCount = graph.triplets.count()

    val degreeStats = graph.degrees.map(_._2).stats()

    Map(
      "vertices" -> vertexCount,
      "edges" -> edgeCount,
      "triplets" -> tripletCount,
      "avgDegree" -> degreeStats.mean,
      "maxDegree" -> degreeStats.max,
      "minDegree" -> degreeStats.min,
      "densityRatio" -> (2.0 * edgeCount) / (vertexCount * (vertexCount - 1))
    )
  }

  def validateGraph(): Array[String] = {
    val issues = scala.collection.mutable.ArrayBuffer[String]()

    // Check for orphaned vertices
    val vertexIds = graph.vertices.map(_._1).collect().toSet
    val edgeVertexIds = graph.edges.flatMap(e => Array(e.srcId, e.dstId)).distinct().collect().toSet
    val orphanedVertices = vertexIds.diff(edgeVertexIds)

    if (orphanedVertices.nonEmpty) {
      issues += s"Found ${orphanedVertices.size} orphaned vertices"
    }

    // Check for self-loops
    val selfLoops = graph.edges.filter(e => e.srcId == e.dstId).count()
    if (selfLoops > 0) {
      issues += s"Found $selfLoops self-loops"
    }

    // Check for duplicate edges
    val totalEdges = graph.edges.count()
    val uniqueEdges = graph.edges.map(e => (e.srcId, e.dstId)).distinct().count()
    if (totalEdges != uniqueEdges) {
      issues += s"Found ${totalEdges - uniqueEdges} duplicate edges"
    }

    issues.toArray
  }

  def profilePerformance(operation: () => Unit): Map[String, Long] = {
    val startTime = System.currentTimeMillis()
    val startMemory = Runtime.getRuntime.totalMemory() - Runtime.getRuntime.freeMemory()

    operation()

    val endTime = System.currentTimeMillis()
    val endMemory = Runtime.getRuntime.totalMemory() - Runtime.getRuntime.freeMemory()

    Map(
      "executionTime" -> (endTime - startTime),
      "memoryUsed" -> (endMemory - startMemory)
    )
  }
}

// Usage
val monitor = new GraphMonitor(graph)
val stats = monitor.collectStatistics()
println(s"Graph statistics: $stats")

val issues = monitor.validateGraph()
if (issues.nonEmpty) {
  println(s"Graph validation issues: ${issues.mkString(", ")}")
}

val performance = monitor.profilePerformance(() => {
  graph.pageRank(0.0001).vertices.count()
})
println(s"PageRank performance: $performance")

Conclusion

GraphX provides a powerful framework for large-scale graph processing and network analysis. Key capabilities include:

Core Features:

  • Distributed graph storage and processing
  • Rich set of graph algorithms and transformations
  • Integration with Spark ecosystem
  • Property graphs with vertex and edge attributes

Algorithm Support:

  • Centrality measures (PageRank, Betweenness, Closeness)
  • Community detection and clustering
  • Shortest path algorithms
  • Triangle counting and graph metrics

Real-World Applications:

  • Social network analysis and influence detection
  • Recommendation systems and collaborative filtering
  • Fraud detection and risk analysis
  • Network topology analysis

Performance Optimizations:

  • Strategic graph partitioning
  • Efficient caching and persistence
  • Memory management for large graphs
  • Batch processing for scalability

Best Practices:

  • Choose appropriate partitioning strategies
  • Cache frequently accessed graph components
  • Monitor graph statistics and validate data quality
  • Use appropriate algorithms for specific use cases
  • Consider graph characteristics when optimizing

GraphX excels in scenarios requiring analysis of relationships and connectivity patterns, making it invaluable for applications in social media, finance, telecommunications, and any domain where understanding network structure and dynamics is crucial.