Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to have a common interface for InUpdate Callback #451

Merged
merged 12 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 89 additions & 34 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import com.linkedin.metadata.dao.equality.DefaultEqualityTester;
import com.linkedin.metadata.dao.equality.EqualityTester;
import com.linkedin.metadata.dao.exception.ModelValidationException;
import com.linkedin.metadata.dao.ingestion.AspectCallbackResponse;
import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction;
import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.AspectCallbackRegistry;
import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.IndefiniteRetention;
Expand Down Expand Up @@ -160,6 +159,13 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
}
}

@Data
@AllArgsConstructor
protected static class AspectUpdateResult<ASPECT extends RecordTemplate> {
private ASPECT updatedAspect;
private boolean skipProcessing;
}

private static final String DEFAULT_ID_NAMESPACE = "global";

private static final String BACKFILL_EMITTER = "dao_backfill_endpoint";
Expand All @@ -183,7 +189,7 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
protected UrnPathExtractor<URN> _urnPathExtractor;

private LambdaFunctionRegistry _lambdaFunctionRegistry;
private PreUpdateAspectRegistry _preUpdateAspectRegistry = null;
private AspectCallbackRegistry _aspectCallbackRegistry = null;

// Maps an aspect class to the corresponding retention policy
private final Map<Class<? extends RecordTemplate>, Retention> _aspectRetentionMap = new HashMap<>();
Expand Down Expand Up @@ -216,6 +222,7 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {

private Clock _clock = Clock.systemUTC();


/**
* Constructor for BaseLocalDAO.
*
Expand Down Expand Up @@ -314,6 +321,7 @@ public void setClock(@Nonnull Clock clock) {
_clock = clock;
}


/**
* Sets {@link Retention} for a specific aspect type.
*/
Expand Down Expand Up @@ -401,21 +409,20 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun
}

/**
* Set pre ingestion aspect registry.
* Set aspect callback registry.
*/
public void setPreUpdateAspectRegistry(
@Nullable PreUpdateAspectRegistry preUpdateAspectRegistry) {
_preUpdateAspectRegistry = preUpdateAspectRegistry;
public void setAspectCallbackRegistry(
@Nullable AspectCallbackRegistry aspectCallbackRegistry) {
_aspectCallbackRegistry = aspectCallbackRegistry;
}

/**
* Get pre ingestion aspect registry.
* Get aspect callback registry.
*/
public PreUpdateAspectRegistry getPreUpdateAspectRegistry() {
return _preUpdateAspectRegistry;
public AspectCallbackRegistry getAspectCallbackRegistry() {
return _aspectCallbackRegistry;
}


/**
* Enables or disables atomic updates of multiple aspects.
*/
Expand Down Expand Up @@ -611,6 +618,11 @@ public List<ASPECT_UNION> addMany(@Nonnull URN urn, @Nonnull List<? extends Reco

private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN urn, AspectUpdateLambda<ASPECT> updateTuple,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) {
return aspectUpdateHelper(urn, updateTuple, auditStamp, trackingContext, false);
}

private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN urn, AspectUpdateLambda<ASPECT> updateTuple,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, boolean isRawUpdate) {
AspectEntry<ASPECT> latest = getLatest(urn, updateTuple.getAspectClass(), updateTuple.getIngestionParams().isTestMode());

// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
Expand All @@ -629,6 +641,15 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN
if (_lambdaFunctionRegistry != null && _lambdaFunctionRegistry.isRegistered(updateTuple.getAspectClass())) {
newValue = updatePreIngestionLambdas(urn, oldValue, newValue);
}
// this will skip the pre/in update callbacks
if (!isRawUpdate) {
AspectUpdateResult result = aspectCallbackHelper(urn, newValue, oldValue);
newValue = (ASPECT) result.getUpdatedAspect();
// skip the normal ingestion to the DAO
if (result.isSkipProcessing()) {
return null;
}
}

checkValidAspect(newValue.getClass());

Expand Down Expand Up @@ -738,6 +759,18 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda, ingestionParams), auditStamp, maxTransactionRetry, trackingContext);
}

/**
* Same as above {@link #add(Urn, Class, Function, AuditStamp, int, IngestionTrackingContext, IngestionParams)} but to skip aspect callback routing.
* DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM.
* Please use the regular add method linked above.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT rawAdd(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<ASPECT>, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp,
int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {
return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda, ingestionParams), auditStamp, maxTransactionRetry, trackingContext, true);
}

/**
* Adds a new version of an aspect for an entity.
*
Expand Down Expand Up @@ -766,13 +799,22 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdate
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdateLambda<ASPECT> updateLambda,
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) {
return add(urn, updateLambda, auditStamp, maxTransactionRetry, trackingContext, false);
}

/**
* Same as above {@link #add(Urn, AspectUpdateLambda, AuditStamp, int, IngestionTrackingContext)} but with a flag to skip aspect callback routing.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdateLambda<ASPECT> updateLambda,
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, boolean isRawUpdate) {
final Class<ASPECT> aspectClass = updateLambda.getAspectClass();
checkValidAspect(aspectClass);

// default test mode is false being set in
// {@link #rawAdd(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)}}
final AddResult<ASPECT> result =
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext),
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext, isRawUpdate),
maxTransactionRetry);

// skip MAE producing and post update hook in test mode
Expand Down Expand Up @@ -835,6 +877,18 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
return add(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext, ingestionParams);
}

/**
* Same as above {@link #add(Urn, Class, Function, AuditStamp, IngestionTrackingContext, IngestionParams)} but skips any aspect callbacks.
* DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM.
* Please use the regular add method linked above.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT rawAdd(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<ASPECT>, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {
return rawAdd(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext, ingestionParams);
}

/**
* Similar to {@link #add(Urn, Class, Function, AuditStamp)} but takes the new value directly.
*/
Expand All @@ -847,20 +901,22 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP

