From 6904f0136f16e9a23bb702b9b59c7775cc764da5 Mon Sep 17 00:00:00 2001 From: lizihan Date: Tue, 16 Aug 2022 18:21:01 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=9D=8E=E5=AD=90=E6=B6=B5=E4=B8=80?= =?UTF-8?q?=E9=98=B6=E6=AE=B5=E6=89=80=E5=AE=8C=E6=88=90=E7=9A=84struc2vec?= =?UTF-8?q?=E7=AE=97=E6=B3=95=E7=9A=84=E4=BB=A3=E7=A0=81=E5=B7=A5=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../angel/graph/embedding/struc2vec/DTW.scala | 50 ++++++++++++++ .../graph/embedding/struc2vec/Floyd.scala | 16 +++++ .../graph/embedding/struc2vec/MainEnter.scala | 35 ++++++++++ .../angel/graph/embedding/struc2vec/README.md | 23 +++++++ .../struc2vec/StructureSimilarity.scala | 65 +++++++++++++++++++ .../embedding/struc2vec/test/TestDTW.scala | 13 ++++ .../embedding/struc2vec/test/TestFloyd.scala | 30 +++++++++ .../struc2vec/test/TestStructSimi.scala | 36 ++++++++++ 8 files changed, 268 insertions(+) create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/DTW.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/Floyd.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/MainEnter.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/README.md create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/StructureSimilarity.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestDTW.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestFloyd.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestStructSimi.scala diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/DTW.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/DTW.scala new file mode 100644 index 000000000..203c451ef --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/DTW.scala @@ -0,0 +1,50 @@ +package com.tencent.angel.graph.embedding.struc2vec + +object DTW { + def compute(a:Array[Int],b:Array[Int]):Double = { + + // DTW(Dynamic Time Warping) + // a fundamental implementation based on dynamic programming + // dp[i][j] = Dist(i,j) + min{dp[i-1][j],dp[i][j-1],dp[i-1][j-1]} + + if(a.length==0||b.length==0) + Double.NaN + else{ + val dp: Array[Array[Double]] = Array.ofDim(a.length, b.length) + + // set the boundary condition + // the first column, dp[i][0] = Dist(i,0)+ dp[i-1][0]; + + dp(0)(0) = dist(a(0),b(0)) + for(i <- 1 until a.length) + dp(i)(0) = dp(i-1)(0)+dist(a(i),b(0)) + + // the first row, dp[0][j] = Dist(0,j)+ dp[0][j-1]; + + for (j <- 1 until b.length) + dp(0)(j) = dp(0)(j - 1) + dist(b(j), a(0)) + + // fill up the whole dp matrix from left to right, from top to bottom + + for(i<-1 until a.length;j<- 1 until b.length) + dp(i)(j) = dist(a(i),b(j))+minAmongThree(dp(i-1)(j),dp(i)(j-1),dp(i-1)(j-1)) + + dp(a.length-1)(b.length-1) + } + } + + def dist(a:Int,b:Int):Double = { + // Assume that a, b are both positive (the node degrees ) + Math.max(a,b)/Math.min(a,b)-1 + } + + def minAmongThree(a:Double,b:Double,c:Double):Double = { + if(adiam){ + + }else if(k==0){ + for(i<- 0 until n; j<- 0 until n) + structSimi(0)(i)(j) = DTW.compute(getHopRingK(i,0),getHopRingK(j,0)) + }else{ + for(i<-0 until n; j<- 0 until n){ + if (structSimi(k-1)(i)(j)==Double.NaN) + structSimi(k)(i)(j) = Double.NaN + else{ + val temp: Double = DTW.compute(getHopRingK(i,k),getHopRingK(j,k)) + if(temp == Double.NaN) + structSimi(k)(i)(j) = Double.NaN + else + structSimi(k)(i)(j) = structSimi(k-1)(i)(j)+temp + } + } + } + } + + def getHopRingK(node:Int,k:Int): Array[Int] = { + // return the ordered degree array s(R_k(node)) + if (node < 0 || node > n - 1 || k < 0 || k > diam) + Array[Int]() + else{ + val result = ArrayBuffer[Int]() + for(j<- 0 until n) + if(hopCountResult(node)(j)==k) + result.append(degrees(j)) + result.sorted.toArray + } + } + +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestDTW.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestDTW.scala new file mode 100644 index 000000000..77ea3519e --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestDTW.scala @@ -0,0 +1,13 @@ +package com.tencent.angel.graph.embedding.struc2vec.test + +import com.tencent.angel.graph.embedding.struc2vec.DTW + +object TestDTW { + def main(args: Array[String]): Unit = { + // val a = Array(1, 1, 1, 1, 2, 2, 3, 3, 4, 3, 2, 2, 1, 1, 1, 1) + // val b = Array(1, 1, 2, 2, 3, 3, 4, 4, 4, 4, 3, 3, 2, 2, 1, 1) + // println(DTW.compute(a, b)) + println(DTW.compute(Array(1, 1), Array(2))) + } + +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestFloyd.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestFloyd.scala new file mode 100644 index 000000000..c47818378 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestFloyd.scala @@ -0,0 +1,30 @@ +package com.tencent.angel.graph.embedding.struc2vec.test +import com.tencent.angel.graph.embedding.struc2vec.Floyd + + +object TestFloyd { + val INF = Floyd.INF + def main(args: Array[String]): Unit = { + val len: Int = 3 + // initialize adjacency matrix + val adjMatrix = Array.ofDim[Int](len, len) + adjMatrix(0)(0) = 0 + adjMatrix(0)(1) = 1 + adjMatrix(0)(2) = INF + adjMatrix(1)(0) = 1 + adjMatrix(1)(1) = 0 + adjMatrix(1)(2) = 1 + adjMatrix(2)(0) = INF + adjMatrix(2)(1) = 1 + adjMatrix(2)(2) = 0 + + val res = Floyd.hopCount(adjMatrix) + for (i <- 0 to res.length - 1; j <- 0 to res(0).length - 1) { + println("Element " + i + j + " = " + res(i)(j)) + } + + + + + } +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestStructSimi.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestStructSimi.scala new file mode 100644 index 000000000..92e21b363 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestStructSimi.scala @@ -0,0 +1,36 @@ +package com.tencent.angel.graph.embedding.struc2vec.test + +import com.tencent.angel.graph.embedding.struc2vec._ + +object TestStructSimi { + val INF = Floyd.INF + def main(args:Array[String]): Unit ={ + val adjMatrix = Array(Array(0,1,INF),Array(1,0,1),Array(INF,1,0)) + val struct_simi = new StructureSimilarity(adjMatrix) + val diam = struct_simi.diam + val degrees = struct_simi.degrees + val hc = struct_simi.hopCountResult + val ring = struct_simi.getHopRingK(1,1) + println(diam) + for (i <- 0 to hc.length - 1; j <- 0 to hc(0).length - 1) { + println("Element " + i + j + " = " + hc(i)(j)) + } + + println(struct_simi.getHopRingK(0,0)(0)) + println(struct_simi.getHopRingK(1,0)(0)) + +// println(DTW.compute(struct_simi.getHopRingK(0,0),struct_simi.getHopRingK(0,0))) + +// for (i <- 0 to degrees.length - 1) +// println("Element " + i + " = " + degrees(i)) + +// for (i <- 0 to ring.length - 1) +// println("ring Element " + i + " = " + ring(i)) + +// println(ring.length) + + + + } + +} From 941cc7aa8b3cd0af1eda0b8f5d18c39b3a69b02d Mon Sep 17 00:00:00 2001 From: ZihanLi Date: Thu, 25 Aug 2022 22:34:18 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=B0=86=E7=9B=B8=E4=BC=BC=E5=BA=A6?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E9=83=A8=E5=88=86=E5=B5=8C=E5=85=A5spark?= =?UTF-8?q?=E6=A1=86=E6=9E=B6=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data/output/output1/_SUCCESS | 0 ...89a38-c974-4d33-a020-d83060e18bda-c000.csv | 34 ++++ ...8cd3b-db23-44fe-a3c5-47048c8533dd-c000.csv | 34 ++++ .../embedding/struc2vec/test/Struc2Vec.scala | 153 ++++++++++++++++++ .../struc2vec/test/Struc2VecExample.scala | 74 +++++++++ 5 files changed, 295 insertions(+) create mode 100644 data/output/output1/_SUCCESS create mode 100644 data/output/output1/part-00000-4ba89a38-c974-4d33-a020-d83060e18bda-c000.csv create mode 100644 data/output/output1/part-00000-c068cd3b-db23-44fe-a3c5-47048c8533dd-c000.csv create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala create mode 100644 spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala diff --git a/data/output/output1/_SUCCESS b/data/output/output1/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/data/output/output1/part-00000-4ba89a38-c974-4d33-a020-d83060e18bda-c000.csv b/data/output/output1/part-00000-4ba89a38-c974-4d33-a020-d83060e18bda-c000.csv new file mode 100644 index 000000000..bbc9bcd7a --- /dev/null +++ b/data/output/output1/part-00000-4ba89a38-c974-4d33-a020-d83060e18bda-c000.csv @@ -0,0 +1,34 @@ +20 32 31 24 25 24 27 24 25 31 +13 2 32 2 0 12 0 5 16 6 +19 1 21 1 2 9 33 8 2 8 +21 1 19 0 17 1 3 1 2 8 +4 10 0 8 0 13 0 5 16 5 +15 32 31 32 14 33 8 32 2 1 +16 5 6 4 10 4 0 5 16 6 +22 33 22 32 14 33 18 32 15 33 +28 2 28 33 8 2 1 19 33 28 +29 33 31 24 25 24 27 24 25 31 +25 24 27 2 28 31 33 26 29 33 +11 0 17 0 2 27 33 13 2 13 +30 32 30 32 29 26 29 26 29 33 +32 15 33 27 2 27 33 26 29 33 +27 23 33 23 27 24 27 33 15 33 +0 4 0 31 25 24 27 2 8 32 +24 31 28 31 25 31 28 31 25 24 +14 33 23 25 23 27 24 25 31 24 +33 9 2 7 1 0 13 3 13 1 +23 27 24 31 25 31 28 31 25 31 +1 17 0 21 1 30 32 2 28 31 +6 0 11 0 8 33 8 30 32 31 +17 0 13 3 1 30 32 15 33 32 +3 13 3 12 3 1 3 12 0 19 +7 0 11 0 10 5 6 16 6 16 +12 0 4 6 16 5 6 4 10 0 +8 2 9 33 15 33 13 2 28 31 +9 33 31 32 23 29 26 33 32 22 +18 33 31 32 23 29 23 29 33 32 +31 28 2 1 30 8 30 1 3 2 +26 33 32 33 9 2 13 0 2 9 +10 4 6 16 6 5 0 19 1 0 +5 0 7 2 7 0 6 5 16 5 +2 0 21 1 0 6 16 6 0 12 diff --git a/data/output/output1/part-00000-c068cd3b-db23-44fe-a3c5-47048c8533dd-c000.csv b/data/output/output1/part-00000-c068cd3b-db23-44fe-a3c5-47048c8533dd-c000.csv new file mode 100644 index 000000000..c9f7a67f2 --- /dev/null +++ b/data/output/output1/part-00000-c068cd3b-db23-44fe-a3c5-47048c8533dd-c000.csv @@ -0,0 +1,34 @@ +20 33 20 32 20 32 23 25 23 27 +13 0 7 0 11 0 31 24 27 23 +19 33 20 33 18 33 19 0 17 0 +21 1 21 1 3 13 2 32 23 27 +4 10 0 12 0 21 1 7 0 13 +15 33 14 32 2 7 2 32 15 32 +16 6 16 6 5 0 10 5 0 13 +22 33 20 33 18 32 23 25 31 0 +28 33 14 32 2 7 2 32 15 32 +29 26 33 13 33 28 2 9 2 0 +25 23 25 23 32 33 20 32 8 30 +11 0 7 0 10 5 16 6 4 0 +30 1 21 0 12 3 1 3 0 13 +32 23 25 23 32 30 1 3 0 13 +27 33 15 33 19 33 20 33 18 33 +0 19 33 28 33 9 33 20 32 15 +24 27 23 32 20 32 29 23 33 30 +14 33 30 32 8 2 27 24 27 2 +33 32 22 32 8 2 7 2 8 30 +23 27 23 25 24 27 24 31 32 15 +1 30 1 3 12 0 8 30 33 20 +6 4 0 31 33 23 33 14 33 14 +17 1 13 1 13 1 13 33 19 33 +3 2 13 33 32 31 25 31 24 27 +7 2 13 1 7 2 9 33 30 1 +12 0 19 33 14 32 8 30 32 22 +8 32 22 32 31 28 33 23 25 24 +9 33 8 0 8 32 31 24 31 28 +18 32 22 32 31 28 31 24 31 28 +31 25 24 27 23 32 31 24 25 24 +26 33 32 31 32 2 27 2 27 33 +10 4 0 17 1 17 0 12 3 0 +5 0 4 0 19 33 26 29 26 29 +2 27 24 25 23 29 23 33 26 29 diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala new file mode 100644 index 000000000..d43511284 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala @@ -0,0 +1,153 @@ +package com.tencent.angel.graph.embedding.struc2vec.test + +import com.tencent.angel.graph.data.neighbor.NeighborDataOps +import com.tencent.angel.graph.embedding.struc2vec.StructureSimilarity +import com.tencent.angel.graph.utils.{GraphIO, Stats} +import com.tencent.angel.graph.utils.params.{HasArrayBoundsPath, HasBalancePartitionPercent, HasBatchSize, HasDstNodeIdCol, HasEpochNum, HasIsWeighted, HasMaxIteration, HasNeedReplicaEdge, HasOutputCoreIdCol, HasOutputNodeIdCol, HasPSPartitionNum, HasPartitionNum, HasSrcNodeIdCol, HasStorageLevel, HasUseBalancePartition, HasUseEdgeBalancePartition, HasWalkLength, HasWeightCol} +import com.tencent.angel.spark.context.PSContext +import org.apache.spark.SparkContext +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.util.Identifiable +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer + +class Struc2Vec(override val uid: String) extends Transformer + with HasSrcNodeIdCol with HasDstNodeIdCol with HasOutputNodeIdCol with HasOutputCoreIdCol + with HasStorageLevel with HasPartitionNum with HasPSPartitionNum with HasMaxIteration + with HasBatchSize with HasArrayBoundsPath with HasIsWeighted with HasWeightCol with HasUseBalancePartition + with HasNeedReplicaEdge with HasUseEdgeBalancePartition with HasWalkLength with HasEpochNum with HasBalancePartitionPercent { + private var output: String = _ + + def this() = this(Identifiable.randomUID("Struc2Vec")) + + def setOutputDir(in: String): Unit = { + output = in + } + + override def transform(dataset: Dataset[_]): DataFrame = { + + //create origin edges RDD and data preprocessing + + val rawEdges = NeighborDataOps.loadEdgesWithWeight(dataset, $(srcNodeIdCol), $(dstNodeIdCol), $(weightCol), $(isWeighted), $(needReplicaEdge), true, false, false) + rawEdges.repartition($(partitionNum)).persist(StorageLevel.DISK_ONLY) + val (minId, maxId, numEdges) = Stats.summarizeWithWeight(rawEdges) + println(s"minId=$minId maxId=$maxId numEdges=$numEdges level=${$(storageLevel)}") + + val edges = rawEdges.map { case (src, dst, w) => (src, (dst, w)) } + val rmWeight = rawEdges.map{case (src, dst, w) => (src, dst)} + + // create adjacency table + val adjTable = rmWeight.groupByKey() + + // create adjacency matrix + val len:Int = adjTable.count().toInt + val adjArray = adjTable.collect() + val adjMatrix: Array[Array[Int]] = Array.ofDim(len,len) + + // initialize adjMatrix + for (i <- 0 until len; j<- 0 until len) + if(i==j) + adjMatrix(i)(j) = 0 + else + adjMatrix(i)(j) = Int.MaxValue + + for(item <- adjArray){ + for(j<-item._2) + adjMatrix(item._1.toInt)(j.toInt) = 1 + } + + val structureSimilarity = new StructureSimilarity(adjMatrix) + +// val diam = structureSimilarity.diam +// println(s"diam = $diam") +// val hop = structureSimilarity.hopCountResult +// val degrees = structureSimilarity.degrees + + structureSimilarity.compute() + + // get the structure similarity network + val simi = structureSimilarity.structSimi + +// println("Layer1: ") +// for (i <- 0 to simi(0).length - 1; j <- 0 to simi(0)(0).length - 1) { +// print((i, j) + " = " + simi(0)(i)(j) + " ") +// } +// println("Layer5: ") +// for (i <- 0 to simi(4).length - 1; j <- 0 to simi(0)(0).length - 1) { +// print((i, j) + " = " + simi(4)(i)(j) + " ") +// } + +// println("Degrees: ") +// for (i <- 0 to degrees.length - 1) { +// print("node"+ i + " = " + degrees(i)+ " ") +// } + +// val fn =adjTable.take(17)(16)._2 +// val temp = adjArray(0)._2 +// val cnt = adjTable.count() +// val ele = adjMatrix(32)(33) +// println(s"get = $ele count = $cnt") + + dataset.sparkSession.createDataFrame(edges.map(x => Row(x)), transformSchema(dataset.schema)) + + } + + + override def transformSchema(schema: StructType): StructType = { + StructType(Seq(StructField("path",StringType, nullable = false))) + } + + + override def copy(extra: ParamMap): Transformer = defaultCopy(extra) + + + +} + +object Struc2Vec { + def calcAliasTable(partId: Int, iter: Iterator[(Long, Array[(Long, Float)])]): Iterator[(Long, Array[Long], Array[Float], Array[Int])] = { + iter.map { case (src, neighbors) => + val (events, weights) = neighbors.unzip + val weightsSum = weights.sum + val len = weights.length + val areaRatio = weights.map(_ / weightsSum * len) + val (accept, alias) = createAliasTable(areaRatio) + (src, events, accept, alias) + } + } + + def createAliasTable(areaRatio: Array[Float]): (Array[Float], Array[Int]) = { + val len = areaRatio.length + val small = ArrayBuffer[Int]() + val large = ArrayBuffer[Int]() + val accept = Array.fill(len)(0f) + val alias = Array.fill(len)(0) + + for (idx <- areaRatio.indices) { + if (areaRatio(idx) < 1.0) small.append(idx) else large.append(idx) + } + while (small.nonEmpty && large.nonEmpty) { + val smallIndex = small.remove(small.size - 1) + val largeIndex = large.remove(large.size - 1) + accept(smallIndex) = areaRatio(smallIndex) + alias(smallIndex) = largeIndex + areaRatio(largeIndex) = areaRatio(largeIndex) - (1 - areaRatio(smallIndex)) + if (areaRatio(largeIndex) < 1.0) small.append(largeIndex) else large.append(largeIndex) + } + while (small.nonEmpty) { + val smallIndex = small.remove(small.size - 1) + accept(smallIndex) = 1 + } + + while (large.nonEmpty) { + val largeIndex = large.remove(large.size - 1) + accept(largeIndex) = 1 + } + (accept, alias) + } + +} \ No newline at end of file diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala new file mode 100644 index 000000000..054809024 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala @@ -0,0 +1,74 @@ +package com.tencent.angel.graph.embedding.struc2vec.test + +import com.tencent.angel.conf.AngelConf +import com.tencent.angel.graph.embedding.struc2vec.test.Struc2Vec +import com.tencent.angel.graph.utils.GraphIO +import com.tencent.angel.spark.context.PSContext +import org.apache.spark.storage.StorageLevel +import org.apache.spark.{SparkConf, SparkContext} + +import scala.collection.mutable.ArrayBuffer + +object Struc2VecExample { + def main(args: Array[String]): Unit = { + val input = "data/bc/karate_club_network.txt" + val storageLevel = StorageLevel.fromString("MEMORY_ONLY") + val batchSize = 10 + val output = "data/output/output1" + val srcIndex = 0 + val dstIndex = 1 + val weightIndex = 2 + val psPartitionNum = 1 + val partitionNum = 1 + val useEdgeBalancePartition = false + val isWeighted = false + val needReplicateEdge = true + + val sep = " " + val walkLength = 10 + + + start() + + val struc2Vec = new Struc2Vec() + .setStorageLevel(storageLevel) + .setPSPartitionNum(psPartitionNum) + .setSrcNodeIdCol("src") + .setDstNodeIdCol("dst") + .setWeightCol("weight") + .setBatchSize(batchSize) + .setWalkLength(walkLength) + .setPartitionNum(partitionNum) + .setIsWeighted(isWeighted) + .setNeedReplicaEdge(needReplicateEdge) + .setUseEdgeBalancePartition(useEdgeBalancePartition) + .setEpochNum(2) + + struc2Vec.setOutputDir(output) + val df = GraphIO.load(input, isWeighted = isWeighted, srcIndex, dstIndex, weightIndex, sep = sep) + val mapping = struc2Vec.transform(df) + + mapping.show() + + stop() + } + + def start(mode: String = "local[4]"): Unit = { + val conf = new SparkConf() + conf.setMaster(mode) + conf.setAppName("Struc2Vec") + conf.set(AngelConf.ANGEL_PSAGENT_UPDATE_SPLIT_ADAPTION_ENABLE, "false") + val sc = new SparkContext(conf) + sc.setLogLevel("ERROR") + sc.setCheckpointDir("data/cp") + //PSContext.getOrCreate(sc) + } + + def stop(): Unit = { + + } + +} + + + From 46ac3f0bce7bc7c89a8ba8828bd918c3f32f5fce Mon Sep 17 00:00:00 2001 From: lizihan Date: Sun, 4 Sep 2022 12:42:12 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E5=B0=86=E7=94=9F=E6=88=90=E5=A4=9A?= =?UTF-8?q?=E5=B1=82=E6=9D=83=E5=80=BC=E7=BD=91=E7=BB=9C=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E4=BB=A5=E5=8F=8A=E8=BF=9B=E8=A1=8Cbiased-random-walk=E9=87=87?= =?UTF-8?q?=E6=A0=B7=E7=94=9F=E6=88=90node-context=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E5=B5=8C=E5=85=A5spark=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../struc2vec/StructureSimilarity.scala | 5 +- .../embedding/struc2vec/test/Struc2Vec.scala | 160 ++++++++++++++++-- .../struc2vec/test/Struc2VecExample.scala | 3 +- 3 files changed, 153 insertions(+), 15 deletions(-) diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/StructureSimilarity.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/StructureSimilarity.scala index 9190beeb6..92d171f8f 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/StructureSimilarity.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/StructureSimilarity.scala @@ -36,11 +36,12 @@ class StructureSimilarity(adjMatrix:Array[Array[Int]]) { structSimi(0)(i)(j) = DTW.compute(getHopRingK(i,0),getHopRingK(j,0)) }else{ for(i<-0 until n; j<- 0 until n){ - if (structSimi(k-1)(i)(j)==Double.NaN) + // notice that NaN == NaN (false!) + if (structSimi(k-1)(i)(j).isNaN) structSimi(k)(i)(j) = Double.NaN else{ val temp: Double = DTW.compute(getHopRingK(i,k),getHopRingK(j,k)) - if(temp == Double.NaN) + if(temp.isNaN) structSimi(k)(i)(j) = Double.NaN else structSimi(k)(i)(j) = structSimi(k-1)(i)(j)+temp diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala index d43511284..2844e4c96 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala @@ -1,20 +1,25 @@ package com.tencent.angel.graph.embedding.struc2vec.test +import breeze.stats.distributions.AliasTable import com.tencent.angel.graph.data.neighbor.NeighborDataOps import com.tencent.angel.graph.embedding.struc2vec.StructureSimilarity +import com.tencent.angel.graph.embedding.struc2vec.test.Struc2Vec.{addDstAndWeights, addSrcAndLayer, calcAverageLayerWeights, calcCrossLayerWeights, sumLayerWeights,process} import com.tencent.angel.graph.utils.{GraphIO, Stats} import com.tencent.angel.graph.utils.params.{HasArrayBoundsPath, HasBalancePartitionPercent, HasBatchSize, HasDstNodeIdCol, HasEpochNum, HasIsWeighted, HasMaxIteration, HasNeedReplicaEdge, HasOutputCoreIdCol, HasOutputNodeIdCol, HasPSPartitionNum, HasPartitionNum, HasSrcNodeIdCol, HasStorageLevel, HasUseBalancePartition, HasUseEdgeBalancePartition, HasWalkLength, HasWeightCol} import com.tencent.angel.spark.context.PSContext -import org.apache.spark.SparkContext +import org.apache.spark.{SPARK_BRANCH, SparkContext} import org.apache.spark.ml.Transformer import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.util.Identifiable +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types.{StringType, StructField, StructType, _} import org.apache.spark.storage.StorageLevel +import java.util.Random import scala.collection.mutable.ArrayBuffer + class Struc2Vec(override val uid: String) extends Transformer with HasSrcNodeIdCol with HasDstNodeIdCol with HasOutputNodeIdCol with HasOutputCoreIdCol with HasStorageLevel with HasPartitionNum with HasPSPartitionNum with HasMaxIteration @@ -69,18 +74,60 @@ class Struc2Vec(override val uid: String) extends Transformer structureSimilarity.compute() - // get the structure similarity network + // get the structure similarity distance network val simi = structureSimilarity.structSimi + val diam = structureSimilarity.diam + + // create multilayer weighted graph RDD + val multilayerRDD = adjTable.map(x => x._1.toInt).sortBy(x => x).flatMap(x => addSrcAndLayer(x,simi.length)) + val multilayerGraph = multilayerRDD.map(x => addDstAndWeights(x._1,x._2,simi)).filter(x=>x._2.length!=0) + + // create layer average weight array + val layerAverageWeight = multilayerGraph.map(x=>sumLayerWeights(x)).groupByKey().map(x=>calcAverageLayerWeights(x)).sortByKey().collect() + + // create cross layer weight(level up weights) + val crossLayerWeights = multilayerGraph.map(x=>calcCrossLayerWeights(x,layerAverageWeight)) + val crossArray = crossLayerWeights.collect() + + // create the alias table + val aliasTable = multilayerGraph + .mapPartitionsWithIndex { case (partId, iter) => + Struc2Vec.calcAliasTable(partId, iter) + } + val aliasArray = aliasTable.collect() + + val temp = aliasTable.filter(x => x._1==0&&x._2==0).collect()(0) + println(s"temp: ${temp._3.length}") + + + // generate context for nodes + // create the raw sample path rdd, (epochNum,src) + val rawPathRDD = adjTable.map(x => x._1.toInt).sortBy(x => x).flatMap(x => addSrcAndLayer(x,$(epochNum))) + val pathRDD = rawPathRDD.map{case (epochNum,src) => (src,epochNum) }.map(x => process(x,aliasArray,crossArray,$(walkLength),diam)) + + val ele1 = multilayerGraph.count() +// println(s"crosslen = ${crossArray.length}, ele1len = $ele1, sum = ${emp1.sum}, len = ${emp2.length}") + +// val item = simi(1)(33)(0) +// println(s"ele1 = $ele1 ele2 = $ele2 item = $item") // println("Layer1: ") // for (i <- 0 to simi(0).length - 1; j <- 0 to simi(0)(0).length - 1) { // print((i, j) + " = " + simi(0)(i)(j) + " ") // } // println("Layer5: ") -// for (i <- 0 to simi(4).length - 1; j <- 0 to simi(0)(0).length - 1) { -// print((i, j) + " = " + simi(4)(i)(j) + " ") +// for (i <- 0 to simi(5).length - 1; j <- 0 to simi(0)(0).length - 1) { +// print((i, j) + " = " + simi(5)(i)(j).isNaN+ " ") // } + + +// val ele = aliasTable.collect()(203) +// println(s"acc cnt = ${ele._4.length}") +// for (i <- 0 to ele.length - 1) { +// println("no."+ i + " = layer: " + ele(i)._1+ " src: "+ele(i)._2+" dst: "+ele(i)._3(0)+" wei: "+ele(i)._4(0)+" acc: "+ele(i)._5(0)) +// } + // println("Degrees: ") // for (i <- 0 to degrees.length - 1) { // print("node"+ i + " = " + degrees(i)+ " ") @@ -92,13 +139,13 @@ class Struc2Vec(override val uid: String) extends Transformer // val ele = adjMatrix(32)(33) // println(s"get = $ele count = $cnt") - dataset.sparkSession.createDataFrame(edges.map(x => Row(x)), transformSchema(dataset.schema)) + dataset.sparkSession.createDataFrame(pathRDD.map(x => Row(x._1,x._2,x._3)), transformSchema(dataset.schema)) } override def transformSchema(schema: StructType): StructType = { - StructType(Seq(StructField("path",StringType, nullable = false))) + StructType(Seq(StructField("src",IntegerType, nullable = false),StructField("epochNum",IntegerType,nullable = false),StructField("path",StringType,nullable = false))) } @@ -109,25 +156,26 @@ class Struc2Vec(override val uid: String) extends Transformer } object Struc2Vec { - def calcAliasTable(partId: Int, iter: Iterator[(Long, Array[(Long, Float)])]): Iterator[(Long, Array[Long], Array[Float], Array[Int])] = { - iter.map { case (src, neighbors) => + def calcAliasTable(partId: Int, iter: Iterator[((Int,Int), Array[(Int, Double)])]): Iterator[(Int,Int, Array[Int], Array[Double], Array[Int])] = { + iter.map { case ((layer,src), neighbors) => val (events, weights) = neighbors.unzip val weightsSum = weights.sum val len = weights.length val areaRatio = weights.map(_ / weightsSum * len) val (accept, alias) = createAliasTable(areaRatio) - (src, events, accept, alias) + (layer, src, events, accept, alias) } } - def createAliasTable(areaRatio: Array[Float]): (Array[Float], Array[Int]) = { + def createAliasTable(areaRatio: Array[Double]): (Array[Double], Array[Int]) = { val len = areaRatio.length val small = ArrayBuffer[Int]() val large = ArrayBuffer[Int]() - val accept = Array.fill(len)(0f) + val accept = Array.fill(len)(0d) val alias = Array.fill(len)(0) for (idx <- areaRatio.indices) { + // note that both accept and alias carries the indices of corresponding event indices, not the corresponding node! if (areaRatio(idx) < 1.0) small.append(idx) else large.append(idx) } while (small.nonEmpty && large.nonEmpty) { @@ -150,4 +198,92 @@ object Struc2Vec { (accept, alias) } + def addSrcAndLayer(x: Int, layers: Int): Array[(Int, Int)] = { + val temp = ArrayBuffer[(Int, Int)]() + for (i: Int <- 0 until layers) + temp.append((i, x)) + temp.toArray + } + + def addDstAndWeights(layer:Int,src:Int,simi:Array[Array[Array[Double]]]):((Int,Int),Array[(Int,Double)])={ + val temp = ArrayBuffer[(Int,Double)]() + for(dst<- 0 until simi(layer)(src).length){ + // here dst != src + if(!simi(layer)(src)(dst).isNaN && dst!=src) + temp.append((dst,Math.exp(-simi(layer)(src)(dst)))) + } + ((layer,src),temp.toArray) + } + + def sumLayerWeights(x:((Int,Int),Array[(Int,Double)])):(Int,(Double,Int))={ + val (dst,weights) = x._2.unzip + val len = weights.length + (x._1._1,(weights.sum,len)) + } + + def calcAverageLayerWeights(x:(Int,Iterable[(Double,Int)])): (Int,Double)={ + val (sum,cnt) = x._2.unzip + (x._1,sum.sum/cnt.sum) + } + + def calcCrossLayerWeights(x:((Int,Int),Array[(Int,Double)]),layerAverageWeights:Array[(Int,Double)]):((Int,Int),Double)={ + val (dst,w) = x._2.unzip + val temp = w.filter(y => y>layerAverageWeights(x._1._1)._2) + (x._1,Math.log(Math.E+temp.length)) + } + + def process(x: (Int, Int), aliasArray: Array[(Int, Int, Array[Int], Array[Double], Array[Int])], cross: Array[((Int, Int), Double)], walkLen: Int, diam:Int): (Int,Int,String) = { + // ((Int,Int,Array[Int])) + // initialize path + val path = new ArrayBuffer[Int]() + val (src, epochNum) = x + var curLayer = 0 + path.append(src) + + // set the stay probability + val stay = 0.5 + + // start a random walk at node $src at layer 0 during $epochNUm + while (path.length < walkLen) { + val curNode: Int = path(path.length - 1) + // decide whether stay in current layer + val ifStay = new Random().nextDouble() + if (ifStay < stay) { + // stay in this layer + val temp = aliasArray.filter(x => x._1 == curLayer && x._2 == curNode)(0) + val events = temp._3 + val accepts = temp._4 + val alias = temp._5 + val index = new Random().nextInt(events.length) + val isAccept = new Random().nextDouble() + if(isAccept x._1._1==curLayer&&x._1._2==curNode)(0)._2 + if(cross.filter(x => x._1._1==curLayer+1 && x._1._2==curNode).length!=0){ + val ifUp = new Random().nextDouble() + if (ifUp < (up / (up + 1))) + curLayer += 1 + else + curLayer -= 1 + }else + // not go to upper corresponding node with no neighbor + curLayer -=1 + } + } + } + (src,epochNum,path.toArray.mkString(" ")) + } } \ No newline at end of file diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala index 054809024..951740a2f 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala @@ -42,7 +42,7 @@ object Struc2VecExample { .setIsWeighted(isWeighted) .setNeedReplicaEdge(needReplicateEdge) .setUseEdgeBalancePartition(useEdgeBalancePartition) - .setEpochNum(2) + .setEpochNum(3) struc2Vec.setOutputDir(output) val df = GraphIO.load(input, isWeighted = isWeighted, srcIndex, dstIndex, weightIndex, sep = sep) @@ -64,6 +64,7 @@ object Struc2VecExample { //PSContext.getOrCreate(sc) } + def stop(): Unit = { } From 7fe695e6eb18bdb1eb3ba451febceed21847d1d8 Mon Sep 17 00:00:00 2001 From: lizihan Date: Tue, 6 Sep 2022 22:12:02 +0800 Subject: [PATCH 4/5] get walks embeddings by word2vec --- .../embedding/struc2vec/test/Struc2Vec.scala | 7 ++--- .../struc2vec/test/Struc2VecExample.scala | 27 ++++++++++++++++--- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala index 2844e4c96..685731e2e 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala @@ -145,7 +145,7 @@ class Struc2Vec(override val uid: String) extends Transformer override def transformSchema(schema: StructType): StructType = { - StructType(Seq(StructField("src",IntegerType, nullable = false),StructField("epochNum",IntegerType,nullable = false),StructField("path",StringType,nullable = false))) + StructType(Seq(StructField("src",IntegerType, nullable = false),StructField("epochNum",IntegerType,nullable = false),StructField("path",ArrayType(StringType),nullable = false))) } @@ -232,7 +232,7 @@ object Struc2Vec { (x._1,Math.log(Math.E+temp.length)) } - def process(x: (Int, Int), aliasArray: Array[(Int, Int, Array[Int], Array[Double], Array[Int])], cross: Array[((Int, Int), Double)], walkLen: Int, diam:Int): (Int,Int,String) = { + def process(x: (Int, Int), aliasArray: Array[(Int, Int, Array[Int], Array[Double], Array[Int])], cross: Array[((Int, Int), Double)], walkLen: Int, diam:Int): (Int,Int,Array[String]) = { // ((Int,Int,Array[Int])) // initialize path val path = new ArrayBuffer[Int]() @@ -272,6 +272,7 @@ object Struc2Vec { }else{ // decide up or down val up = cross.filter(x => x._1._1==curLayer&&x._1._2==curNode)(0)._2 + // since node in higher layer if(cross.filter(x => x._1._1==curLayer+1 && x._1._2==curNode).length!=0){ val ifUp = new Random().nextDouble() if (ifUp < (up / (up + 1))) @@ -284,6 +285,6 @@ object Struc2Vec { } } } - (src,epochNum,path.toArray.mkString(" ")) + (src,epochNum,path.toArray.mkString(" ").split(" ")) } } \ No newline at end of file diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala index 951740a2f..e391aed7f 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala @@ -6,7 +6,7 @@ import com.tencent.angel.graph.utils.GraphIO import com.tencent.angel.spark.context.PSContext import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} - +import org.apache.spark.ml.feature.Word2Vec import scala.collection.mutable.ArrayBuffer object Struc2VecExample { @@ -25,7 +25,7 @@ object Struc2VecExample { val needReplicateEdge = true val sep = " " - val walkLength = 10 + val walkLength = 15 start() @@ -42,14 +42,33 @@ object Struc2VecExample { .setIsWeighted(isWeighted) .setNeedReplicaEdge(needReplicateEdge) .setUseEdgeBalancePartition(useEdgeBalancePartition) - .setEpochNum(3) + .setEpochNum(2) struc2Vec.setOutputDir(output) val df = GraphIO.load(input, isWeighted = isWeighted, srcIndex, dstIndex, weightIndex, sep = sep) val mapping = struc2Vec.transform(df) + mapping.show() + val path = mapping.select("path") + + + + val word2Vec = new Word2Vec() + .setInputCol("path") + .setOutputCol("result") + .setVectorSize(10) + .setMinCount(0) + + val model = word2Vec.fit(mapping) + val result = model.transform(mapping) + result.show() +// result.select("result").take(3).foreach(println) + println(s"count = ${result.count()}") + + + stop() } @@ -66,7 +85,7 @@ object Struc2VecExample { def stop(): Unit = { - + SparkContext.getOrCreate().stop() } } From dc21377e249582c87a3a93d3e5d4e4400c98d029 Mon Sep 17 00:00:00 2001 From: lizihan Date: Mon, 12 Sep 2022 00:02:28 +0800 Subject: [PATCH 5/5] complete the implementation of the struc2vec algorithm[local ver.] --- data/bc/mirrored_karate_club_network.txt | 157 ++++++++++++++++++ .../angel/graph/embedding/struc2vec/README.md | 47 +++++- .../struc2vec/test/Struc2VecExample.scala | 32 +++- 3 files changed, 226 insertions(+), 10 deletions(-) create mode 100644 data/bc/mirrored_karate_club_network.txt diff --git a/data/bc/mirrored_karate_club_network.txt b/data/bc/mirrored_karate_club_network.txt new file mode 100644 index 000000000..f673ac844 --- /dev/null +++ b/data/bc/mirrored_karate_club_network.txt @@ -0,0 +1,157 @@ +0 1 +0 2 +0 3 +0 4 +0 5 +0 6 +0 7 +0 8 +0 10 +0 11 +0 12 +0 13 +0 17 +0 19 +0 21 +0 31 +0 34 +1 2 +1 3 +1 7 +1 13 +1 17 +1 19 +1 21 +1 30 +2 3 +2 7 +2 8 +2 9 +2 13 +2 27 +2 28 +2 32 +3 7 +3 12 +3 13 +4 6 +4 10 +5 6 +5 10 +5 16 +6 16 +8 30 +8 32 +8 33 +9 33 +13 33 +14 32 +14 33 +15 32 +15 33 +18 32 +18 33 +19 33 +20 32 +20 33 +22 32 +22 33 +23 25 +23 27 +23 29 +23 32 +23 33 +24 25 +24 27 +24 31 +25 31 +26 29 +26 33 +27 33 +28 31 +28 33 +29 32 +29 33 +30 32 +30 33 +31 32 +31 33 +32 33 +34 35 +34 36 +34 37 +34 38 +34 39 +34 40 +34 41 +34 42 +34 44 +34 45 +34 46 +34 47 +34 51 +34 53 +34 55 +34 65 +35 36 +35 37 +35 41 +35 47 +35 51 +35 53 +35 55 +35 64 +36 37 +36 41 +36 42 +36 43 +36 47 +36 61 +36 62 +36 66 +37 41 +37 46 +37 47 +38 40 +38 44 +39 40 +39 44 +39 50 +40 50 +42 64 +42 66 +42 67 +43 67 +47 67 +48 66 +48 67 +49 66 +49 67 +52 66 +52 67 +53 67 +54 66 +54 67 +56 66 +56 67 +57 59 +57 61 +57 63 +57 66 +57 67 +58 59 +58 61 +58 65 +59 65 +60 63 +60 67 +61 67 +62 65 +62 67 +63 66 +63 67 +64 66 +64 67 +65 66 +65 67 +66 67 \ No newline at end of file diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/README.md b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/README.md index dbafbb49e..2e2c29635 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/README.md +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/README.md @@ -8,7 +8,7 @@ - 方便处理,假设输入图为单个连通分量,且不同顶点间无multiple edges - 使用Floyd算法获得以上输入图的hopCountResult矩阵,显然在此矩阵中,最大值(单个连通分量,max hop count != INF)为此图直径,第i行记录了顶点i到到其他所有顶点的 hop counts - 一次遍历hopCountResult矩阵,获取图直径以及各顶点度数 -- 基于层次计算的思想,自底向上计算得到图结构相似度,最后输入结果为三维矩阵 Integer[k][n][n] structSimi,显然structSimi[k][i][j] = fk(Nodei,Nodej) +- 基于层次计算的思想,自底向上计算得到图结构相似度,最后输出结果为三维矩阵 Integer[k][n][n] structSimi,显然structSimi[k][i][j] = fk(Nodei,Nodej) **算法需要进行的优化**: - Item1:无向图的adjacency matrix为对称矩阵,可以进行相应的矩阵压缩 @@ -19,5 +19,48 @@ **测试** - 测试使用的图为 Barbell-Graph (1,1), 其对应邻接矩阵为:[[0,1,INF],[1,0,1],[INF,1,0]] - 期望输出的相似度层次网络的矩阵(经手算验证)应为:[[[0,1,0],[1,0,1],[0,1,0]],[[0,3,0],[3,0,3],[0,3,0]],[[0,NaN,0],[NaN,NaN,NaN],[0,NaN,NaN]]] -- 相关具体输出结果见issue +- 相关具体输出结果见issue[1229] +------------更新于2022/9/11------------ + +**当前进展** +- 在第5、6周基本跑通local任务的基础上结合论文进行了进一步验证 +- 最新的可视化结果表明算法实现基本正确 +- 目前算法分布式版本任务除“Struc2VecPartition”和“Struc2VecPSModel”的实现外已基本完成 +- 在Zachary’s Karate Network(第5&6周的测试输入图)的基础上构建Mirrored Zachary’s Karate Network(基于论文) +- 其中两个Zachary’s Karate Network由0号顶点与34号(0+33)顶点间的边所连结,即所有属于[0,33]号的顶点分别对应其顶点号+34的顶点所对应(例如:0 对应 34; 14 对应 48 ...) +- 输入Mirrored Zachary’s Karate network得到各个顶点的embedding,基于PCA得到压缩后的各顶点的二维向量,并使用Origin2021对得到的二维向量进行可视化 + +**struc2vec算法[local]实现基本思路**: +- 获得rawEdges的RDD后,使用collect()函数,建立图的完整邻接矩阵(local版本不进行切分) +- 基于类StructureSimilarity计算得到structure similarity distance network +- 建立multilayer weighted graph RDD +- 创建cross layer weight(level up weights) +- 创建the alias table +- sample node context,并注意如下实现细节: +- scala中Double.NaN不等于任何数(包括其自身),判断时应该使用 “.isNaN()” +- 构建层间跳跃时,忽视某顶点的高一层对应顶点可能其邻点集合为空,在实现时判断逻辑优化为如下: + - 在构建多层网络时在RDD中直接过滤掉邻点集为空的元素 + - 在进行跨层判断时,先判断该顶点是否存在上层对应点,若不存在,则跨层只向下 + + + +**测试** + +1. 输入: + - Mirrored Zachary’s Karate network(68 nodes 157 undirected edges) + - walk length: 15 + - epochNum(训练轮数): 5 + - vector size (word2vec) : 10 + - window size (word2vec) : 3 + - stay(停留在当前层的概率): 0.5 + +2. 期望输出: + - 在可视化散点图中所有属于[0,33]号的顶点分别与其对应其顶点号+34的顶点应该尽可能靠近(结构相似度最高) + - 相关具体输出结果见issue[1242] + + +**未来的工作**: +- Item1:继续进行算法分布式版本实现的工作,特别是实现“Struc2VecPartition”和“Struc2VecPSModel” +- Item2:调整算法的超参数,优化得到的结果 +- Item3:在算法整体实现完成后的基础上根据论文完善对实现代码细节的优化 \ No newline at end of file diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala index e391aed7f..bbb9c31ba 100644 --- a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala @@ -6,12 +6,12 @@ import com.tencent.angel.graph.utils.GraphIO import com.tencent.angel.spark.context.PSContext import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.ml.feature.Word2Vec +import org.apache.spark.ml.feature.{Word2Vec,PCA} import scala.collection.mutable.ArrayBuffer object Struc2VecExample { def main(args: Array[String]): Unit = { - val input = "data/bc/karate_club_network.txt" + val input = "data/bc/mirrored_karate_club_network.txt" val storageLevel = StorageLevel.fromString("MEMORY_ONLY") val batchSize = 10 val output = "data/output/output1" @@ -42,30 +42,46 @@ object Struc2VecExample { .setIsWeighted(isWeighted) .setNeedReplicaEdge(needReplicateEdge) .setUseEdgeBalancePartition(useEdgeBalancePartition) - .setEpochNum(2) + .setEpochNum(5) struc2Vec.setOutputDir(output) val df = GraphIO.load(input, isWeighted = isWeighted, srcIndex, dstIndex, weightIndex, sep = sep) val mapping = struc2Vec.transform(df) - mapping.show() val path = mapping.select("path") - - + // get node embeddings by word2vec val word2Vec = new Word2Vec() .setInputCol("path") .setOutputCol("result") .setVectorSize(10) .setMinCount(0) + .setWindowSize(3) val model = word2Vec.fit(mapping) val result = model.transform(mapping) - result.show() + val vec = model.getVectors + + +// visulization by pca + val pca = new PCA() + .setInputCol("vector") + .setOutputCol("pcaFeatures") + .setK(2) + .fit(vec) + + val result2 = pca.transform(vec) + + result2.show(68,false) + + result2.select("word","pcaFeatures").show(68,false) + +// result2.select("word","pcaFeatures").rdd.saveAsTextFile("C:\\Users\\Lzh\\Desktop\\result.txt") +// vec.show() // result.select("result").take(3).foreach(println) - println(s"count = ${result.count()}") +// println(s"vec(0) = ${vecArr(0)}")