Skip to content

Latest commit

 

History

History
81 lines (65 loc) · 3.12 KB

File metadata and controls

81 lines (65 loc) · 3.12 KB

关联操作

  在许多情况下,有必要将外部数据加入到图中。例如,我们可能有额外的用户属性需要合并到已有的图中或者我们可能想从一个图中取出顶点特征加入到另外一个图中。这些任务可以用join操作完成。 主要的join操作如下所示。

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

  joinVertices操作join输入RDD和顶点,返回一个新的带有顶点特征的图。这些特征是通过在连接顶点的结果上使用用户定义的map函数获得的。没有匹配的顶点保留其原始值。 下面详细地来分析这两个函数。

1 joinVertices

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
    : Graph[VD, ED] = {
    val uf = (id: VertexId, data: VD, o: Option[U]) => {
      o match {
        case Some(u) => mapFunc(id, data, u)
        case None => data
      }
    }
    graph.outerJoinVertices(table)(uf)
  }

  我们可以看到,joinVertices的实现是通过outerJoinVertices来实现的。这是因为join本来就是outer join的一种特例。

2 outerJoinVertices

override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
      (other: RDD[(VertexId, U)])
      (updateF: (VertexId, VD, Option[U]) => VD2)
      (implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
    if (eq != null) {
      vertices.cache()
      // updateF preserves type, so we can use incremental replication
      val newVerts = vertices.leftJoin(other)(updateF).cache()
      val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
        .updateVertices(changedVerts)
      new GraphImpl(newVerts, newReplicatedVertexView)
    } else {
      // updateF does not preserve type, so we must re-replicate all vertices
      val newVerts = vertices.leftJoin(other)(updateF)
      GraphImpl(newVerts, replicatedVertexView.edges)
    }
  }

  通过以上的代码我们可以看到,如果updateF不改变类型,我们只需要创建改变的顶点即可,否则我们要重新创建所有的顶点。我们讨论不改变类型的情况。 这种情况分三步。

  • 1 修改顶点属性值
val newVerts = vertices.leftJoin(other)(updateF).cache()

  这一步会用顶点RDD join 传入的RDD,然后用updateF作用joinRDD中的所有顶点,改变它们的值。

  • 2 找到发生改变的顶点
  val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
  • 3 更新newReplicatedVertexView中边分区中的顶点属性
val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
        .updateVertices(changedVerts)

  第2、3两步的源码已经在转换操作中详细介绍。