/**
* Same as above {@link #add(Urn, RecordTemplate, AuditStamp)} but with tracking context.
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #preUpdateRouting(Urn, RecordTemplate)} as well
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #aspectCallbackHelper(Urn, RecordTemplate, Optional)} as well
* to avoid any inconsistency between the lambda function and the add method.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
ASPECT updatedAspect = preUpdateRouting(urn, newValue);
return rawAdd(urn, updatedAspect, auditStamp, trackingContext, ingestionParams);
final IngestionParams nonNullIngestionParams =
ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode(
IngestionMode.LIVE).setTestMode(false) : ingestionParams;
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
}

/**
* Same as above {@link #add(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)} but
* skips any pre-update lambdas. DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM.
* skips any aspect callbacks. DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM.
* Please use the regular add method linked above.
*/
@Nonnull
Expand All @@ -870,7 +926,7 @@ public <ASPECT extends RecordTemplate> ASPECT rawAdd(@Nonnull URN urn, @Nonnull
final IngestionParams nonNullIngestionParams =
ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode(
IngestionMode.LIVE).setTestMode(false) : ingestionParams;
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
return rawAdd(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
}

/**
Expand Down Expand Up @@ -1658,22 +1714,21 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
}

/**
* This method routes the update request to the appropriate custom API for pre-ingestion processing.
* This method routes the aspect updates to the appropriate aspect callback clients and get the updated aspect as response.
* @param urn the urn of the asset
* @param newAspect the new aspect value
* @return the updated aspect
*/
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newAspect) {
if (_preUpdateAspectRegistry != null && _preUpdateAspectRegistry.isRegistered(
newAspect.getClass())) {
PreUpdateRoutingAccessor preUpdateRoutingAccessor = _preUpdateAspectRegistry.getPreUpdateRoutingAccessor(newAspect.getClass());
PreUpdateRoutingClient client =
preUpdateRoutingAccessor.getPreUpdateClient();
PreUpdateResponse preUpdateResponse = client.preUpdate(urn, newAspect);
ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect();
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
return (ASPECT) updatedAspect;
* @param newAspectValue the new aspect value
* @param oldAspectValue the old aspect value
* @return AspectUpdateResult which contains updated aspect value
*/
protected <ASPECT extends RecordTemplate> AspectUpdateResult aspectCallbackHelper(URN urn, ASPECT newAspectValue, Optional<ASPECT> oldAspectValue) {
if (_aspectCallbackRegistry != null && _aspectCallbackRegistry.isRegistered(
newAspectValue.getClass())) {
AspectCallbackRoutingClient client = _aspectCallbackRegistry.getAspectCallbackRoutingClient(newAspectValue.getClass());
AspectCallbackResponse aspectCallbackResponse = client.routeAspectCallback(urn, newAspectValue, oldAspectValue);
ASPECT updatedAspect = (ASPECT) aspectCallbackResponse.getUpdatedAspect();
log.info("Aspect callback routing completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
return new AspectUpdateResult(updatedAspect, client.isSkipProcessing());
}
return newAspect;
return new AspectUpdateResult(newAspectValue, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.linkedin.metadata.dao.ingestion;

import com.linkedin.data.template.RecordTemplate;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


/**
* A registry which maintains mapping of aspects and their Aspect Routing Client.
*/
@Slf4j
public class AspectCallbackRegistry {

private final Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> aspectCallbackMap;

/**
* Constructor to register aspect callback routing clients for aspects.
* @param aspectCallbackMap map containing aspect classes and their corresponding cleints
*/
public AspectCallbackRegistry(@Nonnull Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> aspectCallbackMap) {
this.aspectCallbackMap = new HashMap<>(aspectCallbackMap);
log.info("Registered aspect callback clients for aspects: {}", aspectCallbackMap.keySet());
}

/**
* Get Aspect Callback Routing Client for an aspect class.
* @param aspectClass the class of the aspect to retrieve the client
* @return AspectCallbackRoutingClient for the given aspect class, or null if not found
*/
public <ASPECT extends RecordTemplate> AspectCallbackRoutingClient getAspectCallbackRoutingClient(
@Nonnull Class<ASPECT> aspectClass) {
return aspectCallbackMap.get(aspectClass);
}

/**
* Check if Aspect Callback Routing Client is registered for an aspect.
*/
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
return aspectCallbackMap.containsKey(aspectClass);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.linkedin.metadata.dao.ingestion;

import com.linkedin.data.template.RecordTemplate;
import lombok.Data;

/**
* Response of in-update process that includes the updated aspect. It can be extended to include additional information.
*/
@Data
public class AspectCallbackResponse<ASPECT extends RecordTemplate> {
private final ASPECT updatedAspect;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.linkedin.metadata.dao.ingestion;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import java.util.Optional;


/**
* An interface that defines the client for aspect callback routing.
*/
public interface AspectCallbackRoutingClient<ASPECT extends RecordTemplate> {
/**
* A method that routes the updates request to the appropriate custom API.
* @param urn the urn of the asset
* @param newAspectValue the aspect to be updated
* @param existingAspectValue the existing aspect value
* @return AspectCallbackResponse containing the updated aspect
*/
AspectCallbackResponse<ASPECT> routeAspectCallback(Urn urn, ASPECT newAspectValue, Optional<ASPECT> existingAspectValue);

/**
* A method that returns whether to skip processing further ingestion.
* @return true if the ingestion should be skipped, false otherwise
*/
default boolean isSkipProcessing() {
return false;
}
}

This file was deleted.

This file was deleted.

Loading
Loading