diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalRelationshipWriterDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalRelationshipWriterDAO.java index 7edfb5561..48e0535f2 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalRelationshipWriterDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalRelationshipWriterDAO.java @@ -3,6 +3,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates; +import com.linkedin.metadata.dao.exception.RetryLimitReached; import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO; import com.linkedin.metadata.dao.utils.GraphUtils; import com.linkedin.metadata.dao.utils.RecordUtils; @@ -11,17 +12,21 @@ import com.linkedin.metadata.validator.RelationshipValidator; import io.ebean.EbeanServer; import io.ebean.SqlUpdate; +import io.ebean.Transaction; import io.ebean.annotation.Transactional; import java.sql.Timestamp; import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import static com.linkedin.metadata.dao.utils.ModelUtils.*; - +@Slf4j public class EbeanLocalRelationshipWriterDAO extends BaseGraphWriterDAO { private static final String DEFAULT_ACTOR = "urn:li:principal:UNKNOWN"; private final EbeanServer _server; @@ -36,6 +41,11 @@ private static class CommonColumnName { private static final String LAST_MODIFIED_ON = "lastmodifiedon"; private static final String LAST_MODIFIED_BY = "lastmodifiedby"; } + private static final int BATCH_SIZE = 10000; // Process rows in batches of 10,000 + private static final int MAX_BATCHES = 1000; // Maximum number of batches to process + private static final String LIMIT = " LIMIT "; + @Getter + private int batchCount = 0; public EbeanLocalRelationshipWriterDAO(EbeanServer server) { _server = server; @@ -61,20 +71,61 @@ public void processLocalRelationshipUpdates(@Nonnull Urn urn, /** * This method clears all the relationships from a source entity urn using REMOVE_ALL_EDGES_FROM_SOURCE. - * @param urn entity urn could be either source or destination, depends on the RemovalOption - * @param relationshipClass relationship that needs to be cleared - * @param isTestMode whether to use test schema + * + * @param urn entity urn could be either source or destination, depends on the RemovalOption + * @param relationshipClass relationship that needs to be cleared + * @param isTestMode whether to use test schema */ public void clearRelationshipsByEntity(@Nonnull Urn urn, - @Nonnull Class relationshipClass, boolean isTestMode) { + @Nonnull Class relationshipClass, boolean isTestMode) { RelationshipValidator.validateRelationshipSchema(relationshipClass, isRelationshipInV2(relationshipClass)); - SqlUpdate deletionSQL = _server.createSqlUpdate(SQLStatementUtils.deleteLocalRelationshipSQL( + String deletionQuery = SQLStatementUtils.deleteLocalRelationshipSQL( isTestMode ? SQLSchemaUtils.getTestRelationshipTableName(relationshipClass) - : SQLSchemaUtils.getRelationshipTableName(relationshipClass), RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE)); + : SQLSchemaUtils.getRelationshipTableName(relationshipClass), RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE) + LIMIT + BATCH_SIZE; + SqlUpdate deletionSQL = _server.createSqlUpdate(deletionQuery); deletionSQL.setParameter(CommonColumnName.SOURCE, urn.toString()); - deletionSQL.execute(); + batchCount = 0; + while (batchCount < MAX_BATCHES) { + try { + // Use the runInTransactionWithRetry method to handle retries in case of transaction failures + int rowsAffected = runInTransactionWithRetry(deletionSQL::execute, 3); // Retry up to 3 times in case of transient failures + batchCount++; + + if (log.isDebugEnabled()) { + log.debug("Deleted {} rows in batch {}", rowsAffected, batchCount); + } + + if (rowsAffected < BATCH_SIZE) { + // Exit loop if fewer than BATCH_SIZE rows were affected, indicating all rows are processed + break; + } + + // Sleep for 1 millisecond to reduce load + Thread.sleep(1); + } catch (RetryLimitReached e) { + log.error("Error while executing batch deletion after {} batches and retries", batchCount, e); + throw new RuntimeException("Batch deletion failed due to retry limit", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Restore interrupted status + throw new RuntimeException("Batch deletion interrupted", e); + } catch (Exception e) { + log.error("Error while executing batch deletion after {} batches", batchCount, e); + throw new RuntimeException("Batch deletion failed", e); + } + } + + if (batchCount >= MAX_BATCHES) { + log.warn( + "Reached maximum batch count of {}, consider increasing MAX_BATCH_COUNT or debugging the deletion logic.", + MAX_BATCHES); + } + + if (log.isDebugEnabled()) { + log.info("Cleared relationships in {} batches", batchCount); + } } + /** * Persist the given list of relationships to the local relationship using REMOVE_ALL_EDGES_FROM_SOURCE. * @param relationships the list of relationships to be persisted @@ -186,4 +237,26 @@ private void removeRelationshipsBySource(@ deletionSQL.setParameter(CommonColumnName.SOURCE, source.toString()); deletionSQL.execute(); } + + @Nonnull + protected T runInTransactionWithRetry(@Nonnull Supplier block, int maxTransactionRetry) { + int retryCount = 0; + RuntimeException lastException = null; + while (retryCount <= maxTransactionRetry) { + try (Transaction transaction = _server.beginTransaction()) { + T result = block.get(); + transaction.commit(); + return result; // Successful execution, return result + } catch (RuntimeException exception) { + lastException = exception; + retryCount++; + } + } + // If we exhausted retries, throw an exception. + if (lastException != null) { + throw new RetryLimitReached("Failed to execute after " + maxTransactionRetry + " retries", lastException); + } else { + throw new RetryLimitReached("Failed to execute after " + maxTransactionRetry + " retries due to unknown reasons"); + } + } } diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipWriterDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipWriterDAOTest.java index 0cb3567a3..368082a97 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipWriterDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipWriterDAOTest.java @@ -211,7 +211,6 @@ public void testClearRelationshipsByEntityUrn() throws URISyntaxException { "bar", "urn:li:foo:456", "foo"))); BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123"); - FooUrn fooUrn = FooUrn.createFromString("urn:li:foo:123"); // Before processing List before = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList(); @@ -227,6 +226,29 @@ public void testClearRelationshipsByEntityUrn() throws URISyntaxException { _server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_pairswith")); } + @Test + public void testClearRelationshipsByEntityUrnWithBatching() throws URISyntaxException { + // Insert a large number of relationships to trigger batch processing + for (int i = 0; i < 10001; i++) { + _server.execute(Ebean.createSqlUpdate(insertRelationships("metadata_relationship_pairswith", "urn:li:bar:123", + "bar", "urn:li:foo:" + i, "foo"))); + } + + BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123"); + // Before processing + List before = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList(); + assertEquals(before.size(), 10001); + + _localRelationshipWriterDAO.clearRelationshipsByEntity(barUrn, PairsWith.class, false); + + // After processing verification + List all = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList(); + assertEquals(all.size(), 0); // Total number of edges is 0 + assertEquals(_localRelationshipWriterDAO.getBatchCount(), 2); //expect 2 batches + // Clean up + _server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_pairswith")); + } + @Test public void testRemoveRelationships() throws URISyntaxException { BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123"); @@ -254,7 +276,7 @@ public void testRemoveRelationships() throws URISyntaxException { private String insertRelationships(String table, String sourceUrn, String sourceType, String destinationUrn, String destinationType) { String insertTemplate = "INSERT INTO %s (metadata, source, source_type, destination, destination_type, lastmodifiedon, lastmodifiedby)" - + " VALUES ('{\"metadata\": true}', '%s', '%s', '%s', '%s', '1970-01-01 00:00:01', 'unknown')"; + + " VALUES ('{\"metadata\": true}', '%s', '%s', '%s', '%s', CURRENT_TIMESTAMP, 'unknown')"; return String.format(insertTemplate, table, sourceUrn, sourceType, destinationUrn, destinationType); } }