[Issue 1242] Complete implementation of struc2vec (local ver.) by Zihan Li #1243

Open · wants to merge 5 commits into base: master
157 changes: 157 additions & 0 deletions data/bc/mirrored_karate_club_network.txt
@@ -0,0 +1,157 @@
0 1
0 2
0 3
0 4
0 5
0 6
0 7
0 8
0 10
0 11
0 12
0 13
0 17
0 19
0 21
0 31
0 34
1 2
1 3
1 7
1 13
1 17
1 19
1 21
1 30
2 3
2 7
2 8
2 9
2 13
2 27
2 28
2 32
3 7
3 12
3 13
4 6
4 10
5 6
5 10
5 16
6 16
8 30
8 32
8 33
9 33
13 33
14 32
14 33
15 32
15 33
18 32
18 33
19 33
20 32
20 33
22 32
22 33
23 25
23 27
23 29
23 32
23 33
24 25
24 27
24 31
25 31
26 29
26 33
27 33
28 31
28 33
29 32
29 33
30 32
30 33
31 32
31 33
32 33
34 35
34 36
34 37
34 38
34 39
34 40
34 41
34 42
34 44
34 45
34 46
34 47
34 51
34 53
34 55
34 65
35 36
35 37
35 41
35 47
35 51
35 53
35 55
35 64
36 37
36 41
36 42
36 43
36 47
36 61
36 62
36 66
37 41
37 46
37 47
38 40
38 44
39 40
39 44
39 50
40 50
42 64
42 66
42 67
43 67
47 67
48 66
48 67
49 66
49 67
52 66
52 67
53 67
54 66
54 67
56 66
56 67
57 59
57 61
57 63
57 66
57 67
58 59
58 61
58 65
59 65
60 63
60 67
61 67
62 65
62 67
63 66
63 67
64 66
64 67
65 66
65 67
66 67
Empty file added data/output/output1/_SUCCESS
Empty file.
@@ -0,0 +1,34 @@
20 32 31 24 25 24 27 24 25 31
13 2 32 2 0 12 0 5 16 6
19 1 21 1 2 9 33 8 2 8
21 1 19 0 17 1 3 1 2 8
4 10 0 8 0 13 0 5 16 5
15 32 31 32 14 33 8 32 2 1
16 5 6 4 10 4 0 5 16 6
22 33 22 32 14 33 18 32 15 33
28 2 28 33 8 2 1 19 33 28
29 33 31 24 25 24 27 24 25 31
25 24 27 2 28 31 33 26 29 33
11 0 17 0 2 27 33 13 2 13
30 32 30 32 29 26 29 26 29 33
32 15 33 27 2 27 33 26 29 33
27 23 33 23 27 24 27 33 15 33
0 4 0 31 25 24 27 2 8 32
24 31 28 31 25 31 28 31 25 24
14 33 23 25 23 27 24 25 31 24
33 9 2 7 1 0 13 3 13 1
23 27 24 31 25 31 28 31 25 31
1 17 0 21 1 30 32 2 28 31
6 0 11 0 8 33 8 30 32 31
17 0 13 3 1 30 32 15 33 32
3 13 3 12 3 1 3 12 0 19
7 0 11 0 10 5 6 16 6 16
12 0 4 6 16 5 6 4 10 0
8 2 9 33 15 33 13 2 28 31
9 33 31 32 23 29 26 33 32 22
18 33 31 32 23 29 23 29 33 32
31 28 2 1 30 8 30 1 3 2
26 33 32 33 9 2 13 0 2 9
10 4 6 16 6 5 0 19 1 0
5 0 7 2 7 0 6 5 16 5
2 0 21 1 0 6 16 6 0 12
@@ -0,0 +1,34 @@
20 33 20 32 20 32 23 25 23 27
13 0 7 0 11 0 31 24 27 23
19 33 20 33 18 33 19 0 17 0
21 1 21 1 3 13 2 32 23 27
4 10 0 12 0 21 1 7 0 13
15 33 14 32 2 7 2 32 15 32
16 6 16 6 5 0 10 5 0 13
22 33 20 33 18 32 23 25 31 0
28 33 14 32 2 7 2 32 15 32
29 26 33 13 33 28 2 9 2 0
25 23 25 23 32 33 20 32 8 30
11 0 7 0 10 5 16 6 4 0
30 1 21 0 12 3 1 3 0 13
32 23 25 23 32 30 1 3 0 13
27 33 15 33 19 33 20 33 18 33
0 19 33 28 33 9 33 20 32 15
24 27 23 32 20 32 29 23 33 30
14 33 30 32 8 2 27 24 27 2
33 32 22 32 8 2 7 2 8 30
23 27 23 25 24 27 24 31 32 15
1 30 1 3 12 0 8 30 33 20
6 4 0 31 33 23 33 14 33 14
17 1 13 1 13 1 13 33 19 33
3 2 13 33 32 31 25 31 24 27
7 2 13 1 7 2 9 33 30 1
12 0 19 33 14 32 8 30 32 22
8 32 22 32 31 28 33 23 25 24
9 33 8 0 8 32 31 24 31 28
18 32 22 32 31 28 31 24 31 28
31 25 24 27 23 32 31 24 25 24
26 33 32 31 32 2 27 2 27 33
10 4 0 17 1 17 0 12 3 0
5 0 4 0 19 33 26 29 26 29
2 27 24 25 23 29 23 33 26 29
@@ -0,0 +1,50 @@
package com.tencent.angel.graph.embedding.struc2vec

object DTW {
def compute(a:Array[Int],b:Array[Int]):Double = {

// DTW(Dynamic Time Warping)
// a fundamental implementation based on dynamic programming
// dp[i][j] = Dist(i,j) + min{dp[i-1][j],dp[i][j-1],dp[i-1][j-1]}

if(a.length==0||b.length==0)
Double.NaN
else{
val dp: Array[Array[Double]] = Array.ofDim(a.length, b.length)

// set the boundary condition
// the first column, dp[i][0] = Dist(i,0)+ dp[i-1][0];

dp(0)(0) = dist(a(0),b(0))
for(i <- 1 until a.length)
dp(i)(0) = dp(i-1)(0)+dist(a(i),b(0))

// the first row, dp[0][j] = Dist(0,j)+ dp[0][j-1];

for (j <- 1 until b.length)
dp(0)(j) = dp(0)(j - 1) + dist(b(j), a(0))

// fill up the whole dp matrix from left to right, from top to bottom

for(i<-1 until a.length;j<- 1 until b.length)
dp(i)(j) = dist(a(i),b(j))+minAmongThree(dp(i-1)(j),dp(i)(j-1),dp(i-1)(j-1))

dp(a.length-1)(b.length-1)
}
}

def dist(a:Int,b:Int):Double = {
// Assume that a, b are both positive (the node degrees)
// divide as Double so the degree ratio is not truncated by integer division
Math.max(a,b).toDouble/Math.min(a,b)-1
}

def minAmongThree(a:Double,b:Double,c:Double):Double = {
if(a<b){
Math.min(a,c)
}else
Math.min(b,c)
}



}
@@ -0,0 +1,16 @@
package com.tencent.angel.graph.embedding.struc2vec

object Floyd {
val INF = Int.MaxValue
def hopCount(adj:Array[Array[Int]]):Array[Array[Int]] = {
// self implementation of hop-count demo based on Floyd algorithm
// Assume that adj is a square matrix
val len = adj.length
// deep-copy each row so the caller's adjacency matrix is not mutated
val result: Array[Array[Int]] = adj.map(_.clone())
for(k<- 0 until len;i <- 0 until len; j<- 0 until len )
if(result(i)(k) != INF && result(k)(j) != INF)
result(i)(j) = Math.min(result(i)(j),result(i)(k)+result(k)(j))
result
}
}
@@ -0,0 +1,35 @@
package com.tencent.angel.graph.embedding.struc2vec

import com.tencent.angel.graph.embedding.struc2vec.test.TestStructSimi.INF

object MainEnter {

// Test Structure similarity
// By computing the structure similarity of Barbell graph b(1,1)
// Input graph adjacency matrix = [[0,1,INF],[1,0,1],[INF,1,0]]

def main(args:Array[String]): Unit ={
val adjMatrix = Array(Array(0, 1, INF), Array(1, 0, 1), Array(INF, 1, 0))
val structureSimilarity = new StructureSimilarity(adjMatrix)
structureSimilarity.compute()
val result: Array[Array[Array[Double]]] = structureSimilarity.structSimi
println("Layer1: ")
for (i <- 0 to result.length - 1; j <- 0 to result(0).length - 1) {
print((i,j) + " = " + result(0)(i)(j)+" ")
}
println()
println("Layer2: ")
for (i <- 0 to result.length - 1; j <- 0 to result(1).length - 1) {
print( (i, j) + " = " + result(1)(i)(j) + " ")
}
println()
println("Layer3: ")
for (i <- 0 to result.length - 1; j <- 0 to result(2).length - 1) {
print((i, j) + " = " + result(2)(i)(j) + " ")
}
}




}
@@ -0,0 +1,66 @@
**Current progress**
- Implemented the first two parts of the paper in Scala: (1) all modules for structural similarity computation (the DTW module, the Floyd module, and the corresponding test modules); (2) part of the multilayer-network construction.
- Still to be done: the random-walk-based sampling described later in the paper, and completing the Spark-based implementation with all functional modules integrated.
- The completed modules currently take the input graph as an adjacency matrix by default; the code that converts raw data into this structure still needs to be completed.
**Basic idea of the Structure Similarity implementation** (a usage sketch follows this list):
- The input graph is represented as an adjacency matrix (|V| x |V|), where the weight of an edge between directly adjacent vertices is set to 1 and non-adjacent pairs are set to INF (Integer.MAX_VALUE).
- For simplicity, the input graph is assumed to be a single connected component with no multiple edges between vertices.
- The Floyd algorithm is used to compute the hopCountResult matrix of the input graph; its maximum entry (for a single connected component, max hop count != INF) is the graph diameter, and row i holds the hop counts from vertex i to every other vertex.
- A single pass over hopCountResult yields the graph diameter and the degree of each vertex.
- Structural similarity is then computed layer by layer, bottom up; the final output is a three-dimensional array structSimi of shape (diam+1) x n x n, where structSimi[k][i][j] = f_k(node i, node j).
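A minimal usage sketch of this pipeline is shown below. It is a condensed version of what `MainEnter` and `TestStructSimi` in this diff already do; the wrapper object name `StructSimiSketch` is only for the example.

```scala
import com.tencent.angel.graph.embedding.struc2vec.{Floyd, StructureSimilarity}

object StructSimiSketch {
  def main(args: Array[String]): Unit = {
    val INF = Floyd.INF
    // Barbell graph B(1,1), i.e. the path 0 - 1 - 2
    val adjMatrix = Array(Array(0, 1, INF), Array(1, 0, 1), Array(INF, 1, 0))
    val simi = new StructureSimilarity(adjMatrix) // runs Floyd internally to get hop counts
    simi.compute()                                // fills structSimi bottom-up, layer by layer
    // structSimi(k)(i)(j) == f_k(node i, node j)
    println(simi.structSimi(1)(0)(1))             // expected 3.0 for this graph
  }
}
```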

**Planned optimizations**:
- Item 1: the adjacency matrix of an undirected graph is symmetric, so it can be stored in compressed form (see the sketch after this list).
- Item 2: the DTW step can be sped up to a fast DTW computation following the paper https://go.exlibris.link/35X6ykDp.
- Item 3: other code-level optimizations, etc.
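As a sketch of Item 1 (this helper is hypothetical and not part of this PR), a symmetric n x n matrix can be stored in a flat array of length n*(n+1)/2:

```scala
// Hypothetical helper for Item 1: keep only the lower triangle of a symmetric matrix.
class SymMatrix(n: Int, init: Int) {
  private val data = Array.fill(n * (n + 1) / 2)(init)
  // entry (i, j) with i >= j lives at index i*(i+1)/2 + j
  private def idx(i: Int, j: Int): Int =
    if (i >= j) i * (i + 1) / 2 + j else j * (j + 1) / 2 + i
  def apply(i: Int, j: Int): Int = data(idx(i, j))
  def update(i: Int, j: Int, v: Int): Unit = data(idx(i, j)) = v
}
```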

**Test**
- The test graph is the Barbell graph B(1,1), whose adjacency matrix is [[0,1,INF],[1,0,1],[INF,1,0]].
- The expected layered similarity matrices (verified by hand; a worked check follows this list) are: [[[0,1,0],[1,0,1],[0,1,0]], [[0,3,0],[3,0,3],[0,3,0]], [[0,NaN,0],[NaN,NaN,NaN],[0,NaN,NaN]]].
- See issue [1229] for the detailed output.
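For example, the (0,1) entries above can be checked by hand (vertex degrees are d(0)=1, d(1)=2, d(2)=1): at layer 0, s(R_0(0)) = [1] and s(R_0(1)) = [2], so f_0(0,1) = DTW([1],[2]) = 2/1 - 1 = 1; at layer 1, s(R_1(0)) = [2] and s(R_1(1)) = [1,1], so f_1(0,1) = f_0(0,1) + DTW([2],[1,1]) = 1 + (1 + 1) = 3; at layer 2, R_2(1) is empty, so f_2(0,1) = NaN.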

------------ Updated 2022/9/11 ------------

**Current progress**
- Building on the local job that basically ran end to end in weeks 5 and 6, the implementation was further validated against the paper.
- The latest visualization results indicate that the implementation is essentially correct.
- The distributed version is now largely complete, except for the implementation of "Struc2VecPartition" and "Struc2VecPSModel".
- A Mirrored Zachary's Karate Network was built (following the paper) on top of Zachary's Karate Network, the test input graph of weeks 5 & 6.
- The two copies of Zachary's Karate Network are joined by an edge between vertex 0 and vertex 34 (= 0 + 34); every vertex in [0, 33] corresponds to the vertex with its id + 34 (e.g. 0 ↔ 34, 14 ↔ 48, ...).
- Feeding the Mirrored Zachary's Karate Network into the pipeline produces an embedding for each vertex; PCA reduces each embedding to a 2-D vector, which is visualized with Origin2021. (A sketch of the mirroring step follows this list.)
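A minimal sketch of the mirroring step described above (the helper name is illustrative; the original karate edges are assumed to be 0-indexed pairs over [0, 33]):

```scala
// Duplicate every edge with both endpoints shifted by 34, then connect the two copies
// through the single bridge edge (0, 34); 78 karate edges become 78*2 + 1 = 157 edges.
def mirror(edges: Seq[(Int, Int)]): Seq[(Int, Int)] =
  edges ++ edges.map { case (u, v) => (u + 34, v + 34) } :+ (0, 34)
```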

**Basic idea of the struc2vec [local] implementation**:
- After obtaining the rawEdges RDD, call collect() and build the full adjacency matrix of the graph (the local version does not partition it).
- Compute the structure similarity distance network with the StructureSimilarity class.
- Build the multilayer weighted graph RDD.
- Create the cross-layer weights (level-up weights).
- Create the alias table.
- Sample node contexts, paying attention to the following implementation details (a sampling sketch follows this list):
- In Scala, Double.NaN is not equal to anything, including itself, so checks must use ".isNaN".
- When building cross-layer jumps, the corresponding vertex one layer up may have an empty neighbor set; the decision logic is therefore refined as follows:
- When building the multilayer network, elements with empty neighbor sets are filtered out of the RDD directly.
- When deciding a cross-layer move, first check whether the vertex exists in the layer above; if it does not, the move can only go down.
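The per-step neighbor draw inside the walk can be summarized as below: a condensed sketch of the logic in Struc2Vec.process, using the (events, accept, alias) arrays produced by Struc2Vec.createAliasTable in this PR; the function name is only for the example.

```scala
import java.util.Random

// Draw one neighbor in O(1) from the alias table of the current (layer, node).
def drawNeighbor(events: Array[Int], accept: Array[Double], alias: Array[Int], rnd: Random): Int = {
  val i = rnd.nextInt(events.length)
  // accept(i) is the probability of keeping column i; otherwise fall through to its alias column
  if (rnd.nextDouble() < accept(i)) events(i) else events(alias(i))
}
```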


**Test**

1. Input:
- Mirrored Zachary's Karate Network (68 nodes, 157 undirected edges)
- walk length: 15
- epochNum (number of training epochs): 5
- vector size (word2vec): 10
- window size (word2vec): 3
- stay (probability of staying in the current layer): 0.5

2. Expected output:
- In the visualized scatter plot, every vertex in [0, 33] should lie as close as possible to its mirror vertex (id + 34), since their structural similarity is highest.
- See issue [1242] for the detailed output.


**Future work**:
- Item 1: continue the distributed implementation, in particular "Struc2VecPartition" and "Struc2VecPSModel".
- Item 2: tune the hyperparameters to improve the results.
- Item 3: once the full implementation is complete, refine code-level details and optimizations following the paper.
@@ -0,0 +1,66 @@
package com.tencent.angel.graph.embedding.struc2vec
import scala.collection.mutable.ArrayBuffer

class StructureSimilarity(adjMatrix:Array[Array[Int]]) {
val INF = Floyd.INF
val n = adjMatrix.length
val degrees:Array[Int] = Array.ofDim(n)
val hopCountResult = Floyd.hopCount(adjMatrix)
val diam:Int = initDegreesAndDiam()
val structSimi:Array[Array[Array[Double]]] = Array.ofDim(diam+1,n,n)

def initDegreesAndDiam(): Int ={
var diam = 0
for(i<- 0 until n;j<-0 until n){
if(hopCountResult(i)(j) == 1)
degrees(i)+=1

// Considering that multiple connected components may exist

if(hopCountResult(i)(j) != INF)
diam = Math.max(diam,hopCountResult(i)(j))
}
diam
}

def compute() ={
for(k<- 0 to diam)
computeLayerK(k)
}

def computeLayerK(k:Int): Unit ={
if(k<0 || k>diam){

}else if(k==0){
for(i<- 0 until n; j<- 0 until n)
structSimi(0)(i)(j) = DTW.compute(getHopRingK(i,0),getHopRingK(j,0))
}else{
for(i<-0 until n; j<- 0 until n){
// note that NaN == NaN evaluates to false, so use isNaN
if (structSimi(k-1)(i)(j).isNaN)
structSimi(k)(i)(j) = Double.NaN
else{
val temp: Double = DTW.compute(getHopRingK(i,k),getHopRingK(j,k))
if(temp.isNaN)
structSimi(k)(i)(j) = Double.NaN
else
structSimi(k)(i)(j) = structSimi(k-1)(i)(j)+temp
}
}
}
}

def getHopRingK(node:Int,k:Int): Array[Int] = {
// return the ordered degree array s(R_k(node))
if (node < 0 || node > n - 1 || k < 0 || k > diam)
Array[Int]()
else{
val result = ArrayBuffer[Int]()
for(j<- 0 until n)
if(hopCountResult(node)(j)==k)
result.append(degrees(j))
result.sorted.toArray
}
}

}
@@ -0,0 +1,290 @@
package com.tencent.angel.graph.embedding.struc2vec.test

import breeze.stats.distributions.AliasTable
import com.tencent.angel.graph.data.neighbor.NeighborDataOps
import com.tencent.angel.graph.embedding.struc2vec.StructureSimilarity
import com.tencent.angel.graph.embedding.struc2vec.test.Struc2Vec.{addDstAndWeights, addSrcAndLayer, calcAverageLayerWeights, calcCrossLayerWeights, sumLayerWeights,process}
import com.tencent.angel.graph.utils.{GraphIO, Stats}
import com.tencent.angel.graph.utils.params.{HasArrayBoundsPath, HasBalancePartitionPercent, HasBatchSize, HasDstNodeIdCol, HasEpochNum, HasIsWeighted, HasMaxIteration, HasNeedReplicaEdge, HasOutputCoreIdCol, HasOutputNodeIdCol, HasPSPartitionNum, HasPartitionNum, HasSrcNodeIdCol, HasStorageLevel, HasUseBalancePartition, HasUseEdgeBalancePartition, HasWalkLength, HasWeightCol}
import com.tencent.angel.spark.context.PSContext
import org.apache.spark.{SPARK_BRANCH, SparkContext}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.types.{StringType, StructField, StructType, _}
import org.apache.spark.storage.StorageLevel

import java.util.Random
import scala.collection.mutable.ArrayBuffer


class Struc2Vec(override val uid: String) extends Transformer
with HasSrcNodeIdCol with HasDstNodeIdCol with HasOutputNodeIdCol with HasOutputCoreIdCol
with HasStorageLevel with HasPartitionNum with HasPSPartitionNum with HasMaxIteration
with HasBatchSize with HasArrayBoundsPath with HasIsWeighted with HasWeightCol with HasUseBalancePartition
with HasNeedReplicaEdge with HasUseEdgeBalancePartition with HasWalkLength with HasEpochNum with HasBalancePartitionPercent {
private var output: String = _

def this() = this(Identifiable.randomUID("Struc2Vec"))

def setOutputDir(in: String): Unit = {
output = in
}

override def transform(dataset: Dataset[_]): DataFrame = {

//create origin edges RDD and data preprocessing

val rawEdges = NeighborDataOps.loadEdgesWithWeight(dataset, $(srcNodeIdCol), $(dstNodeIdCol), $(weightCol), $(isWeighted), $(needReplicaEdge), true, false, false)
rawEdges.repartition($(partitionNum)).persist(StorageLevel.DISK_ONLY)
val (minId, maxId, numEdges) = Stats.summarizeWithWeight(rawEdges)
println(s"minId=$minId maxId=$maxId numEdges=$numEdges level=${$(storageLevel)}")

val edges = rawEdges.map { case (src, dst, w) => (src, (dst, w)) }
val rmWeight = rawEdges.map{case (src, dst, w) => (src, dst)}

// create adjacency table
val adjTable = rmWeight.groupByKey()

// create adjacency matrix
val len:Int = adjTable.count().toInt
val adjArray = adjTable.collect()
val adjMatrix: Array[Array[Int]] = Array.ofDim(len,len)

// initialize adjMatrix
for (i <- 0 until len; j<- 0 until len)
if(i==j)
adjMatrix(i)(j) = 0
else
adjMatrix(i)(j) = Int.MaxValue

for(item <- adjArray){
for(j<-item._2)
adjMatrix(item._1.toInt)(j.toInt) = 1
}

val structureSimilarity = new StructureSimilarity(adjMatrix)

// val diam = structureSimilarity.diam
// println(s"diam = $diam")
// val hop = structureSimilarity.hopCountResult
// val degrees = structureSimilarity.degrees

structureSimilarity.compute()

// get the structure similarity distance network
val simi = structureSimilarity.structSimi
val diam = structureSimilarity.diam

// create multilayer weighted graph RDD
val multilayerRDD = adjTable.map(x => x._1.toInt).sortBy(x => x).flatMap(x => addSrcAndLayer(x,simi.length))
val multilayerGraph = multilayerRDD.map(x => addDstAndWeights(x._1,x._2,simi)).filter(x=>x._2.length!=0)

// create layer average weight array
val layerAverageWeight = multilayerGraph.map(x=>sumLayerWeights(x)).groupByKey().map(x=>calcAverageLayerWeights(x)).sortByKey().collect()

// create cross layer weight(level up weights)
val crossLayerWeights = multilayerGraph.map(x=>calcCrossLayerWeights(x,layerAverageWeight))
val crossArray = crossLayerWeights.collect()

// create the alias table
val aliasTable = multilayerGraph
.mapPartitionsWithIndex { case (partId, iter) =>
Struc2Vec.calcAliasTable(partId, iter)
}
val aliasArray = aliasTable.collect()

val temp = aliasTable.filter(x => x._1==0&&x._2==0).collect()(0)
println(s"temp: ${temp._3.length}")


// generate context for nodes
// create the raw sample path rdd, (epochNum,src)
val rawPathRDD = adjTable.map(x => x._1.toInt).sortBy(x => x).flatMap(x => addSrcAndLayer(x,$(epochNum)))
val pathRDD = rawPathRDD.map{case (epochNum,src) => (src,epochNum) }.map(x => process(x,aliasArray,crossArray,$(walkLength),diam))

val ele1 = multilayerGraph.count()
// println(s"crosslen = ${crossArray.length}, ele1len = $ele1, sum = ${emp1.sum}, len = ${emp2.length}")

// val item = simi(1)(33)(0)
// println(s"ele1 = $ele1 ele2 = $ele2 item = $item")

// println("Layer1: ")
// for (i <- 0 to simi(0).length - 1; j <- 0 to simi(0)(0).length - 1) {
// print((i, j) + " = " + simi(0)(i)(j) + " ")
// }
// println("Layer5: ")
// for (i <- 0 to simi(5).length - 1; j <- 0 to simi(0)(0).length - 1) {
// print((i, j) + " = " + simi(5)(i)(j).isNaN+ " ")
// }



// val ele = aliasTable.collect()(203)
// println(s"acc cnt = ${ele._4.length}")
// for (i <- 0 to ele.length - 1) {
// println("no."+ i + " = layer: " + ele(i)._1+ " src: "+ele(i)._2+" dst: "+ele(i)._3(0)+" wei: "+ele(i)._4(0)+" acc: "+ele(i)._5(0))
// }

// println("Degrees: ")
// for (i <- 0 to degrees.length - 1) {
// print("node"+ i + " = " + degrees(i)+ " ")
// }

// val fn =adjTable.take(17)(16)._2
// val temp = adjArray(0)._2
// val cnt = adjTable.count()
// val ele = adjMatrix(32)(33)
// println(s"get = $ele count = $cnt")

dataset.sparkSession.createDataFrame(pathRDD.map(x => Row(x._1,x._2,x._3)), transformSchema(dataset.schema))

}


override def transformSchema(schema: StructType): StructType = {
StructType(Seq(StructField("src",IntegerType, nullable = false),StructField("epochNum",IntegerType,nullable = false),StructField("path",ArrayType(StringType),nullable = false)))
}


override def copy(extra: ParamMap): Transformer = defaultCopy(extra)



}

object Struc2Vec {
def calcAliasTable(partId: Int, iter: Iterator[((Int,Int), Array[(Int, Double)])]): Iterator[(Int,Int, Array[Int], Array[Double], Array[Int])] = {
iter.map { case ((layer,src), neighbors) =>
val (events, weights) = neighbors.unzip
val weightsSum = weights.sum
val len = weights.length
val areaRatio = weights.map(_ / weightsSum * len)
val (accept, alias) = createAliasTable(areaRatio)
(layer, src, events, accept, alias)
}
}

def createAliasTable(areaRatio: Array[Double]): (Array[Double], Array[Int]) = {
val len = areaRatio.length
val small = ArrayBuffer[Int]()
val large = ArrayBuffer[Int]()
val accept = Array.fill(len)(0d)
val alias = Array.fill(len)(0)

for (idx <- areaRatio.indices) {
// note that both accept and alias hold indices into the events array, not node ids!
if (areaRatio(idx) < 1.0) small.append(idx) else large.append(idx)
}
while (small.nonEmpty && large.nonEmpty) {
val smallIndex = small.remove(small.size - 1)
val largeIndex = large.remove(large.size - 1)
accept(smallIndex) = areaRatio(smallIndex)
alias(smallIndex) = largeIndex
areaRatio(largeIndex) = areaRatio(largeIndex) - (1 - areaRatio(smallIndex))
if (areaRatio(largeIndex) < 1.0) small.append(largeIndex) else large.append(largeIndex)
}
while (small.nonEmpty) {
val smallIndex = small.remove(small.size - 1)
accept(smallIndex) = 1
}

while (large.nonEmpty) {
val largeIndex = large.remove(large.size - 1)
accept(largeIndex) = 1
}
(accept, alias)
}

def addSrcAndLayer(x: Int, layers: Int): Array[(Int, Int)] = {
val temp = ArrayBuffer[(Int, Int)]()
for (i: Int <- 0 until layers)
temp.append((i, x))
temp.toArray
}

def addDstAndWeights(layer:Int,src:Int,simi:Array[Array[Array[Double]]]):((Int,Int),Array[(Int,Double)])={
val temp = ArrayBuffer[(Int,Double)]()
for(dst<- 0 until simi(layer)(src).length){
// here dst != src
if(!simi(layer)(src)(dst).isNaN && dst!=src)
temp.append((dst,Math.exp(-simi(layer)(src)(dst))))
}
((layer,src),temp.toArray)
}

def sumLayerWeights(x:((Int,Int),Array[(Int,Double)])):(Int,(Double,Int))={
val (dst,weights) = x._2.unzip
val len = weights.length
(x._1._1,(weights.sum,len))
}

def calcAverageLayerWeights(x:(Int,Iterable[(Double,Int)])): (Int,Double)={
val (sum,cnt) = x._2.unzip
(x._1,sum.sum/cnt.sum)
}

def calcCrossLayerWeights(x:((Int,Int),Array[(Int,Double)]),layerAverageWeights:Array[(Int,Double)]):((Int,Int),Double)={
val (dst,w) = x._2.unzip
val temp = w.filter(y => y>layerAverageWeights(x._1._1)._2)
(x._1,Math.log(Math.E+temp.length))
}

def process(x: (Int, Int), aliasArray: Array[(Int, Int, Array[Int], Array[Double], Array[Int])], cross: Array[((Int, Int), Double)], walkLen: Int, diam:Int): (Int,Int,Array[String]) = {
// ((Int,Int,Array[Int]))
// initialize path
val path = new ArrayBuffer[Int]()
val (src, epochNum) = x
var curLayer = 0
path.append(src)

// set the stay probability
val stay = 0.5

// start a random walk from node $src at layer 0 (walk no. $epochNum)
while (path.length < walkLen) {
val curNode: Int = path(path.length - 1)
// decide whether stay in current layer
val ifStay = new Random().nextDouble()
if (ifStay < stay) {
// stay in this layer
val temp = aliasArray.filter(x => x._1 == curLayer && x._2 == curNode)(0)
val events = temp._3
val accepts = temp._4
val alias = temp._5
val index = new Random().nextInt(events.length)
val isAccept = new Random().nextDouble()
if(isAccept<accepts(index))
path.append(events(index))
else
// note that alias holds an index into the events array, not a node id
path.append(events(alias(index)))
}else{
// change layer
if(curLayer==0) {
// layer 0 only levels up
curLayer += 1
}else if(curLayer == diam){
// layer diam only lowers down
curLayer -= 1
}else{
// decide up or down
val up = cross.filter(x => x._1._1==curLayer&&x._1._2==curNode)(0)._2
// the current node may have been filtered out of the layer above (empty neighbor set)
if(cross.filter(x => x._1._1==curLayer+1 && x._1._2==curNode).length!=0){
val ifUp = new Random().nextDouble()
if (ifUp < (up / (up + 1)))
curLayer += 1
else
curLayer -= 1
}else
// cannot level up: the corresponding node in the layer above has no neighbors, so go down
curLayer -=1
}
}
}
(src, epochNum, path.map(_.toString).toArray)
}
}
@@ -0,0 +1,110 @@
package com.tencent.angel.graph.embedding.struc2vec.test

import com.tencent.angel.conf.AngelConf
import com.tencent.angel.graph.embedding.struc2vec.test.Struc2Vec
import com.tencent.angel.graph.utils.GraphIO
import com.tencent.angel.spark.context.PSContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.feature.{Word2Vec,PCA}
import scala.collection.mutable.ArrayBuffer

object Struc2VecExample {
def main(args: Array[String]): Unit = {
val input = "data/bc/mirrored_karate_club_network.txt"
val storageLevel = StorageLevel.fromString("MEMORY_ONLY")
val batchSize = 10
val output = "data/output/output1"
val srcIndex = 0
val dstIndex = 1
val weightIndex = 2
val psPartitionNum = 1
val partitionNum = 1
val useEdgeBalancePartition = false
val isWeighted = false
val needReplicateEdge = true

val sep = " "
val walkLength = 15


start()

val struc2Vec = new Struc2Vec()
.setStorageLevel(storageLevel)
.setPSPartitionNum(psPartitionNum)
.setSrcNodeIdCol("src")
.setDstNodeIdCol("dst")
.setWeightCol("weight")
.setBatchSize(batchSize)
.setWalkLength(walkLength)
.setPartitionNum(partitionNum)
.setIsWeighted(isWeighted)
.setNeedReplicaEdge(needReplicateEdge)
.setUseEdgeBalancePartition(useEdgeBalancePartition)
.setEpochNum(5)

struc2Vec.setOutputDir(output)
val df = GraphIO.load(input, isWeighted = isWeighted, srcIndex, dstIndex, weightIndex, sep = sep)
val mapping = struc2Vec.transform(df)

mapping.show()

val path = mapping.select("path")

// get node embeddings by word2vec
val word2Vec = new Word2Vec()
.setInputCol("path")
.setOutputCol("result")
.setVectorSize(10)
.setMinCount(0)
.setWindowSize(3)

val model = word2Vec.fit(mapping)
val result = model.transform(mapping)
val vec = model.getVectors


// visualization by PCA
val pca = new PCA()
.setInputCol("vector")
.setOutputCol("pcaFeatures")
.setK(2)
.fit(vec)

val result2 = pca.transform(vec)

result2.show(68,false)

result2.select("word","pcaFeatures").show(68,false)

// result2.select("word","pcaFeatures").rdd.saveAsTextFile("C:\\Users\\Lzh\\Desktop\\result.txt")
// vec.show()
// result.select("result").take(3).foreach(println)
// println(s"vec(0) = ${vecArr(0)}")



stop()
}

def start(mode: String = "local[4]"): Unit = {
val conf = new SparkConf()
conf.setMaster(mode)
conf.setAppName("Struc2Vec")
conf.set(AngelConf.ANGEL_PSAGENT_UPDATE_SPLIT_ADAPTION_ENABLE, "false")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
sc.setCheckpointDir("data/cp")
//PSContext.getOrCreate(sc)
}


def stop(): Unit = {
SparkContext.getOrCreate().stop()
}

}



@@ -0,0 +1,13 @@
package com.tencent.angel.graph.embedding.struc2vec.test

import com.tencent.angel.graph.embedding.struc2vec.DTW

object TestDTW {
def main(args: Array[String]): Unit = {
// val a = Array(1, 1, 1, 1, 2, 2, 3, 3, 4, 3, 2, 2, 1, 1, 1, 1)
// val b = Array(1, 1, 2, 2, 3, 3, 4, 4, 4, 4, 3, 3, 2, 2, 1, 1)
// println(DTW.compute(a, b))
println(DTW.compute(Array(1, 1), Array(2)))
}

}
@@ -0,0 +1,30 @@
package com.tencent.angel.graph.embedding.struc2vec.test
import com.tencent.angel.graph.embedding.struc2vec.Floyd


object TestFloyd {
val INF = Floyd.INF
def main(args: Array[String]): Unit = {
val len: Int = 3
// initialize adjacency matrix
val adjMatrix = Array.ofDim[Int](len, len)
adjMatrix(0)(0) = 0
adjMatrix(0)(1) = 1
adjMatrix(0)(2) = INF
adjMatrix(1)(0) = 1
adjMatrix(1)(1) = 0
adjMatrix(1)(2) = 1
adjMatrix(2)(0) = INF
adjMatrix(2)(1) = 1
adjMatrix(2)(2) = 0

val res = Floyd.hopCount(adjMatrix)
for (i <- res.indices; j <- res(0).indices) {
println(s"Element ($i,$j) = ${res(i)(j)}")
}




}
}
@@ -0,0 +1,36 @@
package com.tencent.angel.graph.embedding.struc2vec.test

import com.tencent.angel.graph.embedding.struc2vec._

object TestStructSimi {
val INF = Floyd.INF
def main(args:Array[String]): Unit ={
val adjMatrix = Array(Array(0,1,INF),Array(1,0,1),Array(INF,1,0))
val struct_simi = new StructureSimilarity(adjMatrix)
val diam = struct_simi.diam
val degrees = struct_simi.degrees
val hc = struct_simi.hopCountResult
val ring = struct_simi.getHopRingK(1,1)
println(diam)
for (i <- hc.indices; j <- hc(0).indices) {
println(s"Element ($i,$j) = ${hc(i)(j)}")
}

println(struct_simi.getHopRingK(0,0)(0))
println(struct_simi.getHopRingK(1,0)(0))

// println(DTW.compute(struct_simi.getHopRingK(0,0),struct_simi.getHopRingK(0,0)))

// for (i <- 0 to degrees.length - 1)
// println("Element " + i + " = " + degrees(i))

// for (i <- 0 to ring.length - 1)
// println("ring Element " + i + " = " + ring(i))

// println(ring.length)



}

}