Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to have a common interface for InUpdate Callback #451

Merged
merged 12 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 52 additions & 37 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@
import com.linkedin.metadata.dao.exception.ModelValidationException;
import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction;
import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.AspectCallbackRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.InUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.InUpdateRoutingClient;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.IndefiniteRetention;
Expand Down Expand Up @@ -160,6 +159,13 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
}
}

@Data
@AllArgsConstructor
protected static class AspectUpdateResult<ASPECT extends RecordTemplate> {
private ASPECT updatedAspect;
private boolean skipProcessing;
}

private static final String DEFAULT_ID_NAMESPACE = "global";

private static final String BACKFILL_EMITTER = "dao_backfill_endpoint";
Expand All @@ -183,7 +189,7 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
protected UrnPathExtractor<URN> _urnPathExtractor;

private LambdaFunctionRegistry _lambdaFunctionRegistry;
private PreUpdateAspectRegistry _preUpdateAspectRegistry = null;
private AspectCallbackRegistry _aspectCallbackRegistry = null;

// Maps an aspect class to the corresponding retention policy
private final Map<Class<? extends RecordTemplate>, Retention> _aspectRetentionMap = new HashMap<>();
Expand Down Expand Up @@ -216,6 +222,7 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {

private Clock _clock = Clock.systemUTC();


/**
* Constructor for BaseLocalDAO.
*
Expand Down Expand Up @@ -314,6 +321,7 @@ public void setClock(@Nonnull Clock clock) {
_clock = clock;
}


/**
* Sets {@link Retention} for a specific aspect type.
*/
Expand Down Expand Up @@ -403,16 +411,16 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun
/**
* Set pre ingestion aspect registry.
*/
public void setPreUpdateAspectRegistry(
@Nullable PreUpdateAspectRegistry preUpdateAspectRegistry) {
_preUpdateAspectRegistry = preUpdateAspectRegistry;
public void setAspectCallbackRegistry(
@Nullable AspectCallbackRegistry aspectCallbackRegistry) {
_aspectCallbackRegistry = aspectCallbackRegistry;
}

/**
* Get pre ingestion aspect registry.
*/
public PreUpdateAspectRegistry getPreUpdateAspectRegistry() {
return _preUpdateAspectRegistry;
public AspectCallbackRegistry getInUpdateAspectRegistry() {
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
return _aspectCallbackRegistry;
}


Expand Down Expand Up @@ -584,12 +592,12 @@ public List<ASPECT_UNION> addMany(@Nonnull URN urn,
if (_enableAtomicMultipleUpdate) {
// atomic multiple update enabled: run in a single transaction
results = runInTransactionWithRetry(() ->
aspectUpdateLambdas.stream().map(x -> aspectUpdateHelper(urn, x, auditStamp, trackingContext)).collect(Collectors.toList()),
aspectUpdateLambdas.stream().map(x -> aspectUpdateHelper(urn, x, auditStamp, trackingContext, false)).collect(Collectors.toList()),
maxTransactionRetry);
} else {
// no atomic multiple updates: run each in its own transaction. This is the same as repeated calls to add
results = aspectUpdateLambdas.stream().map(x -> runInTransactionWithRetry(() ->
aspectUpdateHelper(urn, x, auditStamp, trackingContext), maxTransactionRetry)).collect(Collectors.toList());
aspectUpdateHelper(urn, x, auditStamp, trackingContext, false), maxTransactionRetry)).collect(Collectors.toList());
}

// send the audit events etc
Expand All @@ -610,7 +618,7 @@ public List<ASPECT_UNION> addMany(@Nonnull URN urn, @Nonnull List<? extends Reco
}

private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN urn, AspectUpdateLambda<ASPECT> updateTuple,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) {
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, boolean inSkipUpdate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isRawUpdate?

AspectEntry<ASPECT> latest = getLatest(urn, updateTuple.getAspectClass(), updateTuple.getIngestionParams().isTestMode());

// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
Expand All @@ -629,6 +637,13 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN
if (_lambdaFunctionRegistry != null && _lambdaFunctionRegistry.isRegistered(updateTuple.getAspectClass())) {
newValue = updatePreIngestionLambdas(urn, oldValue, newValue);
}
if (!inSkipUpdate) {
AspectUpdateResult result = inUpdateRouting(urn, newValue, oldValue);
newValue = (ASPECT) result.getUpdatedAspect();
if (result.isSkipProcessing()) {
return null;
}
}

checkValidAspect(newValue.getClass());

Expand Down Expand Up @@ -725,7 +740,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<ASPECT>, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp,
int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) {
return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda), auditStamp, maxTransactionRetry, trackingContext);
return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda), auditStamp, maxTransactionRetry, trackingContext, false);
}

