diff --git a/data/bc/mirrored_karate_club_network.txt b/data/bc/mirrored_karate_club_network.txt
new file mode 100644
index 000000000..f673ac844
--- /dev/null
+++ b/data/bc/mirrored_karate_club_network.txt
@@ -0,0 +1,157 @@
+0 1
+0 2
+0 3
+0 4
+0 5
+0 6
+0 7
+0 8
+0 10
+0 11
+0 12
+0 13
+0 17
+0 19
+0 21
+0 31
+0 34
+1 2
+1 3
+1 7
+1 13
+1 17
+1 19
+1 21
+1 30
+2 3
+2 7
+2 8
+2 9
+2 13
+2 27
+2 28
+2 32
+3 7
+3 12
+3 13
+4 6
+4 10
+5 6
+5 10
+5 16
+6 16
+8 30
+8 32
+8 33
+9 33
+13 33
+14 32
+14 33
+15 32
+15 33
+18 32
+18 33
+19 33
+20 32
+20 33
+22 32
+22 33
+23 25
+23 27
+23 29
+23 32
+23 33
+24 25
+24 27
+24 31
+25 31
+26 29
+26 33
+27 33
+28 31
+28 33
+29 32
+29 33
+30 32
+30 33
+31 32
+31 33
+32 33
+34 35
+34 36
+34 37
+34 38
+34 39
+34 40
+34 41
+34 42
+34 44
+34 45
+34 46
+34 47
+34 51
+34 53
+34 55
+34 65
+35 36
+35 37
+35 41
+35 47
+35 51
+35 53
+35 55
+35 64
+36 37
+36 41
+36 42
+36 43
+36 47
+36 61
+36 62
+36 66
+37 41
+37 46
+37 47
+38 40
+38 44
+39 40
+39 44
+39 50
+40 50
+42 64
+42 66
+42 67
+43 67
+47 67
+48 66
+48 67
+49 66
+49 67
+52 66
+52 67
+53 67
+54 66
+54 67
+56 66
+56 67
+57 59
+57 61
+57 63
+57 66
+57 67
+58 59
+58 61
+58 65
+59 65
+60 63
+60 67
+61 67
+62 65
+62 67
+63 66
+63 67
+64 66
+64 67
+65 66
+65 67
+66 67
\ No newline at end of file
diff --git a/data/output/output1/_SUCCESS b/data/output/output1/_SUCCESS
new file mode 100644
index 000000000..e69de29bb
diff --git a/data/output/output1/part-00000-4ba89a38-c974-4d33-a020-d83060e18bda-c000.csv b/data/output/output1/part-00000-4ba89a38-c974-4d33-a020-d83060e18bda-c000.csv
new file mode 100644
index 000000000..bbc9bcd7a
--- /dev/null
+++ b/data/output/output1/part-00000-4ba89a38-c974-4d33-a020-d83060e18bda-c000.csv
@@ -0,0 +1,34 @@
+20 32 31 24 25 24 27 24 25 31
+13 2 32 2 0 12 0 5 16 6
+19 1 21 1 2 9 33 8 2 8
+21 1 19 0 17 1 3 1 2 8
+4 10 0 8 0 13 0 5 16 5
+15 32 31 32 14 33 8 32 2 1
+16 5 6 4 10 4 0 5 16 6
+22 33 22 32 14 33 18 32 15 33
+28 2 28 33 8 2 1 19 33 28
+29 33 31 24 25 24 27 24 25 31
+25 24 27 2 28 31 33 26 29 33
+11 0 17 0 2 27 33 13 2 13
+30 32 30 32 29 26 29 26 29 33
+32 15 33 27 2 27 33 26 29 33
+27 23 33 23 27 24 27 33 15 33
+0 4 0 31 25 24 27 2 8 32
+24 31 28 31 25 31 28 31 25 24
+14 33 23 25 23 27 24 25 31 24
+33 9 2 7 1 0 13 3 13 1
+23 27 24 31 25 31 28 31 25 31
+1 17 0 21 1 30 32 2 28 31
+6 0 11 0 8 33 8 30 32 31
+17 0 13 3 1 30 32 15 33 32
+3 13 3 12 3 1 3 12 0 19
+7 0 11 0 10 5 6 16 6 16
+12 0 4 6 16 5 6 4 10 0
+8 2 9 33 15 33 13 2 28 31
+9 33 31 32 23 29 26 33 32 22
+18 33 31 32 23 29 23 29 33 32
+31 28 2 1 30 8 30 1 3 2
+26 33 32 33 9 2 13 0 2 9
+10 4 6 16 6 5 0 19 1 0
+5 0 7 2 7 0 6 5 16 5
+2 0 21 1 0 6 16 6 0 12
diff --git a/data/output/output1/part-00000-c068cd3b-db23-44fe-a3c5-47048c8533dd-c000.csv b/data/output/output1/part-00000-c068cd3b-db23-44fe-a3c5-47048c8533dd-c000.csv
new file mode 100644
index 000000000..c9f7a67f2
--- /dev/null
+++ b/data/output/output1/part-00000-c068cd3b-db23-44fe-a3c5-47048c8533dd-c000.csv
@@ -0,0 +1,34 @@
+20 33 20 32 20 32 23 25 23 27
+13 0 7 0 11 0 31 24 27 23
+19 33 20 33 18 33 19 0 17 0
+21 1 21 1 3 13 2 32 23 27
+4 10 0 12 0 21 1 7 0 13
+15 33 14 32 2 7 2 32 15 32
+16 6 16 6 5 0 10 5 0 13
+22 33 20 33 18 32 23 25 31 0
+28 33 14 32 2 7 2 32 15 32
+29 26 33 13 33 28 2 9 2 0
+25 23 25 23 32 33 20 32 8 30
+11 0 7 0 10 5 16 6 4 0
+30 1 21 0 12 3 1 3 0 13
+32 23 25 23 32 30 1 3 0 13
+27 33 15 33 19 33 20 33 18 33
+0 19 33 28 33 9 33 20 32 15
+24 27 23 32 20 32 29 23 33 30
+14 33 30 32 8 2 27 24 27 2
+33 32 22 32 8 2 7 2 8 30
+23 27 23 25 24 27 24 31 32 15
+1 30 1 3 12 0 8 30 33 20
+6 4 0 31 33 23 33 14 33 14
+17 1 13 1 13 1 13 33 19 33
+3 2 13 33 32 31 25 31 24 27
+7 2 13 1 7 2 9 33 30 1
+12 0 19 33 14 32 8 30 32 22
+8 32 22 32 31 28 33 23 25 24
+9 33 8 0 8 32 31 24 31 28
+18 32 22 32 31 28 31 24 31 28
+31 25 24 27 23 32 31 24 25 24
+26 33 32 31 32 2 27 2 27 33
+10 4 0 17 1 17 0 12 3 0
+5 0 4 0 19 33 26 29 26 29
+2 27 24 25 23 29 23 33 26 29
diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/DTW.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/DTW.scala
new file mode 100644
index 000000000..203c451ef
--- /dev/null
+++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/DTW.scala
@@ -0,0 +1,50 @@
+package com.tencent.angel.graph.embedding.struc2vec
+
+object DTW {
+
+  def compute(a: Array[Int], b: Array[Int]): Double = {
+
+    // DTW (Dynamic Time Warping):
+    // a straightforward implementation based on dynamic programming,
+    // dp[i][j] = Dist(i,j) + min{dp[i-1][j], dp[i][j-1], dp[i-1][j-1]}
+
+    if (a.length == 0 || b.length == 0)
+      Double.NaN
+    else {
+      val dp: Array[Array[Double]] = Array.ofDim(a.length, b.length)
+
+      // set the boundary conditions
+      // the first column: dp[i][0] = Dist(i,0) + dp[i-1][0]
+      dp(0)(0) = dist(a(0), b(0))
+      for (i <- 1 until a.length)
+        dp(i)(0) = dp(i - 1)(0) + dist(a(i), b(0))
+
+      // the first row: dp[0][j] = Dist(0,j) + dp[0][j-1]
+      for (j <- 1 until b.length)
+        dp(0)(j) = dp(0)(j - 1) + dist(b(j), a(0))
+
+      // fill the whole dp matrix from left to right, top to bottom
+      for (i <- 1 until a.length; j <- 1 until b.length)
+        dp(i)(j) = dist(a(i), b(j)) + minAmongThree(dp(i - 1)(j), dp(i)(j - 1), dp(i - 1)(j - 1))
+
+      dp(a.length - 1)(b.length - 1)
+    }
+  }
+
+  def dist(a: Int, b: Int): Double = {
+    // assumes a and b are both positive (they are node degrees);
+    // toDouble prevents integer division in max(a,b)/min(a,b) - 1
+    Math.max(a, b).toDouble / Math.min(a, b) - 1
+  }
+
+  def minAmongThree(a: Double, b: Double, c: Double): Double = {
+    Math.min(a, Math.min(b, c))
+  }
+}
diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/StructureSimilarity.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/StructureSimilarity.scala
new file mode 100644
--- /dev/null
+++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/StructureSimilarity.scala
+package com.tencent.angel.graph.embedding.struc2vec
+
+import scala.collection.mutable.ArrayBuffer
+
+class StructureSimilarity(adjMatrix: Array[Array[Int]]) {
+
+  // the declarations below follow how the class is used in Struc2Vec.scala and the tests
+  val n: Int = adjMatrix.length
+  val hopCountResult: Array[Array[Int]] = Floyd.hopCount(adjMatrix)
+  val diam: Int = hopCountResult.map(_.max).max
+  val degrees: Array[Int] = adjMatrix.map(row => row.count(_ == 1))
+
+  // structSimi(k)(i)(j): the structural distance f_k(i,j) accumulated up to hop ring k
+  val structSimi: Array[Array[Array[Double]]] = Array.ofDim(diam + 1, n, n)
+
+  def compute(): Unit =
+    for (k <- 0 to diam) computeLayer(k)
+
+  // fill layer k of structSimi (helper name assumed)
+  def computeLayer(k: Int): Unit = {
+    if (k > diam) {
+
+    } else if (k == 0) {
+      for (i <- 0 until n; j <- 0 until n)
+        structSimi(0)(i)(j) = DTW.compute(getHopRingK(i, 0), getHopRingK(j, 0))
+    } else {
+      for (i <- 0 until n; j <- 0 until n) {
+        // notice that NaN == NaN is false, so test with isNaN
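+        // NaN marks an undefined distance: if a node has no hop-(k-1) ring, f_k
+        // stays undefined for every larger k, so the NaN is simply propagated.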
+        if (structSimi(k - 1)(i)(j).isNaN)
+          structSimi(k)(i)(j) = Double.NaN
+        else {
+          val temp: Double = DTW.compute(getHopRingK(i, k), getHopRingK(j, k))
+          if (temp.isNaN)
+            structSimi(k)(i)(j) = Double.NaN
+          else
+            structSimi(k)(i)(j) = structSimi(k - 1)(i)(j) + temp
+        }
+      }
+    }
+  }
+
+  def getHopRingK(node: Int, k: Int): Array[Int] = {
+    // return the ordered degree sequence s(R_k(node)) of the hop-k ring around node
+    if (node < 0 || node > n - 1 || k < 0 || k > diam)
+      Array[Int]()
+    else {
+      val result = ArrayBuffer[Int]()
+      for (j <- 0 until n)
+        if (hopCountResult(node)(j) == k)
+          result.append(degrees(j))
+      result.sorted.toArray
+    }
+  }
+
+}
diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala
new file mode 100644
index 000000000..685731e2e
--- /dev/null
+++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2Vec.scala
@@ -0,0 +1,290 @@
+package com.tencent.angel.graph.embedding.struc2vec.test
+
+import breeze.stats.distributions.AliasTable
+import com.tencent.angel.graph.data.neighbor.NeighborDataOps
+import com.tencent.angel.graph.embedding.struc2vec.StructureSimilarity
+import com.tencent.angel.graph.embedding.struc2vec.test.Struc2Vec.{addDstAndWeights, addSrcAndLayer, calcAverageLayerWeights, calcCrossLayerWeights, sumLayerWeights, process}
+import com.tencent.angel.graph.utils.{GraphIO, Stats}
+import com.tencent.angel.graph.utils.params.{HasArrayBoundsPath, HasBalancePartitionPercent, HasBatchSize, HasDstNodeIdCol, HasEpochNum, HasIsWeighted, HasMaxIteration, HasNeedReplicaEdge, HasOutputCoreIdCol, HasOutputNodeIdCol, HasPSPartitionNum, HasPartitionNum, HasSrcNodeIdCol, HasStorageLevel, HasUseBalancePartition, HasUseEdgeBalancePartition, HasWalkLength, HasWeightCol}
+import com.tencent.angel.spark.context.PSContext
+import org.apache.spark.{SPARK_BRANCH, SparkContext}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.util.Identifiable
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
+
+import java.util.Random
+import scala.collection.mutable.ArrayBuffer
+
+
+class Struc2Vec(override val uid: String) extends Transformer
+  with HasSrcNodeIdCol with HasDstNodeIdCol with HasOutputNodeIdCol with HasOutputCoreIdCol
+  with HasStorageLevel with HasPartitionNum with HasPSPartitionNum with HasMaxIteration
+  with HasBatchSize with HasArrayBoundsPath with HasIsWeighted with HasWeightCol with HasUseBalancePartition
+  with HasNeedReplicaEdge with HasUseEdgeBalancePartition with HasWalkLength with HasEpochNum with HasBalancePartitionPercent {
+
+  private var output: String = _
+
+  def this() = this(Identifiable.randomUID("Struc2Vec"))
+
+  def setOutputDir(in: String): Unit = {
+    output = in
+  }
+
+  override def transform(dataset: Dataset[_]): DataFrame = {
+
+    // create the origin edge RDD and preprocess the data;
+    // repartition/persist must be chained, since RDD transformations return a new RDD
+    val rawEdges = NeighborDataOps.loadEdgesWithWeight(dataset, $(srcNodeIdCol), $(dstNodeIdCol), $(weightCol), $(isWeighted), $(needReplicaEdge), true, false, false)
+      .repartition($(partitionNum)).persist(StorageLevel.DISK_ONLY)
+    val (minId, maxId, numEdges) = Stats.summarizeWithWeight(rawEdges)
+    println(s"minId=$minId maxId=$maxId numEdges=$numEdges level=${$(storageLevel)}")
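+    // The next block keys the edges, groups them into an adjacency table, and then
+    // materializes a dense adjacency matrix on the driver: 1 for an edge, 0 on the
+    // diagonal and Int.MaxValue for "no edge" (the INF convention Floyd.hopCount
+    // expects). Collecting the full graph only suits small test graphs like this one.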
+    val edges = rawEdges.map { case (src, dst, w) => (src, (dst, w)) }
+    val rmWeight = rawEdges.map { case (src, dst, w) => (src, dst) }
+
+    // create the adjacency table
+    val adjTable = rmWeight.groupByKey()
+
+    // create the adjacency matrix
+    val len: Int = adjTable.count().toInt
+    val adjArray = adjTable.collect()
+    val adjMatrix: Array[Array[Int]] = Array.ofDim(len, len)
+
+    // initialize adjMatrix
+    for (i <- 0 until len; j <- 0 until len)
+      if (i == j)
+        adjMatrix(i)(j) = 0
+      else
+        adjMatrix(i)(j) = Int.MaxValue
+
+    for (item <- adjArray) {
+      for (j <- item._2)
+        adjMatrix(item._1.toInt)(j.toInt) = 1
+    }
+
+    val structureSimilarity = new StructureSimilarity(adjMatrix)
+
+//    val diam = structureSimilarity.diam
+//    println(s"diam = $diam")
+//    val hop = structureSimilarity.hopCountResult
+//    val degrees = structureSimilarity.degrees
+
+    structureSimilarity.compute()
+
+    // get the structural similarity distance network
+    val simi = structureSimilarity.structSimi
+    val diam = structureSimilarity.diam
+
+    // create the multilayer weighted graph RDD
+    val multilayerRDD = adjTable.map(x => x._1.toInt).sortBy(x => x).flatMap(x => addSrcAndLayer(x, simi.length))
+    val multilayerGraph = multilayerRDD.map(x => addDstAndWeights(x._1, x._2, simi)).filter(x => x._2.length != 0)
+
+    // create the per-layer average weight array
+    val layerAverageWeight = multilayerGraph.map(x => sumLayerWeights(x)).groupByKey().map(x => calcAverageLayerWeights(x)).sortByKey().collect()
+
+    // create the cross-layer (level-up) weights
+    val crossLayerWeights = multilayerGraph.map(x => calcCrossLayerWeights(x, layerAverageWeight))
+    val crossArray = crossLayerWeights.collect()
+
+    // create the alias tables
+    val aliasTable = multilayerGraph
+      .mapPartitionsWithIndex { case (partId, iter) =>
+        Struc2Vec.calcAliasTable(partId, iter)
+      }
+    val aliasArray = aliasTable.collect()
+
+    val temp = aliasTable.filter(x => x._1 == 0 && x._2 == 0).collect()(0)
+    println(s"temp: ${temp._3.length}")
+
+    // generate contexts for the nodes:
+    // create the raw sample path RDD, keyed (epochNum, src)
+    val rawPathRDD = adjTable.map(x => x._1.toInt).sortBy(x => x).flatMap(x => addSrcAndLayer(x, $(epochNum)))
+    val pathRDD = rawPathRDD.map { case (epochNum, src) => (src, epochNum) }.map(x => process(x, aliasArray, crossArray, $(walkLength), diam))
+
+    val ele1 = multilayerGraph.count()
+//    println(s"crosslen = ${crossArray.length}, ele1len = $ele1, sum = ${emp1.sum}, len = ${emp2.length}")
+
+//    val item = simi(1)(33)(0)
+//    println(s"ele1 = $ele1 ele2 = $ele2 item = $item")
+
+//    println("Layer1: ")
+//    for (i <- 0 to simi(0).length - 1; j <- 0 to simi(0)(0).length - 1) {
+//      print((i, j) + " = " + simi(0)(i)(j) + " ")
+//    }
+//    println("Layer5: ")
+//    for (i <- 0 to simi(5).length - 1; j <- 0 to simi(0)(0).length - 1) {
+//      print((i, j) + " = " + simi(5)(i)(j).isNaN + " ")
+//    }
+
+//    val ele = aliasTable.collect()(203)
+//    println(s"acc cnt = ${ele._4.length}")
+//    for (i <- 0 to ele.length - 1) {
+//      println("no." + i + " = layer: " + ele(i)._1 + " src: " + ele(i)._2 + " dst: " + ele(i)._3(0) + " wei: " + ele(i)._4(0) + " acc: " + ele(i)._5(0))
+//    }
+
+//    println("Degrees: ")
+//    for (i <- 0 to degrees.length - 1) {
+//      print("node" + i + " = " + degrees(i) + " ")
+//    }
+
+//    val fn = adjTable.take(17)(16)._2
+//    val temp = adjArray(0)._2
+//    val cnt = adjTable.count()
+//    val ele = adjMatrix(32)(33)
+//    println(s"get = $ele count = $cnt")
+
+    dataset.sparkSession.createDataFrame(pathRDD.map(x => Row(x._1, x._2, x._3)), transformSchema(dataset.schema))
+  }
+
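+  // Each output row is one walk context (src, epochNum, path): the node-id
+  // sequence of a single random walk of length walkLength started at src.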
+  override def transformSchema(schema: StructType): StructType = {
+    StructType(Seq(
+      StructField("src", IntegerType, nullable = false),
+      StructField("epochNum", IntegerType, nullable = false),
+      StructField("path", ArrayType(StringType), nullable = false)))
+  }
+
+  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
+
+}
+
+object Struc2Vec {
+
+  def calcAliasTable(partId: Int, iter: Iterator[((Int, Int), Array[(Int, Double)])]): Iterator[(Int, Int, Array[Int], Array[Double], Array[Int])] = {
+    iter.map { case ((layer, src), neighbors) =>
+      val (events, weights) = neighbors.unzip
+      val weightsSum = weights.sum
+      val len = weights.length
+      // normalize so that the average bucket area is 1, then build the alias table
+      val areaRatio = weights.map(_ / weightsSum * len)
+      val (accept, alias) = createAliasTable(areaRatio)
+      (layer, src, events, accept, alias)
+    }
+  }
+
+  def createAliasTable(areaRatio: Array[Double]): (Array[Double], Array[Int]) = {
+    val len = areaRatio.length
+    val small = ArrayBuffer[Int]()
+    val large = ArrayBuffer[Int]()
+    val accept = Array.fill(len)(0d)
+    val alias = Array.fill(len)(0)
+
+    for (idx <- areaRatio.indices) {
+      // note that both accept and alias carry indices into the event array, not node ids!
+      if (areaRatio(idx) < 1.0) small.append(idx) else large.append(idx)
+    }
+    while (small.nonEmpty && large.nonEmpty) {
+      val smallIndex = small.remove(small.size - 1)
+      val largeIndex = large.remove(large.size - 1)
+      accept(smallIndex) = areaRatio(smallIndex)
+      alias(smallIndex) = largeIndex
+      areaRatio(largeIndex) = areaRatio(largeIndex) - (1 - areaRatio(smallIndex))
+      if (areaRatio(largeIndex) < 1.0) small.append(largeIndex) else large.append(largeIndex)
+    }
+    while (small.nonEmpty) {
+      val smallIndex = small.remove(small.size - 1)
+      accept(smallIndex) = 1
+    }
+    while (large.nonEmpty) {
+      val largeIndex = large.remove(large.size - 1)
+      accept(largeIndex) = 1
+    }
+    (accept, alias)
+  }
+
+  def addSrcAndLayer(x: Int, layers: Int): Array[(Int, Int)] = {
+    val temp = ArrayBuffer[(Int, Int)]()
+    for (i: Int <- 0 until layers)
+      temp.append((i, x))
+    temp.toArray
+  }
+
+  def addDstAndWeights(layer: Int, src: Int, simi: Array[Array[Array[Double]]]): ((Int, Int), Array[(Int, Double)]) = {
+    val temp = ArrayBuffer[(Int, Double)]()
+    for (dst <- 0 until simi(layer)(src).length) {
+      // skip undefined distances; here dst != src
+      if (!simi(layer)(src)(dst).isNaN && dst != src)
+        temp.append((dst, Math.exp(-simi(layer)(src)(dst))))
+    }
+    ((layer, src), temp.toArray)
+  }
+
+  def sumLayerWeights(x: ((Int, Int), Array[(Int, Double)])): (Int, (Double, Int)) = {
+    val (dst, weights) = x._2.unzip
+    val len = weights.length
+    (x._1._1, (weights.sum, len))
+  }
+
+  def calcAverageLayerWeights(x: (Int, Iterable[(Double, Int)])): (Int, Double) = {
+    val (sum, cnt) = x._2.unzip
+    (x._1, sum.sum / cnt.sum)
+  }
+
+  def calcCrossLayerWeights(x: ((Int, Int), Array[(Int, Double)]), layerAverageWeights: Array[(Int, Double)]): ((Int, Int), Double) = {
+    val (dst, w) = x._2.unzip
+    // Gamma_k(u): the number of edges of u heavier than the layer-k average
+    val temp = w.filter(y => y > layerAverageWeights(x._1._1)._2)
+    (x._1, Math.log(Math.E + temp.length))
+  }
+
+  def process(x: (Int, Int), aliasArray: Array[(Int, Int, Array[Int], Array[Double], Array[Int])], cross: Array[((Int, Int), Double)], walkLen: Int, diam: Int): (Int, Int, Array[String]) = {
+    // initialize the path
+    val path = new ArrayBuffer[Int]()
+    val (src, epochNum) = x
+    var curLayer = 0
+    val rand = new Random() // one RNG per walk instead of one per draw
+    path.append(src)
+
+    // set the stay probability
+    val stay = 0.5
+
+    // run a random walk starting at node $src on layer 0 for epoch $epochNum
+    while (path.length < walkLen) {
+      val curNode: Int = path(path.length - 1)
+      // decide whether to stay in the current layer
+      val ifStay = rand.nextDouble()
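+      // With probability `stay`, draw a same-layer neighbour in O(1) via the alias
+      // method; otherwise switch layers without emitting a node, moving up with
+      // probability w_up / (w_up + 1), as in the struc2vec paper (w_down = 1).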
+      if (ifStay < stay) {
+        // stay in this layer: draw the next node through the alias table
+        val temp = aliasArray.filter(x => x._1 == curLayer && x._2 == curNode)(0)
+        val events = temp._3
+        val accepts = temp._4
+        val alias = temp._5
+        val index = rand.nextInt(events.length)
+        val isAccept = rand.nextDouble()
+        if (isAccept < accepts(index))
+          path.append(events(index))
+        else
+          path.append(events(alias(index)))
+      } else {
+        // switch layers
+        val up = cross.filter(x => x._1._1 == curLayer && x._1._2 == curNode)(0)._2
+        // only move up when the node still has neighbours in the higher layer
+        if (cross.filter(x => x._1._1 == curLayer + 1 && x._1._2 == curNode).length != 0) {
+          val ifUp = rand.nextDouble()
+          if (ifUp < (up / (up + 1)))
+            curLayer += 1
+          else if (curLayer > 0) // at layer 0 there is no lower layer
+            curLayer -= 1
+        } else if (curLayer > 0) {
+          // do not move up to a corresponding node that has no neighbours there
+          curLayer -= 1
+        }
+      }
+    }
+    (src, epochNum, path.map(_.toString).toArray)
+  }
+}
\ No newline at end of file
diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala
new file mode 100644
index 000000000..bbb9c31ba
--- /dev/null
+++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/Struc2VecExample.scala
@@ -0,0 +1,110 @@
+package com.tencent.angel.graph.embedding.struc2vec.test
+
+import com.tencent.angel.conf.AngelConf
+import com.tencent.angel.graph.embedding.struc2vec.test.Struc2Vec
+import com.tencent.angel.graph.utils.GraphIO
+import com.tencent.angel.spark.context.PSContext
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.ml.feature.{Word2Vec, PCA}
+
+import scala.collection.mutable.ArrayBuffer
+
+object Struc2VecExample {
+
+  def main(args: Array[String]): Unit = {
+    val input = "data/bc/mirrored_karate_club_network.txt"
+    val storageLevel = StorageLevel.fromString("MEMORY_ONLY")
+    val batchSize = 10
+    val output = "data/output/output1"
+    val srcIndex = 0
+    val dstIndex = 1
+    val weightIndex = 2
+    val psPartitionNum = 1
+    val partitionNum = 1
+    val useEdgeBalancePartition = false
+    val isWeighted = false
+    val needReplicateEdge = true
+
+    val sep = " "
+    val walkLength = 15
+
+    start()
+
+    val struc2Vec = new Struc2Vec()
+      .setStorageLevel(storageLevel)
+      .setPSPartitionNum(psPartitionNum)
+      .setSrcNodeIdCol("src")
+      .setDstNodeIdCol("dst")
+      .setWeightCol("weight")
+      .setBatchSize(batchSize)
+      .setWalkLength(walkLength)
+      .setPartitionNum(partitionNum)
+      .setIsWeighted(isWeighted)
+      .setNeedReplicaEdge(needReplicateEdge)
+      .setUseEdgeBalancePartition(useEdgeBalancePartition)
+      .setEpochNum(5)
+
+    struc2Vec.setOutputDir(output)
+    val df = GraphIO.load(input, isWeighted = isWeighted, srcIndex, dstIndex, weightIndex, sep = sep)
+    val mapping = struc2Vec.transform(df)
+
+    mapping.show()
+
+    val path = mapping.select("path")
+
+    // get node embeddings via Word2Vec over the sampled walks
+    val word2Vec = new Word2Vec()
+      .setInputCol("path")
+      .setOutputCol("result")
+      .setVectorSize(10)
+      .setMinCount(0)
+      .setWindowSize(3)
+
+    val model = word2Vec.fit(mapping)
+    val result = model.transform(mapping)
+    val vec = model.getVectors
+
+    // visualization by PCA
+    val pca = new PCA()
+      .setInputCol("vector")
+      .setOutputCol("pcaFeatures")
+      .setK(2)
+      .fit(vec)
+
+    val result2 = pca.transform(vec)
+
+    result2.show(68, false)
+
+    result2.select("word", "pcaFeatures").show(68, false)
+
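+    // Sanity check for the mirrored graph: node i and its mirror i+34 play the same
+    // structural role, so their embeddings should land close together in the 2-D
+    // PCA projection.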
String = "local[4]"): Unit = { + val conf = new SparkConf() + conf.setMaster(mode) + conf.setAppName("Struc2Vec") + conf.set(AngelConf.ANGEL_PSAGENT_UPDATE_SPLIT_ADAPTION_ENABLE, "false") + val sc = new SparkContext(conf) + sc.setLogLevel("ERROR") + sc.setCheckpointDir("data/cp") + //PSContext.getOrCreate(sc) + } + + + def stop(): Unit = { + SparkContext.getOrCreate().stop() + } + +} + + + diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestDTW.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestDTW.scala new file mode 100644 index 000000000..77ea3519e --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestDTW.scala @@ -0,0 +1,13 @@ +package com.tencent.angel.graph.embedding.struc2vec.test + +import com.tencent.angel.graph.embedding.struc2vec.DTW + +object TestDTW { + def main(args: Array[String]): Unit = { + // val a = Array(1, 1, 1, 1, 2, 2, 3, 3, 4, 3, 2, 2, 1, 1, 1, 1) + // val b = Array(1, 1, 2, 2, 3, 3, 4, 4, 4, 4, 3, 3, 2, 2, 1, 1) + // println(DTW.compute(a, b)) + println(DTW.compute(Array(1, 1), Array(2))) + } + +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestFloyd.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestFloyd.scala new file mode 100644 index 000000000..c47818378 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestFloyd.scala @@ -0,0 +1,30 @@ +package com.tencent.angel.graph.embedding.struc2vec.test +import com.tencent.angel.graph.embedding.struc2vec.Floyd + + +object TestFloyd { + val INF = Floyd.INF + def main(args: Array[String]): Unit = { + val len: Int = 3 + // initialize adjacency matrix + val adjMatrix = Array.ofDim[Int](len, len) + adjMatrix(0)(0) = 0 + adjMatrix(0)(1) = 1 + adjMatrix(0)(2) = INF + adjMatrix(1)(0) = 1 + adjMatrix(1)(1) = 0 + adjMatrix(1)(2) = 1 + adjMatrix(2)(0) = INF + adjMatrix(2)(1) = 1 + adjMatrix(2)(2) = 0 + + val res = Floyd.hopCount(adjMatrix) + for (i <- 0 to res.length - 1; j <- 0 to res(0).length - 1) { + println("Element " + i + j + " = " + res(i)(j)) + } + + + + + } +} diff --git a/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestStructSimi.scala b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestStructSimi.scala new file mode 100644 index 000000000..92e21b363 --- /dev/null +++ b/spark-on-angel/graph/src/main/scala/com/tencent/angel/graph/embedding/struc2vec/test/TestStructSimi.scala @@ -0,0 +1,36 @@ +package com.tencent.angel.graph.embedding.struc2vec.test + +import com.tencent.angel.graph.embedding.struc2vec._ + +object TestStructSimi { + val INF = Floyd.INF + def main(args:Array[String]): Unit ={ + val adjMatrix = Array(Array(0,1,INF),Array(1,0,1),Array(INF,1,0)) + val struct_simi = new StructureSimilarity(adjMatrix) + val diam = struct_simi.diam + val degrees = struct_simi.degrees + val hc = struct_simi.hopCountResult + val ring = struct_simi.getHopRingK(1,1) + println(diam) + for (i <- 0 to hc.length - 1; j <- 0 to hc(0).length - 1) { + println("Element " + i + j + " = " + hc(i)(j)) + } + + println(struct_simi.getHopRingK(0,0)(0)) + println(struct_simi.getHopRingK(1,0)(0)) + +// println(DTW.compute(struct_simi.getHopRingK(0,0),struct_simi.getHopRingK(0,0))) + +// for (i <- 0 to degrees.length - 1) +// println("Element " + i + " = " + degrees(i)) + 
+//    for (i <- 0 to ring.length - 1)
+//      println("ring Element " + i + " = " + ring(i))
+
+//    println(ring.length)
+
+  }
+
+}