From a4f22cc06a5642bbc19064947c869082fe38773b Mon Sep 17 00:00:00 2001 From: Rakhi Agrawal Date: Fri, 11 Oct 2024 14:56:26 -0700 Subject: [PATCH] Refactored RPC Routing for grpc implementation (#447) * Refactored * Fixed the refactoring issues in the code * Fixed import * Added unit tests * Addressed comments * Addressed comments * Cleaned code * Changed to constructor --------- Co-authored-by: Rakhi Agrawal --- .../linkedin/metadata/dao/BaseLocalDAO.java | 43 ++++++++-------- ...RestliCompliantPreUpdateRoutingClient.java | 40 --------------- .../RestliPreUpdateAspectRegistry.java | 24 --------- .../preupdate/PreUpdateAspectRegistry.java | 44 +++++++++++++++++ .../preupdate/PreUpdateResponse.java | 12 +++++ .../preupdate/PreUpdateRoutingAccessor.java | 15 ++++++ .../PreUpdateRoutingClient.java | 9 ++-- .../metadata/dao/BaseLocalDAOTest.java | 35 +++++++++++-- .../SamplePreUpdateAspectRegistryImpl.java | 28 ----------- .../SamplePreUpdateRoutingClient.java | 36 +++----------- .../preupdate/PreUpdateResponseTest.java | 23 +++++++++ .../PreUpdateRoutingAccessorTest.java | 32 ++++++++++++ .../restli/BaseAspectRoutingResource.java | 19 +++---- .../restli/BaseAspectRoutingResourceTest.java | 49 ++++++++++++++----- .../SamplePreUpdateAspectRegistryImpl.java | 32 ------------ .../SamplePreUpdateRoutingClient.java | 37 +++----------- 16 files changed, 245 insertions(+), 233 deletions(-) delete mode 100644 dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/RestliCompliantPreUpdateRoutingClient.java delete mode 100644 dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/RestliPreUpdateAspectRegistry.java create mode 100644 dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java create mode 100644 dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java create mode 100644 dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessor.java rename dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/{ => preupdate}/PreUpdateRoutingClient.java (54%) delete mode 100644 dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateAspectRegistryImpl.java create mode 100644 dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponseTest.java create mode 100644 dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessorTest.java delete mode 100644 restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateAspectRegistryImpl.java diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 223d6ffbe..d5016b644 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -1,7 +1,6 @@ package com.linkedin.metadata.dao; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Message; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.DataMap; @@ -25,8 +24,10 @@ import com.linkedin.metadata.dao.exception.ModelValidationException; import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction; import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry; -import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry; -import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer; import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer; import com.linkedin.metadata.dao.retention.IndefiniteRetention; @@ -182,7 +183,7 @@ public static class AspectUpdateLambda { protected UrnPathExtractor _urnPathExtractor; private LambdaFunctionRegistry _lambdaFunctionRegistry; - private RestliPreUpdateAspectRegistry _restliPreUpdateAspectRegistry = null; + private PreUpdateAspectRegistry _preUpdateAspectRegistry = null; // Maps an aspect class to the corresponding retention policy private final Map, Retention> _aspectRetentionMap = new HashMap<>(); @@ -402,16 +403,16 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun /** * Set pre ingestion aspect registry. */ - public void setRestliPreUpdateAspectRegistry( - @Nullable RestliPreUpdateAspectRegistry restliPreUpdateAspectRegistry) { - _restliPreUpdateAspectRegistry = restliPreUpdateAspectRegistry; + public void setPreUpdateAspectRegistry( + @Nullable PreUpdateAspectRegistry preUpdateAspectRegistry) { + _preUpdateAspectRegistry = preUpdateAspectRegistry; } /** * Get pre ingestion aspect registry. */ - public RestliPreUpdateAspectRegistry getRestliPreUpdateAspectRegistry() { - return _restliPreUpdateAspectRegistry; + public PreUpdateAspectRegistry getPreUpdateAspectRegistry() { + return _preUpdateAspectRegistry; } @@ -1659,20 +1660,20 @@ protected ASPECT updatePreIngestionLambdas(@Nonn /** * This method routes the update request to the appropriate custom API for pre-ingestion processing. * @param urn the urn of the asset - * @param newValue the new aspect value + * @param newAspect the new aspect value * @return the updated aspect */ - protected ASPECT preUpdateRouting(URN urn, ASPECT newValue) { - if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered( - newValue.getClass())) { - RestliCompliantPreUpdateRoutingClient client = - _restliPreUpdateAspectRegistry.getPreUpdateRoutingClient(newValue); - Message updatedAspect = - client.routingLambda(client.convertUrnToMessage(urn), client.convertAspectToMessage(newValue)); - RecordTemplate convertedAspect = client.convertAspectToRecordTemplate(updatedAspect); - log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, convertedAspect); - return (ASPECT) convertedAspect; + protected ASPECT preUpdateRouting(URN urn, ASPECT newAspect) { + if (_preUpdateAspectRegistry != null && _preUpdateAspectRegistry.isRegistered( + newAspect.getClass())) { + PreUpdateRoutingAccessor preUpdateRoutingAccessor = _preUpdateAspectRegistry.getPreUpdateRoutingAccessor(newAspect.getClass()); + PreUpdateRoutingClient client = + preUpdateRoutingAccessor.getPreUpdateClient(); + PreUpdateResponse preUpdateResponse = client.preUpdate(urn, newAspect); + ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect(); + log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect); + return (ASPECT) updatedAspect; } - return newValue; + return newAspect; } } diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/RestliCompliantPreUpdateRoutingClient.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/RestliCompliantPreUpdateRoutingClient.java deleted file mode 100644 index 0b2925a83..000000000 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/RestliCompliantPreUpdateRoutingClient.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.linkedin.metadata.dao.ingestion; - -import com.google.protobuf.Message; - -import com.linkedin.common.urn.Urn; -import com.linkedin.data.template.RecordTemplate; - -/** - * A restli client to route update request to the appropriate to custom APIs. - *

This interface extends {@link PreUpdateRoutingClient} and provides additional methods for converting - * * URNs and aspects between different representations (e.g., from Pegasus to Protobuf).

- * * - */ - -public interface RestliCompliantPreUpdateRoutingClient extends PreUpdateRoutingClient { - - /** - * Converts a URN to a Protobuf message. - * - * @param pegasusUrn the URN to be converted - * @return the converted Protobuf message - */ - Message convertUrnToMessage(Urn pegasusUrn); - - /** - * Converts a {@link RecordTemplate} aspect to a Protobuf message aspect. - * - * @param pegasusAspect the aspect to be converted - * @return the converted Protobuf message aspect - */ - ASPECT convertAspectToMessage(RecordTemplate pegasusAspect); - - /** - * Converts a Protobuf message aspect to a {@link RecordTemplate} aspect. - * - * @param messageAspect the Protobuf message aspect to be converted - * @return the converted {@link RecordTemplate} aspect - */ - RecordTemplate convertAspectToRecordTemplate(ASPECT messageAspect); -} \ No newline at end of file diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/RestliPreUpdateAspectRegistry.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/RestliPreUpdateAspectRegistry.java deleted file mode 100644 index fd635c16e..000000000 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/RestliPreUpdateAspectRegistry.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.linkedin.metadata.dao.ingestion; - -import com.linkedin.data.template.RecordTemplate; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - - -/** - * A registry which maintains mapping of aspects and their getPreUpdateRoutingClient. - */ -public interface RestliPreUpdateAspectRegistry { - - /** - * Get PreUpdateRoutingClient for an aspect. - */ - @Nullable - RestliCompliantPreUpdateRoutingClient getPreUpdateRoutingClient(@Nonnull final ASPECT aspect); - - /** - * Check if PreUpdateRoutingClient is registered for an aspect. - */ - boolean isRegistered(@Nonnull final Class aspectClass); - -} \ No newline at end of file diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java new file mode 100644 index 000000000..ef39b5df9 --- /dev/null +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java @@ -0,0 +1,44 @@ +package com.linkedin.metadata.dao.ingestion.preupdate; + +import com.linkedin.data.template.RecordTemplate; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; + + +/** + * A registry which maintains mapping of aspects and their PreUpdateRoutingClient. + */ +@Slf4j +public class PreUpdateAspectRegistry { + + private final Map, PreUpdateRoutingAccessor> _preUpdateLambdaMap; + + /** + * Constructor to register pre-update routing accessors for multiple aspects at once. + * @param preUpdateMap map containing aspect classes and their corresponding accessors + */ + public PreUpdateAspectRegistry(@Nonnull Map, PreUpdateRoutingAccessor> preUpdateMap) { + _preUpdateLambdaMap = new HashMap<>(preUpdateMap); + log.info("Registered pre-update routing accessors for aspects: {}", _preUpdateLambdaMap.keySet()); + } + + /** + * Get Pre Update Routing Accessor for an aspect class. + * @param aspectClass the class of the aspect to retrieve the accessor for + * @return PreUpdateRoutingAccessor for the given aspect class, or null if not found + */ + public PreUpdateRoutingAccessor getPreUpdateRoutingAccessor( + @Nonnull Class aspectClass) { + return _preUpdateLambdaMap.get(aspectClass); + } + + /** + * Check if Pre Update Routing Accessor is registered for an aspect. + */ + public boolean isRegistered(@Nonnull final Class aspectClass) { + return _preUpdateLambdaMap.containsKey(aspectClass); + } + +} \ No newline at end of file diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java new file mode 100644 index 000000000..b77d12a45 --- /dev/null +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java @@ -0,0 +1,12 @@ +package com.linkedin.metadata.dao.ingestion.preupdate; + +import com.linkedin.data.template.RecordTemplate; +import lombok.Data; + +/** + * Response of pre-update process that includes the updated aspect. + */ +@Data +public class PreUpdateResponse { + private final ASPECT updatedAspect; +} diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessor.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessor.java new file mode 100644 index 000000000..2fa6fda58 --- /dev/null +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessor.java @@ -0,0 +1,15 @@ +package com.linkedin.metadata.dao.ingestion.preupdate; + +import com.linkedin.data.template.RecordTemplate; +import lombok.Data; + + +@Data +public class PreUpdateRoutingAccessor { + + public PreUpdateRoutingClient preUpdateClient; + + public enum RoutingAction { + PROCEED, SKIP + } +} diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/PreUpdateRoutingClient.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingClient.java similarity index 54% rename from dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/PreUpdateRoutingClient.java rename to dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingClient.java index 01335f408..90441798f 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/PreUpdateRoutingClient.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingClient.java @@ -1,18 +1,19 @@ -package com.linkedin.metadata.dao.ingestion; +package com.linkedin.metadata.dao.ingestion.preupdate; -import com.google.protobuf.Message; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; /** * An interface that defines methods to route update requests to the appropriate custom APIs for pre-ingestion process. */ -public interface PreUpdateRoutingClient { +public interface PreUpdateRoutingClient { /** * A method that routes the update request to the appropriate custom API. * @param urn the urn of the asset * @param aspect the aspect to be updated * @return the updated aspect */ - ASPECT routingLambda(Message urn, ASPECT aspect); + PreUpdateResponse preUpdate(Urn urn, ASPECT aspect); } \ No newline at end of file diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index ed47a977e..d23db0b2f 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -6,7 +6,9 @@ import com.linkedin.data.template.UnionTemplate; import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates; import com.linkedin.metadata.dao.ingestion.SampleLambdaFunctionRegistryImpl; -import com.linkedin.metadata.dao.ingestion.SamplePreUpdateAspectRegistryImpl; +import com.linkedin.metadata.dao.ingestion.SamplePreUpdateRoutingClient; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer; import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer; import com.linkedin.metadata.dao.retention.TimeBasedRetention; @@ -31,6 +33,7 @@ import java.sql.Timestamp; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -658,7 +661,16 @@ public void testPreUpdateRoutingFromFooToBar() throws URISyntaxException { FooUrn urn = new FooUrn(1); AspectFoo foo = new AspectFoo().setValue("foo"); AspectFoo bar = new AspectFoo().setValue("bar"); - _dummyLocalDAO.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl()); + + PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); + preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + + Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); + preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); + + PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(preUpdateMap); + _dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry); + AspectFoo result = _dummyLocalDAO.preUpdateRouting(urn, foo); assertEquals(result, bar); } @@ -669,7 +681,15 @@ public void testMAEEmissionForPreUpdateRouting() throws URISyntaxException { AspectFoo foo = new AspectFoo().setValue("foo"); AspectFoo bar = new AspectFoo().setValue("bar"); _dummyLocalDAO.setAlwaysEmitAuditEvent(true); - _dummyLocalDAO.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl()); + PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); + preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + + Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); + preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); + + PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(preUpdateMap); + + _dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry); expectGetLatest(urn, AspectFoo.class, Arrays.asList(makeAspectEntry(null, null), makeAspectEntry(foo, _dummyAuditStamp))); @@ -687,7 +707,14 @@ public void testPreUpdateRoutingWithUnregisteredAspect() throws URISyntaxExcepti AspectBar foo = new AspectBar().setValue("foo"); // Inject RestliPreIngestionAspectRegistry with no registered aspect - _dummyLocalDAO.setRestliPreUpdateAspectRegistry(new SamplePreUpdateAspectRegistryImpl()); + PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); + preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + + Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); + preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); + + PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(preUpdateMap); + _dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry); // Call the add method AspectBar result = _dummyLocalDAO.preUpdateRouting(urn, foo); diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateAspectRegistryImpl.java b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateAspectRegistryImpl.java deleted file mode 100644 index 76021c058..000000000 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateAspectRegistryImpl.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.linkedin.metadata.dao.ingestion; - -import com.google.common.collect.ImmutableMap; -import com.linkedin.data.template.RecordTemplate; -import com.linkedin.testing.AspectFoo; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - - -public class SamplePreUpdateAspectRegistryImpl implements RestliPreUpdateAspectRegistry { - private final ImmutableMap, RestliCompliantPreUpdateRoutingClient> registry; - - public SamplePreUpdateAspectRegistryImpl() { - registry = new ImmutableMap.Builder, RestliCompliantPreUpdateRoutingClient>() - .put(AspectFoo.class, new SamplePreUpdateRoutingClient()) - .build(); - } - @Nullable - @Override - public RestliCompliantPreUpdateRoutingClient getPreUpdateRoutingClient(@Nonnull ASPECT aspect) { - return registry.get(aspect.getClass()); - } - - @Override - public boolean isRegistered(@Nonnull Class aspectClass) { - return registry.containsKey(aspectClass); - } -} diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateRoutingClient.java b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateRoutingClient.java index acf675be1..af5973f8b 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateRoutingClient.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateRoutingClient.java @@ -1,40 +1,18 @@ package com.linkedin.metadata.dao.ingestion; -import com.google.protobuf.Any; -import com.google.protobuf.Message; -import com.google.protobuf.StringValue; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; import com.linkedin.testing.AspectFoo; -public class SamplePreUpdateRoutingClient implements RestliCompliantPreUpdateRoutingClient { - @Override - public Message routingLambda(Message urn, Message aspect) { - // For testing, change the aspect value to "bar" - return Any.pack(StringValue.of("bar")); - } - - @Override - public Message convertUrnToMessage(Urn urn) { - // Directly wrap the URN string into a Protobuf message for testing - return Any.pack(StringValue.of(urn.toString())); - } - - @Override - public Message convertAspectToMessage(RecordTemplate pegasusAspect) { - // For testing, convert AspectFoo to a TestMessageProtos.AspectMessage - // Assuming the aspect has a `value` field and its string representation can be used for now - String aspectString = pegasusAspect.toString(); // Extracting the aspect as a string (e.g., {value=foo}) - - // Wrap the aspect string into a simple Protobuf message for testing - return Any.pack(StringValue.of(aspectString)); - } +public class SamplePreUpdateRoutingClient implements PreUpdateRoutingClient { @Override - public RecordTemplate convertAspectToRecordTemplate(Message messageAspect) { - // For testing, convert TestMessageProtos.AspectMessage back to AspectFoo - // Create a new RecordTemplate (AspectFoo in this case) and set the value field - return new AspectFoo().setValue("bar"); + public PreUpdateResponse preUpdate(Urn urn, RecordTemplate recordTemplate) { + AspectFoo aspectFoo = (AspectFoo) recordTemplate; + aspectFoo.setValue("bar"); + return new PreUpdateResponse(aspectFoo); } } diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponseTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponseTest.java new file mode 100644 index 000000000..74c7195ff --- /dev/null +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponseTest.java @@ -0,0 +1,23 @@ +package com.linkedin.metadata.dao.ingestion.preupdate; + +import com.linkedin.data.template.RecordTemplate; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; +import static org.testng.AssertJUnit.*; + + +public class PreUpdateResponseTest { + + @Test + public void testConstructorAndGetter() { + // Create a mock instance of RecordTemplate + RecordTemplate mockAspect = mock(RecordTemplate.class); + + // Create an instance of PreUpdateResponse with the mock aspect + PreUpdateResponse response = new PreUpdateResponse<>(mockAspect); + + // Verify that the getter returns the correct value + assertEquals(mockAspect, response.getUpdatedAspect()); + } +} diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessorTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessorTest.java new file mode 100644 index 000000000..584be50a3 --- /dev/null +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessorTest.java @@ -0,0 +1,32 @@ +package com.linkedin.metadata.dao.ingestion.preupdate; + +import com.google.protobuf.Message; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.AssertJUnit.*; +import static org.mockito.Mockito.*; + + +public class PreUpdateRoutingAccessorTest { + private PreUpdateRoutingAccessor routingInfo; + private PreUpdateRoutingClient mockPreUpdateClient; + + @BeforeMethod + public void setUp() { + routingInfo = new PreUpdateRoutingAccessor(); + mockPreUpdateClient = mock(PreUpdateRoutingClient.class); + } + + @Test + public void testPreUpdateClientSetterAndGetter() { + routingInfo.setPreUpdateClient(mockPreUpdateClient); + assertEquals(mockPreUpdateClient, routingInfo.getPreUpdateClient()); + } + + @Test + public void testRoutingActionEnum() { + assertEquals("PROCEED", PreUpdateRoutingAccessor.RoutingAction.PROCEED.name()); + assertEquals("SKIP", PreUpdateRoutingAccessor.RoutingAction.SKIP.name()); + } +} diff --git a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java index 2edfcef9d..49633ad5a 100644 --- a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java +++ b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java @@ -1,6 +1,5 @@ package com.linkedin.metadata.restli; -import com.google.protobuf.Message; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; @@ -8,8 +7,10 @@ import com.linkedin.data.template.StringArray; import com.linkedin.data.template.UnionTemplate; import com.linkedin.metadata.dao.AspectKey; -import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient; -import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.events.IngestionTrackingContext; import com.linkedin.metadata.internal.IngestionParams; @@ -416,7 +417,7 @@ private void ingestAspect(Set> aspectsToIgnore, if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) { try { // get the updated aspect if there is a preupdate routing lambda registered - RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry(); + PreUpdateAspectRegistry registry = getLocalDAO().getPreUpdateAspectRegistry(); if (!skipExtraProcessing && registry != null && registry.isRegistered(aspect.getClass())) { log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass())); aspect = preUpdateRouting((URN) urn, aspect, registry); @@ -689,10 +690,10 @@ private List getValueFromRoutingGms(@Nonnull URN urn, * @param aspect the new aspect value * @return the updated aspect */ - private RecordTemplate preUpdateRouting(URN urn, RecordTemplate aspect, RestliPreUpdateAspectRegistry registry) { - RestliCompliantPreUpdateRoutingClient client = registry.getPreUpdateRoutingClient(aspect); - Message updatedAspect = - client.routingLambda(client.convertUrnToMessage(urn), client.convertAspectToMessage(aspect)); - return client.convertAspectToRecordTemplate(updatedAspect); + private RecordTemplate preUpdateRouting(URN urn, RecordTemplate aspect, PreUpdateAspectRegistry registry) { + PreUpdateRoutingAccessor preUpdateRoutingAccessor = registry.getPreUpdateRoutingAccessor(aspect.getClass()); + PreUpdateRoutingClient preUpdateClient = preUpdateRoutingAccessor.getPreUpdateClient(); + PreUpdateResponse preUpdateResponse = preUpdateClient.preUpdate(urn, aspect); + return preUpdateResponse.getUpdatedAspect(); } } diff --git a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java index e47dd8de4..c082c61a4 100644 --- a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java +++ b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java @@ -8,10 +8,12 @@ import com.linkedin.metadata.dao.BaseBrowseDAO; import com.linkedin.metadata.dao.BaseLocalDAO; import com.linkedin.metadata.dao.BaseSearchDAO; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.dao.utils.RecordUtils; import com.linkedin.metadata.events.IngestionTrackingContext; -import com.linkedin.metadata.restli.ingestion.SamplePreUpdateAspectRegistryImpl; +import com.linkedin.metadata.restli.ingestion.SamplePreUpdateRoutingClient; import com.linkedin.parseq.BaseEngineTest; import com.linkedin.restli.common.ComplexResourceKey; import com.linkedin.restli.common.EmptyRecord; @@ -36,6 +38,7 @@ import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -305,7 +308,7 @@ public void testIngestWithRoutingAspect() { verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(null), eq(null)); verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo)); verify(_mockAspectAttributeGmsClient, times(1)).ingest(eq(urn), eq(attributes)); - verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(2)).getPreUpdateAspectRegistry(); verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); } @@ -325,7 +328,7 @@ public void testIngestWithTrackingWithRoutingAspect() { verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(trackingContext), eq(null)); verify(_mockAspectFooGmsClient, times(1)).ingestWithTracking(eq(urn), eq(foo), eq(trackingContext), eq(null)); verify(_mockAspectAttributeGmsClient, times(1)).ingestWithTracking(eq(urn), eq(attributes), eq(trackingContext), eq(null)); - verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(2)).getPreUpdateAspectRegistry(); verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); } @@ -353,7 +356,7 @@ public void testIngestWithOnlyRoutingAspect() { runAndWait(_resource.ingest(snapshot)); - verify(_mockLocalDAO, times(2)).getRestliPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(2)).getPreUpdateAspectRegistry(); verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); // verify(_mockGmsClient, times(1)).ingest(eq(urn), eq(foo)); verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo)); @@ -559,13 +562,19 @@ public void testPreUpdateRoutingWithRegisteredAspect() { List aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo)); EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); - SamplePreUpdateAspectRegistryImpl registry = new SamplePreUpdateAspectRegistryImpl(); - when(_mockLocalDAO.getRestliPreUpdateAspectRegistry()).thenReturn(registry); + PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); + preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); + preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); + + PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry(preUpdateMap); + + when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry); // given: ingest a snapshot containing a routed aspect which has a registered pre-update lambda. runAndWait(_resource.ingest(snapshot)); - verify(_mockLocalDAO, times(1)).getRestliPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(1)).getPreUpdateAspectRegistry(); // expected: the pre-update lambda is executed first (aspect value is changed from foo to foobar) and then the aspect is dual-written. AspectFoo foobar = new AspectFoo().setValue("foobar"); // dual write pt1: ensure the ingestion request is forwarded to the routed GMS. @@ -600,8 +609,16 @@ public void testPreUpdateRoutingWithNonRoutedAspectAndRegisteredPreUpdate() { AspectBar bar = new AspectBar().setValue("bar"); List aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, bar)); EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); - SamplePreUpdateAspectRegistryImpl registry = new SamplePreUpdateAspectRegistryImpl(); - when(_mockLocalDAO.getRestliPreUpdateAspectRegistry()).thenReturn(registry); + + PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); + preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + + Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); + preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); + + PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry(preUpdateMap); + + when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry); // given: ingest a snapshot which contains a non-routed aspect which has a registered pre-update lambda. runAndWait(_resource.ingest(snapshot)); @@ -647,12 +664,18 @@ public void testPreUpdateRoutingWithSkipIngestion() throws NoSuchFieldException, AspectFoo foo = new AspectFoo().setValue("foo"); List aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo)); EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); - SamplePreUpdateAspectRegistryImpl registry = new SamplePreUpdateAspectRegistryImpl(); - when(_mockLocalDAO.getRestliPreUpdateAspectRegistry()).thenReturn(registry); + + PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); + preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); + preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); + PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry(preUpdateMap); + + when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry); runAndWait(_resource.ingest(snapshot)); verify(_mockAspectFooGmsClient, times(0)).ingest(any(), any()); - verify(_mockLocalDAO, times(1)).getRestliPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(1)).getPreUpdateAspectRegistry(); // Should not add to local DAO verifyNoMoreInteractions(_mockLocalDAO); } @@ -677,7 +700,7 @@ public void testPreUpdateRoutingWithSkipIngestionNoPreLambda() throws NoSuchFiel // Should not skip ingestion verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo)); // Should check for pre lambda - verify(_mockLocalDAO, times(1)).getRestliPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(1)).getPreUpdateAspectRegistry(); // Should continue to dual-write into local DAO verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); verifyNoMoreInteractions(_mockLocalDAO); diff --git a/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateAspectRegistryImpl.java b/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateAspectRegistryImpl.java deleted file mode 100644 index 5489a9728..000000000 --- a/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateAspectRegistryImpl.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.linkedin.metadata.restli.ingestion; - -import com.google.common.collect.ImmutableMap; -import com.linkedin.data.template.RecordTemplate; -import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient; -import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry; -import com.linkedin.testing.AspectBar; -import com.linkedin.testing.AspectFoo; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - - -public class SamplePreUpdateAspectRegistryImpl implements RestliPreUpdateAspectRegistry { - private final ImmutableMap, RestliCompliantPreUpdateRoutingClient> registry; - - public SamplePreUpdateAspectRegistryImpl() { - registry = new ImmutableMap.Builder, RestliCompliantPreUpdateRoutingClient>() - .put(AspectFoo.class, new SamplePreUpdateRoutingClient()) - .put(AspectBar.class, new SamplePreUpdateRoutingClient()) - .build(); - } - @Nullable - @Override - public RestliCompliantPreUpdateRoutingClient getPreUpdateRoutingClient(@Nonnull ASPECT aspect) { - return registry.get(aspect.getClass()); - } - - @Override - public boolean isRegistered(@Nonnull Class aspectClass) { - return registry.containsKey(aspectClass); - } -} diff --git a/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateRoutingClient.java b/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateRoutingClient.java index dcc2d5817..a0ca313fa 100644 --- a/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateRoutingClient.java +++ b/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateRoutingClient.java @@ -1,41 +1,20 @@ package com.linkedin.metadata.restli.ingestion; -import com.google.protobuf.Any; -import com.google.protobuf.Message; -import com.google.protobuf.StringValue; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; -import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; +import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; import com.linkedin.testing.AspectFoo; -public class SamplePreUpdateRoutingClient implements RestliCompliantPreUpdateRoutingClient { - @Override - public Message routingLambda(Message urn, Message aspect) { - // For testing, change the aspect value to "foobar" - return Any.pack(StringValue.of("foobar")); - } - - @Override - public Message convertUrnToMessage(Urn urn) { - // Directly wrap the URN string into a Protobuf message for testing - return Any.pack(StringValue.of(urn.toString())); - } +public class SamplePreUpdateRoutingClient implements PreUpdateRoutingClient { @Override - public Message convertAspectToMessage(RecordTemplate pegasusAspect) { - // For testing, convert AspectFoo to a TestMessageProtos.AspectMessage - // Assuming the aspect has a `value` field and its string representation can be used for now - String aspectString = pegasusAspect.toString(); // Extracting the aspect as a string (e.g., {value=foo}) - - // Wrap the aspect string into a simple Protobuf message for testing - return Any.pack(StringValue.of(aspectString)); - } + public PreUpdateResponse preUpdate(Urn urn, RecordTemplate recordTemplate) { - @Override - public RecordTemplate convertAspectToRecordTemplate(Message messageAspect) { - // For testing, convert TestMessageProtos.AspectMessage back to AspectFoo - // Create a new RecordTemplate (AspectFoo in this case) and set the value field to foobar - return new AspectFoo().setValue("foobar"); + // For testing, change the aspect value to "bar" + RecordTemplate updatedAspect = new AspectFoo().setValue("foobar"); + // Return a new PreUpdateResponse with the updated aspect + return new PreUpdateResponse<>(updatedAspect); } }