Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized soft delete relationship query by batching #468

Merged
merged 16 commits into from
Nov 25, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.exception.RetryLimitReached;
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import com.linkedin.metadata.dao.utils.GraphUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
Expand All @@ -11,17 +12,21 @@
import com.linkedin.metadata.validator.RelationshipValidator;
import io.ebean.EbeanServer;
import io.ebean.SqlUpdate;
import io.ebean.Transaction;
import io.ebean.annotation.Transactional;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.dao.utils.ModelUtils.*;

@Slf4j
public class EbeanLocalRelationshipWriterDAO extends BaseGraphWriterDAO {
private static final String DEFAULT_ACTOR = "urn:li:principal:UNKNOWN";
private final EbeanServer _server;
Expand All @@ -36,6 +41,11 @@ private static class CommonColumnName {
private static final String LAST_MODIFIED_ON = "lastmodifiedon";
private static final String LAST_MODIFIED_BY = "lastmodifiedby";
}
// Row cap for a single deletion statement: clearRelationshipsByEntity appends LIMIT + BATCH_SIZE to its query and
// treats a batch that affects fewer rows than this as the final one.
private static final int BATCH_SIZE = 10000; // Process rows in batches of 10,000
// Upper bound on how many batches one clearRelationshipsByEntity call will execute, so its loop always terminates.
private static final int MAX_BATCHES = 1000; // Maximum number of batches to process
// SQL fragment concatenated between the deletion statement and BATCH_SIZE.
private static final String LIMIT = " LIMIT ";
// Number of batches executed by the most recent clearRelationshipsByEntity call; that method resets it to 0 before
// its loop and increments it after each batch. Lombok's @Getter exposes it as getBatchCount().
// NOTE(review): this is plain mutable instance state rewritten on every call; concurrent calls on the same DAO
// instance would presumably interfere with each other's count - confirm how the DAO is shared.
@Getter
private int batchCount = 0;

public EbeanLocalRelationshipWriterDAO(EbeanServer server) {
_server = server;
Expand All @@ -61,20 +71,61 @@ public void processLocalRelationshipUpdates(@Nonnull Urn urn,

/**
* This method clears all the relationships from a source entity urn using REMOVE_ALL_EDGES_FROM_SOURCE.
* @param urn entity urn could be either source or destination, depends on the RemovalOption
* @param relationshipClass relationship that needs to be cleared
* @param isTestMode whether to use test schema
*
* @param urn entity urn could be either source or destination, depends on the RemovalOption
* @param relationshipClass relationship that needs to be cleared
* @param isTestMode whether to use test schema
*/
public void clearRelationshipsByEntity(@Nonnull Urn urn,
@Nonnull Class<? extends RecordTemplate> relationshipClass, boolean isTestMode) {
@Nonnull Class<? extends RecordTemplate> relationshipClass, boolean isTestMode) {
RelationshipValidator.validateRelationshipSchema(relationshipClass, isRelationshipInV2(relationshipClass));
SqlUpdate deletionSQL = _server.createSqlUpdate(SQLStatementUtils.deleteLocalRelationshipSQL(
String deletionQuery = SQLStatementUtils.deleteLocalRelationshipSQL(
isTestMode ? SQLSchemaUtils.getTestRelationshipTableName(relationshipClass)
: SQLSchemaUtils.getRelationshipTableName(relationshipClass), RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE));
: SQLSchemaUtils.getRelationshipTableName(relationshipClass), RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE) + LIMIT + BATCH_SIZE;
SqlUpdate deletionSQL = _server.createSqlUpdate(deletionQuery);
deletionSQL.setParameter(CommonColumnName.SOURCE, urn.toString());
deletionSQL.execute();
batchCount = 0;
while (batchCount < MAX_BATCHES) {
try {
// Use the runInTransactionWithRetry method to handle retries in case of transaction failures
int rowsAffected = runInTransactionWithRetry(deletionSQL::execute, 3); // Retry up to 3 times in case of transient failures
batchCount++;

if (log.isDebugEnabled()) {
log.debug("Deleted {} rows in batch {}", rowsAffected, batchCount);
}

if (rowsAffected < BATCH_SIZE) {
// Exit loop if fewer than BATCH_SIZE rows were affected, indicating all rows are processed
break;
}

// Sleep for 1 millisecond to reduce load
Thread.sleep(1);
} catch (RetryLimitReached e) {
log.error("Error while executing batch deletion after {} batches and retries", batchCount, e);
throw new RuntimeException("Batch deletion failed due to retry limit", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupted status
throw new RuntimeException("Batch deletion interrupted", e);
} catch (Exception e) {
log.error("Error while executing batch deletion after {} batches", batchCount, e);
throw new RuntimeException("Batch deletion failed", e);
}
}

if (batchCount >= MAX_BATCHES) {
log.warn(
"Reached maximum batch count of {}, consider increasing MAX_BATCH_COUNT or debugging the deletion logic.",
MAX_BATCHES);
}

if (log.isDebugEnabled()) {
log.info("Cleared relationships in {} batches", batchCount);
}
}


/**
* Persist the given list of relationships to the local relationship using REMOVE_ALL_EDGES_FROM_SOURCE.
* @param relationships the list of relationships to be persisted
Expand Down Expand Up @@ -186,4 +237,26 @@ private <RELATIONSHIP extends RecordTemplate> void removeRelationshipsBySource(@
deletionSQL.setParameter(CommonColumnName.SOURCE, source.toString());
deletionSQL.execute();
}

/**
 * Executes the given block inside a transaction, trying again when it fails with a runtime exception.
 *
 * <p>Each attempt begins a new transaction on the server, invokes the block and commits. The transaction is
 * closed by the try-with-resources statement at the end of every attempt. At most
 * {@code maxTransactionRetry + 1} attempts are made.
 *
 * @param block the work to run inside the transaction
 * @param maxTransactionRetry number of additional attempts allowed after the first one fails
 * @param <T> type of the value produced by the block
 * @return the value produced by the first attempt that commits
 * @throws RetryLimitReached when no attempt commits; the cause is the failure of the last attempt, if any ran
 */
@Nonnull
protected <T> T runInTransactionWithRetry(@Nonnull Supplier<T> block, int maxTransactionRetry) {
  final String failureMessage = "Failed to execute after " + maxTransactionRetry + " retries";
  RuntimeException mostRecentFailure = null;

  for (int attempt = 0; attempt <= maxTransactionRetry; attempt++) {
    try (Transaction tx = _server.beginTransaction()) {
      final T outcome = block.get();
      tx.commit();
      return outcome; // Committed, nothing more to do
    } catch (RuntimeException failure) {
      // Remember the failure and let the loop make the next attempt
      mostRecentFailure = failure;
    }
  }

  // Every permitted attempt has been used up (or none was permitted at all).
  if (mostRecentFailure == null) {
    throw new RetryLimitReached(failureMessage + " due to unknown reasons");
  }
  throw new RetryLimitReached(failureMessage, mostRecentFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ public void testClearRelationshipsByEntityUrn() throws URISyntaxException {
"bar", "urn:li:foo:456", "foo")));

BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123");
FooUrn fooUrn = FooUrn.createFromString("urn:li:foo:123");

// Before processing
List<SqlRow> before = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList();
Expand All @@ -227,6 +226,29 @@ public void testClearRelationshipsByEntityUrn() throws URISyntaxException {
_server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_pairswith"));
}

/**
 * Verifies that clearRelationshipsByEntity clears more rows than fit in a single batch.
 *
 * <p>Inserts 10001 relationships for one source urn, clears them, and expects no rows with a null deleted_ts to
 * remain and exactly 2 batches to have been executed. The table is truncated in a finally block so that a failed
 * assertion does not leave the inserted rows behind for other tests.
 */
@Test
public void testClearRelationshipsByEntityUrnWithBatching() throws URISyntaxException {
  try {
    // Insert a large number of relationships to trigger batch processing
    for (int i = 0; i < 10001; i++) {
      _server.execute(Ebean.createSqlUpdate(insertRelationships("metadata_relationship_pairswith", "urn:li:bar:123",
          "bar", "urn:li:foo:" + i, "foo")));
    }

    BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123");
    // Before processing
    List<SqlRow> before =
        _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList();
    assertEquals(before.size(), 10001);

    _localRelationshipWriterDAO.clearRelationshipsByEntity(barUrn, PairsWith.class, false);

    // After processing verification
    List<SqlRow> all =
        _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList();
    assertEquals(all.size(), 0); // Total number of edges is 0
    assertEquals(_localRelationshipWriterDAO.getBatchCount(), 2); //expect 2 batches
  } finally {
    // Clean up, even when an insert or assertion above fails
    _server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_pairswith"));
  }
}

@Test
public void testRemoveRelationships() throws URISyntaxException {
BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123");
Expand Down Expand Up @@ -254,7 +276,7 @@ public void testRemoveRelationships() throws URISyntaxException {

/**
 * Builds an INSERT statement for a relationship table by formatting the arguments into a fixed SQL template.
 *
 * <p>The arguments are substituted into the template with String.format and are not escaped here. The metadata
 * column is always the literal {"metadata": true} and lastmodifiedby is always 'unknown'.
 *
 * <p>NOTE(review): two alternative VALUES continuation lines appear below, one with the fixed timestamp
 * '1970-01-01 00:00:01' and one with CURRENT_TIMESTAMP. Presumably only one of them exists in the real file
 * (this looks like the removed and added line of a change shown together) - confirm which one applies.
 *
 * @param table name of the table to insert into
 * @param sourceUrn value for the source column
 * @param sourceType value for the source_type column
 * @param destinationUrn value for the destination column
 * @param destinationType value for the destination_type column
 * @return the formatted INSERT statement
 */
private String insertRelationships(String table, String sourceUrn, String sourceType, String destinationUrn, String destinationType) {
String insertTemplate = "INSERT INTO %s (metadata, source, source_type, destination, destination_type, lastmodifiedon, lastmodifiedby)"
+ " VALUES ('{\"metadata\": true}', '%s', '%s', '%s', '%s', '1970-01-01 00:00:01', 'unknown')";
+ " VALUES ('{\"metadata\": true}', '%s', '%s', '%s', '%s', CURRENT_TIMESTAMP, 'unknown')";
// Placeholders are filled in order: table, source, source_type, destination, destination_type.
return String.format(insertTemplate, table, sourceUrn, sourceType, destinationUrn, destinationType);
}
}
Loading