Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to have a common interface for InUpdate Callback #451

Merged
merged 12 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@
import com.linkedin.metadata.dao.exception.ModelValidationException;
import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction;
import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.InUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.InUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.InUpdateRoutingClient;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.IndefiniteRetention;
Expand Down Expand Up @@ -160,6 +159,13 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
}
}

@Data
@AllArgsConstructor
protected static class AspectUpdateResult<ASPECT extends RecordTemplate> {
private ASPECT updatedAspect;
private boolean skipProcessing;
}

private static final String DEFAULT_ID_NAMESPACE = "global";

private static final String BACKFILL_EMITTER = "dao_backfill_endpoint";
Expand All @@ -183,7 +189,7 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
protected UrnPathExtractor<URN> _urnPathExtractor;

private LambdaFunctionRegistry _lambdaFunctionRegistry;
private PreUpdateAspectRegistry _preUpdateAspectRegistry = null;
private InUpdateAspectRegistry _inUpdateAspectRegistry = null;

// Maps an aspect class to the corresponding retention policy
private final Map<Class<? extends RecordTemplate>, Retention> _aspectRetentionMap = new HashMap<>();
Expand Down Expand Up @@ -216,6 +222,8 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {

private Clock _clock = Clock.systemUTC();

private boolean skipInUpdate = false;

/**
* Constructor for BaseLocalDAO.
*
Expand Down Expand Up @@ -314,6 +322,14 @@ public void setClock(@Nonnull Clock clock) {
_clock = clock;
}

/**
* Set setSkipInUpdate when rawAdd() is called.
* @param skipInUpdate whether to skip inUpdate
*/
public void setSkipInUpdate(@Nonnull boolean skipInUpdate) {
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
this.skipInUpdate = skipInUpdate;
}

/**
* Sets {@link Retention} for a specific aspect type.
*/
Expand Down Expand Up @@ -404,15 +420,15 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun
* Set pre ingestion aspect registry.
*/
public void setPreUpdateAspectRegistry(
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
@Nullable PreUpdateAspectRegistry preUpdateAspectRegistry) {
_preUpdateAspectRegistry = preUpdateAspectRegistry;
@Nullable InUpdateAspectRegistry inUpdateAspectRegistry) {
_inUpdateAspectRegistry = inUpdateAspectRegistry;
}

/**
* Get pre ingestion aspect registry.
*/
public PreUpdateAspectRegistry getPreUpdateAspectRegistry() {
return _preUpdateAspectRegistry;
public InUpdateAspectRegistry getInUpdateAspectRegistry() {
return _inUpdateAspectRegistry;
}


Expand Down Expand Up @@ -629,7 +645,13 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN
if (_lambdaFunctionRegistry != null && _lambdaFunctionRegistry.isRegistered(updateTuple.getAspectClass())) {
newValue = updatePreIngestionLambdas(urn, oldValue, newValue);
}

if (!skipInUpdate) {
AspectUpdateResult result = inUpdateRouting(urn, newValue, oldValue);
newValue = (ASPECT) result.getUpdatedAspect();
if (result.isSkipProcessing()) {
return null;
}
}
checkValidAspect(newValue.getClass());

if (_modelValidationOnWrite) {
Expand Down Expand Up @@ -847,15 +869,14 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP

/**
* Same as above {@link #add(Urn, RecordTemplate, AuditStamp)} but with tracking context.
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #preUpdateRouting(Urn, RecordTemplate)} as well
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #inUpdateRouting(Urn, RecordTemplate, Optional)} as well
* to avoid any inconsistency between the lambda function and the add method.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
ASPECT updatedAspect = preUpdateRouting(urn, newValue);
return rawAdd(urn, updatedAspect, auditStamp, trackingContext, ingestionParams);
return rawAdd(urn, newValue, auditStamp, trackingContext, ingestionParams);
}

/**
Expand Down Expand Up @@ -1660,20 +1681,18 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
/**
* This method routes the update request to the appropriate custom API for pre-ingestion processing.
* @param urn the urn of the asset
* @param newAspect the new aspect value
* @param newAspectValue the new aspect value
* @return the updated aspect
*/
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newAspect) {
if (_preUpdateAspectRegistry != null && _preUpdateAspectRegistry.isRegistered(
newAspect.getClass())) {
PreUpdateRoutingAccessor preUpdateRoutingAccessor = _preUpdateAspectRegistry.getPreUpdateRoutingAccessor(newAspect.getClass());
PreUpdateRoutingClient client =
preUpdateRoutingAccessor.getPreUpdateClient();
PreUpdateResponse preUpdateResponse = client.preUpdate(urn, newAspect);
ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect();
protected <ASPECT extends RecordTemplate> AspectUpdateResult inUpdateRouting(URN urn, ASPECT newAspectValue, Optional<ASPECT> oldAspectValue) {
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
if (_inUpdateAspectRegistry != null && _inUpdateAspectRegistry.isRegistered(
newAspectValue.getClass())) {
InUpdateRoutingClient client = _inUpdateAspectRegistry.getInUpdateRoutingClient(newAspectValue.getClass());
InUpdateResponse inUpdateResponse = client.inUpdate(urn, newAspectValue, oldAspectValue);
ASPECT updatedAspect = (ASPECT) inUpdateResponse.getUpdatedAspect();
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
rakhiagr marked this conversation as resolved.
Show resolved Hide resolved
return (ASPECT) updatedAspect;
return new AspectUpdateResult(updatedAspect, client.isSkipProcessing());
}
return newAspect;
return new AspectUpdateResult(newAspectValue, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


/**
* A registry which maintains mapping of aspects and their InUpdateRoutingClient.
*/
@Slf4j
public class InUpdateAspectRegistry {

private final Map<Class<? extends RecordTemplate>, InUpdateRoutingClient> _inUpdateLambdaMap;

/**
* Constructor to register in-update routing accessors for multiple aspects at once.
* @param inUpdateMap map containing aspect classes and their corresponding accessors
*/
public InUpdateAspectRegistry(@Nonnull Map<Class<? extends RecordTemplate>, InUpdateRoutingClient> inUpdateMap) {
_inUpdateLambdaMap = new HashMap<>(inUpdateMap);
log.info("Registered pre-update routing accessors for aspects: {}", _inUpdateLambdaMap.keySet());
}

/**
* Get In Update Routing Client for an aspect class.
* @param aspectClass the class of the aspect to retrieve the client
* @return InUpdateRoutingClient for the given aspect class, or null if not found
*/
public <ASPECT extends RecordTemplate> InUpdateRoutingClient getInUpdateRoutingClient(
@Nonnull Class<ASPECT> aspectClass) {
return _inUpdateLambdaMap.get(aspectClass);
}

/**
* Check if In Update Routing Client is registered for an aspect.
*/
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
return _inUpdateLambdaMap.containsKey(aspectClass);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import lombok.Data;

/**
* Response of pre-update process that includes the updated aspect.
* Response of in-update process that includes the updated aspect.
*/
@Data
public class PreUpdateResponse<ASPECT extends RecordTemplate> {
public class InUpdateResponse<ASPECT extends RecordTemplate> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this Response class here instead of just return aspect? Are we planning to add more field in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. For SourceLineage Ingestion Hook, MM team needs Auditstamp as part of response as well. More details: https://docs.google.com/document/d/1PlPPb4gHJYCViH9pcuwChVoiJDcmll-_1yREvoG3iCI/edit

private final ASPECT updatedAspect;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import java.util.Optional;


/**
* An interface that defines methods to route update requests to the appropriate custom APIs for pre-ingestion process.
*/

public interface InUpdateRoutingClient<ASPECT extends RecordTemplate> {
/**
* A method that routes the update request to the appropriate custom API.
* @param urn the urn of the asset
* @param newAspectValue the aspect to be updated
* @param existingAspectValue the existing aspect value
* @return the updated aspect
*/
InUpdateResponse<ASPECT> inUpdate(Urn urn, ASPECT newAspectValue, Optional<ASPECT> existingAspectValue);

/**
* A method that returns whether to skip processing further ingestion.
* @return true if the ingestion should be skipped, false otherwise
*/
default boolean isSkipProcessing() {
return false;
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Loading
Loading