Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimizied soft delete relationship query by batching #468

Merged
merged 16 commits into from
Nov 25, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@
import com.linkedin.metadata.validator.RelationshipValidator;
import io.ebean.EbeanServer;
import io.ebean.SqlUpdate;
import io.ebean.Transaction;
import io.ebean.annotation.Transactional;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;

import static com.linkedin.metadata.dao.utils.ModelUtils.*;

@Slf4j
public class EbeanLocalRelationshipWriterDAO extends BaseGraphWriterDAO {
private static final String DEFAULT_ACTOR = "urn:li:principal:UNKNOWN";
private final EbeanServer _server;
Expand All @@ -36,6 +39,10 @@ private static class CommonColumnName {
private static final String LAST_MODIFIED_ON = "lastmodifiedon";
private static final String LAST_MODIFIED_BY = "lastmodifiedby";
}
private static final int BATCH_SIZE = 10000; // Process rows in batches of 10,000
private static final String LIMIT = " LIMIT ";
@Getter
private int batchCount = 0; //Only for unit test purpose
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved

public EbeanLocalRelationshipWriterDAO(EbeanServer server) {
_server = server;
Expand Down Expand Up @@ -77,15 +84,42 @@ public void clearRelationshipsByEntity(@Nonnull Urn urn,
return;
}
RelationshipValidator.validateRelationshipSchema(relationshipClass);
SqlUpdate deletionSQL = _server.createSqlUpdate(SQLStatementUtils.deleteLocalRelationshipSQL(
String deletionQuery = SQLStatementUtils.deleteLocalRelationshipSQL(
isTestMode ? SQLSchemaUtils.getTestRelationshipTableName(relationshipClass)
: SQLSchemaUtils.getRelationshipTableName(relationshipClass), removalOption));
: SQLSchemaUtils.getRelationshipTableName(relationshipClass), removalOption) + LIMIT + BATCH_SIZE;
SqlUpdate deletionSQL = _server.createSqlUpdate(deletionQuery);
if (removalOption == RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE) {
deletionSQL.setParameter(CommonColumnName.SOURCE, urn.toString());
} else if (removalOption == RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION) {
deletionSQL.setParameter(CommonColumnName.DESTINATION, urn.toString());
}
deletionSQL.execute();
batchCount = 0;
while (true) {
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
try (Transaction transaction = _server.beginTransaction()) {
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
int rowsAffected = deletionSQL.execute();
batchCount++;
// Commit the transaction for this batch
transaction.commit();
log.info("Deleted {} rows in batch {}", rowsAffected, batchCount);
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved

if (rowsAffected < BATCH_SIZE) {
// Exit loop if fewer than BATCH_SIZE rows were affected, indicating all rows are processed
break;
}

// Sleep for 1 millisecond to reduce load
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupted status
throw new RuntimeException("Batch deletion interrupted", e);
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (Exception e) {
log.error("Error while executing batch deletion after {} batches", batchCount, e);
throw new RuntimeException("Batch deletion failed", e);
}
}
log.info("Cleared relationships in {} batches", batchCount);
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,30 @@ public void testClearRelationshipsByEntityUrn() throws URISyntaxException {
_server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_pairswith"));
}

@Test
public void testClearRelationshipsByEntityUrnWithBatching() throws URISyntaxException {
// Insert a large number of relationships to trigger batch processing
for (int i = 0; i < 10001; i++) {
_server.execute(Ebean.createSqlUpdate(insertRelationships("metadata_relationship_pairswith", "urn:li:bar:123",
"bar", "urn:li:foo:" + i, "foo")));
}

BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123");
// Before processing
List<SqlRow> before = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList();
assertEquals(before.size(), 10001);

_localRelationshipWriterDAO.clearRelationshipsByEntity(barUrn, PairsWith.class,
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE, false);

// After processing verification
List<SqlRow> all = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList();
assertEquals(all.size(), 0); // Total number of edges is 0
assertEquals(_localRelationshipWriterDAO.getBatchCount(), 2); //expect 2 batches
// Clean up
_server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_pairswith"));
}

@Test
public void testRemoveRelationships() throws URISyntaxException {
BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123");
Expand Down Expand Up @@ -371,7 +395,7 @@ public void testRemoveRelationships() throws URISyntaxException {

private String insertRelationships(String table, String sourceUrn, String sourceType, String destinationUrn, String destinationType) {
String insertTemplate = "INSERT INTO %s (metadata, source, source_type, destination, destination_type, lastmodifiedon, lastmodifiedby)"
+ " VALUES ('{\"metadata\": true}', '%s', '%s', '%s', '%s', '1970-01-01 00:00:01', 'unknown')";
+ " VALUES ('{\"metadata\": true}', '%s', '%s', '%s', '%s', CURRENT_TIMESTAMP, 'unknown')";
return String.format(insertTemplate, table, sourceUrn, sourceType, destinationUrn, destinationType);
}
}
Loading