Skip to content

Commit

Permalink
Merge branch 'improve_latency' into develop
Browse files Browse the repository at this point in the history
merging the latency improvement into develop
  • Loading branch information
DumbIndividual committed Sep 16, 2024
2 parents dfe77db + f13d89f commit cab7540
Showing 1 changed file with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ public void setTableSet(TableSet streamGraphTableSet) {
protected TableSet performGrouping() {
getTableEnv().createTemporaryView(TABLE_VERTICES, tableSet.getVertices());
getTableEnv().createTemporaryView(TABLE_EDGES, tableSet.getEdges());
/*
return furtherPreparedVertices
.window(Tumble.over(windowConfig.getWindowExpression()).on($(FIELD_VERTEX_EVENT_TIME)).as(FIELD_SUPER_VERTEX_EVENT_WINDOW))
.groupBy(buildVertexGroupExpressions())
.select(buildVertexProjectExpressions());
*/

Table preparedEdges = deduplicateEdges();

// 1. Deduplicate vertices
// Returns: | vertex_event_time | vertex_id | vertex_label | vertex_properties |
Expand All @@ -122,7 +130,7 @@ protected TableSet performGrouping() {

// 6. Assign super vertices to edges and replace source_id and target_id with the ids of the super vertices
// returns: | edge_id | event_time | source_id | target_id | edge_label | edge_properties
Table edgesWithSuperVertices = createEdgesWithExpandedVertices(tableSet.getEdges(), expandedVertices);
Table edgesWithSuperVertices = createEdgesWithExpandedVertices(preparedEdges, expandedVertices);

// 7. Write grouping or aggregating properties in own column, extract from edge_properties column
// return: | edge_id | event_time | source_id | target_id | [edge_label] | [prop_grouping | ..] [prop_agg | ... ]
Expand Down Expand Up @@ -153,6 +161,20 @@ public Table deduplicateVertices() {
);
}

public Table deduplicateEdges() {
return this.getTableEnv().sqlQuery(
"SELECT " +
FIELD_EDGE_ID + " as " + FIELD_EDGE_ID + ", " +
FIELD_EDGE_LABEL + " as " + FIELD_EDGE_LABEL + ", " +
FIELD_EDGE_PROPERTIES + " as " + FIELD_EDGE_PROPERTIES + ", " +
"window_time as event_time, target_id as target_id, source_id as source_id " +
"FROM TABLE ( TUMBLE ( TABLE " + this.tableSet.getEdges() + ", " +
"DESCRIPTOR(" + FIELD_EVENT_TIME + "), " + windowConfig.getSqlApiExpression() + ")) " +
"GROUP BY window_time, " + FIELD_EDGE_ID + ", " + FIELD_EDGE_LABEL + ", " +
FIELD_EDGE_PROPERTIES + ", window_start, window_end, source_id, target_id"
);
}

public Table enhanceVerticesByPropertyColumns(Table preparedVertices) {
return preparedVertices
.select(buildVertexGroupProjectExpressions());
Expand Down Expand Up @@ -222,19 +244,15 @@ public Table createEdgesWithExpandedVertices(Table edges, Table expandedVertices
$(FIELD_EVENT_TIME).as(vertexTargetEventTime)))
.where(
$(FIELD_TARGET_ID).isEqual($(vertexTargetId))
.and($(FIELD_EVENT_TIME).isLessOrEqual($(vertexTargetEventTime)))
.and($(FIELD_EVENT_TIME).isGreater($(vertexTargetEventTime).minus(windowConfig.getWindowExpression()))))

.and($(FIELD_EVENT_TIME).isLessOrEqual($(FIELD_EVENT_TIME))))
.join(
expandedVertices.select(
$(FIELD_VERTEX_ID).as(vertexSourceId),
$(FIELD_SUPER_VERTEX_ID).as(superVertexSourceId),
$(FIELD_EVENT_TIME).as(vertexSourceEventTime)))
.where(
$(FIELD_SOURCE_ID).isEqual($(vertexSourceId))
.and($(FIELD_EVENT_TIME).isLessOrEqual($(vertexSourceEventTime)))
.and($(FIELD_EVENT_TIME).isGreater($(vertexSourceEventTime).minus(windowConfig.getWindowExpression()))))

.and($(FIELD_EVENT_TIME).isLessOrEqual($(FIELD_EVENT_TIME))))
.select(
$(FIELD_EDGE_ID),
$(FIELD_EVENT_TIME),
Expand Down

0 comments on commit cab7540

Please sign in to comment.