Skip to content

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Rakhi Agrawal committed Oct 11, 2024
1 parent 8bb1ad4 commit fe1743a
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.linkedin.metadata.dao.exception.ModelValidationException;
import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction;
import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient;
Expand Down Expand Up @@ -1666,9 +1666,9 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newAspect) {
if (_preUpdateAspectRegistry != null && _preUpdateAspectRegistry.isRegistered(
newAspect.getClass())) {
PreRoutingInfo routingMap = _preUpdateAspectRegistry.getPreUpdateRoutingClient(newAspect);
PreRoutingAccessor preRoutingAccessor = _preUpdateAspectRegistry.getPreUpdateRoutingClient(newAspect);
PreUpdateRoutingClient client =
routingMap.getPreUpdateClient();
preRoutingAccessor.getPreUpdateClient();
PreUpdateResponse preUpdateResponse = client.preUpdate(urn, newAspect);
ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect();
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;
import com.linkedin.data.template.RecordTemplate;
import lombok.Data;


@Data
public class PreRoutingInfo {
public class PreRoutingAccessor {

public PreUpdateRoutingClient<? extends Message> preUpdateClient;
public PreUpdateRoutingClient<? extends RecordTemplate> preUpdateClient;

public enum RoutingAction {
PROCEED, SKIP
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -11,21 +11,21 @@
* A registry which maintains mapping of aspects and their PreUpdateRoutingClient.
*/
@Slf4j
public class PreUpdateAspectRegistry<ASPECT extends RecordTemplate> {
public class PreUpdateAspectRegistry {

private Map<Class<? extends RecordTemplate>, PreRoutingInfo> _preUpdateLambdaMap = new ConcurrentHashMap<>();
private Map<Class<? extends RecordTemplate>, PreRoutingAccessor> _preUpdateLambdaMap = new HashMap<>();

/**
* Get GrpcPreUpdateRoutingClient for an aspect.
*/
public PreRoutingInfo getPreUpdateRoutingClient(@Nonnull final ASPECT aspect) {
public <ASPECT extends RecordTemplate> PreRoutingAccessor getPreUpdateRoutingClient(@Nonnull final ASPECT aspect) {
return _preUpdateLambdaMap.get(aspect.getClass());
}

/**
* Check if GrpcPreUpdateRoutingClient is registered for an aspect.
*/
public boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
return _preUpdateLambdaMap.containsKey(aspectClass);
}

Expand All @@ -35,7 +35,7 @@ public boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
* @param preUpdateRoutingInfo pre update routing map
*/
public void registerPreUpdateLambda(@Nonnull Class<? extends RecordTemplate> aspectClass,
@Nonnull PreRoutingInfo preUpdateRoutingInfo) {
@Nonnull PreRoutingAccessor preUpdateRoutingInfo) {
log.info("Registering pre update lambda: {}, {}", aspectClass.getCanonicalName(), preUpdateRoutingInfo);
_preUpdateLambdaMap.put(aspectClass, preUpdateRoutingInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.linkedin.data.template.RecordTemplate;
import lombok.Data;


/**
* Response of pre-update process that includes the updated aspect.
*/
@Data
public class PreUpdateResponse<ASPECT extends RecordTemplate> {
private final ASPECT updatedAspect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.ingestion.SampleLambdaFunctionRegistryImpl;
import com.linkedin.metadata.dao.ingestion.SamplePreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
Expand Down Expand Up @@ -661,11 +661,11 @@ public void testPreUpdateRoutingFromFooToBar() throws URISyntaxException {
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");

PreRoutingInfo preRoutingInfo = new PreRoutingInfo();
preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient());
PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor();
preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient());

PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry();
preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo);
preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor);
_dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry);

AspectFoo result = _dummyLocalDAO.preUpdateRouting(urn, foo);
Expand All @@ -678,10 +678,10 @@ public void testMAEEmissionForPreUpdateRouting() throws URISyntaxException {
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");
_dummyLocalDAO.setAlwaysEmitAuditEvent(true);
PreRoutingInfo preRoutingInfo = new PreRoutingInfo();
preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient());
PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor();
preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient());
PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry();
preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo);
preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor);

_dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry);
expectGetLatest(urn, AspectFoo.class,
Expand All @@ -701,10 +701,10 @@ public void testPreUpdateRoutingWithUnregisteredAspect() throws URISyntaxExcepti
AspectBar foo = new AspectBar().setValue("foo");

// Inject RestliPreIngestionAspectRegistry with no registered aspect
PreRoutingInfo preRoutingInfo = new PreRoutingInfo();
preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient());
PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor();
preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient());
PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry();
preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo);
preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor);
_dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry);