/**
Expand All @@ -734,8 +749,8 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<ASPECT>, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp,
int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {
return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda, ingestionParams), auditStamp, maxTransactionRetry, trackingContext);
int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams, boolean inSkipUpdate) {
return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda, ingestionParams), auditStamp, maxTransactionRetry, trackingContext, inSkipUpdate);
}

/**
Expand All @@ -757,22 +772,22 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdateLambda<ASPECT> updateLambda,
@Nonnull AuditStamp auditStamp, int maxTransactionRetry) {
return add(urn, updateLambda, auditStamp, maxTransactionRetry, null);
return add(urn, updateLambda, auditStamp, maxTransactionRetry, null, false);
}

/**
* Same as above {@link #add(Urn, AspectUpdateLambda, AuditStamp, int)} but with tracking context.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdateLambda<ASPECT> updateLambda,
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) {
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, boolean inSkipUpdate) {
final Class<ASPECT> aspectClass = updateLambda.getAspectClass();
checkValidAspect(aspectClass);

// default test mode is false being set in
// {@link #rawAdd(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)}}
final AddResult<ASPECT> result =
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext),
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext, inSkipUpdate),
maxTransactionRetry);

// skip MAE producing and post update hook in test mode
Expand Down Expand Up @@ -831,8 +846,8 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<ASPECT>, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {
return add(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext, ingestionParams);
@Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams, boolean inSkipUpdate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, I hope raw update is the only place where we will set the flag to be true

return add(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext, ingestionParams, inSkipUpdate);
}

/**
Expand All @@ -847,15 +862,17 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP

/**
* Same as above {@link #add(Urn, RecordTemplate, AuditStamp)} but with tracking context.
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #preUpdateRouting(Urn, RecordTemplate)} as well
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #inUpdateRouting(Urn, RecordTemplate, Optional)} as well
* to avoid any inconsistency between the lambda function and the add method.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
ASPECT updatedAspect = preUpdateRouting(urn, newValue);
return rawAdd(urn, updatedAspect, auditStamp, trackingContext, ingestionParams);
final IngestionParams nonNullIngestionParams =
ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode(
IngestionMode.LIVE).setTestMode(false) : ingestionParams;
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams, false);
}

/**
Expand All @@ -870,7 +887,7 @@ public <ASPECT extends RecordTemplate> ASPECT rawAdd(@Nonnull URN urn, @Nonnull
final IngestionParams nonNullIngestionParams =
ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode(
IngestionMode.LIVE).setTestMode(false) : ingestionParams;
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams, true);
}

/**
Expand Down Expand Up @@ -1660,20 +1677,18 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
/**
* This method routes the update request to the appropriate custom API for pre-ingestion processing.
* @param urn the urn of the asset
* @param newAspect the new aspect value
* @param newAspectValue the new aspect value
* @return the updated aspect
*/
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newAspect) {
if (_preUpdateAspectRegistry != null && _preUpdateAspectRegistry.isRegistered(
newAspect.getClass())) {
PreUpdateRoutingAccessor preUpdateRoutingAccessor = _preUpdateAspectRegistry.getPreUpdateRoutingAccessor(newAspect.getClass());
PreUpdateRoutingClient client =
preUpdateRoutingAccessor.getPreUpdateClient();
PreUpdateResponse preUpdateResponse = client.preUpdate(urn, newAspect);
ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect();
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
return (ASPECT) updatedAspect;
protected <ASPECT extends RecordTemplate> AspectUpdateResult inUpdateRouting(URN urn, ASPECT newAspectValue, Optional<ASPECT> oldAspectValue) {
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
if (_aspectCallbackRegistry != null && _aspectCallbackRegistry.isRegistered(
newAspectValue.getClass())) {
InUpdateRoutingClient client = _aspectCallbackRegistry.getInUpdateRoutingClient(newAspectValue.getClass());
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
InUpdateResponse inUpdateResponse = client.inUpdate(urn, newAspectValue, oldAspectValue);
ASPECT updatedAspect = (ASPECT) inUpdateResponse.getUpdatedAspect();
log.info("InUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
return new AspectUpdateResult(updatedAspect, client.isSkipProcessing());
}
return newAspect;
return new AspectUpdateResult(newAspectValue, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


/**
* A registry which maintains mapping of aspects and their InUpdateRoutingClient.
*/
@Slf4j
public class AspectCallbackRegistry {

private final Map<Class<? extends RecordTemplate>, InUpdateRoutingClient> _inUpdateLambdaMap;

/**
* Constructor to register in-update routing accessors for multiple aspects at once.
* @param inUpdateMap map containing aspect classes and their corresponding accessors
*/
public AspectCallbackRegistry(@Nonnull Map<Class<? extends RecordTemplate>, InUpdateRoutingClient> inUpdateMap) {
_inUpdateLambdaMap = new HashMap<>(inUpdateMap);
log.info("Registered pre-update routing accessors for aspects: {}", _inUpdateLambdaMap.keySet());
}

/**
* Get In Update Routing Client for an aspect class.
* @param aspectClass the class of the aspect to retrieve the client
* @return InUpdateRoutingClient for the given aspect class, or null if not found
*/
public <ASPECT extends RecordTemplate> InUpdateRoutingClient getInUpdateRoutingClient(
@Nonnull Class<ASPECT> aspectClass) {
return _inUpdateLambdaMap.get(aspectClass);
}

/**
* Check if In Update Routing Client is registered for an aspect.
*/
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
return _inUpdateLambdaMap.containsKey(aspectClass);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import lombok.Data;

/**
* Response of pre-update process that includes the updated aspect.
* Response of in-update process that includes the updated aspect.
*/
@Data
public class PreUpdateResponse<ASPECT extends RecordTemplate> {
public class InUpdateResponse<ASPECT extends RecordTemplate> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this Response class here instead of just return aspect? Are we planning to add more field in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. For SourceLineage Ingestion Hook, MM team needs Auditstamp as part of response as well. More details: https://docs.google.com/document/d/1PlPPb4gHJYCViH9pcuwChVoiJDcmll-_1yREvoG3iCI/edit

private final ASPECT updatedAspect;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import java.util.Optional;


/**
* An interface that defines methods to route update requests to the appropriate custom APIs for pre-ingestion process.
*/

public interface InUpdateRoutingClient<ASPECT extends RecordTemplate> {
/**
* A method that routes the update request to the appropriate custom API.
* @param urn the urn of the asset
* @param newAspectValue the aspect to be updated
* @param existingAspectValue the existing aspect value
* @return the updated aspect
*/
InUpdateResponse<ASPECT> inUpdate(Urn urn, ASPECT newAspectValue, Optional<ASPECT> existingAspectValue);

/**
* A method that returns whether to skip processing further ingestion.
* @return true if the ingestion should be skipped, false otherwise
*/
default boolean isSkipProcessing() {
return false;
}
}

This file was deleted.

This file was deleted.

Loading
Loading