// Call the add method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
import static org.mockito.Mockito.*;


public class PreRoutingInfoTest {
private PreRoutingInfo routingInfo;
public class PreRoutingAccessorTest {
private PreRoutingAccessor routingInfo;
private PreUpdateRoutingClient<? extends Message> mockPreUpdateClient;

@BeforeMethod
public void setUp() {
routingInfo = new PreRoutingInfo();
routingInfo = new PreRoutingAccessor();
mockPreUpdateClient = mock(PreUpdateRoutingClient.class);
}

Expand All @@ -26,7 +26,7 @@ public void testPreUpdateClientSetterAndGetter() {

@Test
public void testRoutingActionEnum() {
assertEquals("PROCEED", PreRoutingInfo.RoutingAction.PROCEED.name());
assertEquals("SKIP", PreRoutingInfo.RoutingAction.SKIP.name());
assertEquals("PROCEED", PreRoutingAccessor.RoutingAction.PROCEED.name());
assertEquals("SKIP", PreRoutingAccessor.RoutingAction.SKIP.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.AspectKey;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient;
Expand Down Expand Up @@ -687,8 +687,8 @@ private List<? extends RecordTemplate> getValueFromRoutingGms(@Nonnull URN urn,
* @return the updated aspect
*/
private RecordTemplate preUpdateRouting(URN urn, RecordTemplate aspect, PreUpdateAspectRegistry registry) {
PreRoutingInfo routingMap = registry.getPreUpdateRoutingClient(aspect);
PreUpdateRoutingClient preUpdateClient = routingMap.getPreUpdateClient();
PreRoutingAccessor preRoutingAccessor = registry.getPreUpdateRoutingClient(aspect);
PreUpdateRoutingClient preUpdateClient = preRoutingAccessor.getPreUpdateClient();
PreUpdateResponse preUpdateResponse = preUpdateClient.preUpdate(urn, aspect);
return preUpdateResponse.getUpdatedAspect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.linkedin.metadata.dao.BaseBrowseDAO;
import com.linkedin.metadata.dao.BaseLocalDAO;
import com.linkedin.metadata.dao.BaseSearchDAO;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
Expand Down Expand Up @@ -562,9 +562,9 @@ public void testPreUpdateRoutingWithRegisteredAspect() {
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry();
PreRoutingInfo preRoutingInfo = new PreRoutingInfo();
preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient());
registry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo);
PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor();
preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient());
registry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor);
when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry);

// given: ingest a snapshot containing a routed aspect which has a registered pre-update lambda.
Expand Down Expand Up @@ -607,9 +607,9 @@ public void testPreUpdateRoutingWithNonRoutedAspectAndRegisteredPreUpdate() {
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);

PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry();
PreRoutingInfo preRoutingInfo = new PreRoutingInfo();
preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient());
registry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo);
PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor();
preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient());
registry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor);

when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry);

Expand Down Expand Up @@ -658,9 +658,9 @@ public void testPreUpdateRoutingWithSkipIngestion() throws NoSuchFieldException,
List<EntityAspectUnion> aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo));
EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects);
PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry();
PreRoutingInfo preRoutingInfo = new PreRoutingInfo();
preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient());
registry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo);
PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor();
preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient());
registry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor);
when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry);

runAndWait(_resource.ingest(snapshot));
Expand Down

0 comments on commit fe1743a

Please sign in to comment.