Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backfill relationship from entity table #302

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.backfill.BackfillMode;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.equality.DefaultEqualityTester;
import com.linkedin.metadata.dao.equality.EqualityTester;
import com.linkedin.metadata.dao.exception.ModelValidationException;
Expand Down Expand Up @@ -806,8 +807,10 @@ public abstract <ASPECT extends RecordTemplate> void updateLocalIndex(@Nonnull U
*
* @param urn the URN for the entity the aspect (which the local relationship is derived from) is attached to
* @param aspectClass class of the aspect to backfill
* @return A list of local relationship updates executed.
*/
public abstract <ASPECT extends RecordTemplate> void backfillLocalRelationshipsFromEntityTables(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass);
public abstract <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationshipsFromEntityTables(
zhixuanjia marked this conversation as resolved.
Show resolved Hide resolved
@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass);

/**
* Returns list of urns from local secondary index that satisfy the given filter conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
*/
public abstract class BaseLocalRelationshipBuilder<ASPECT extends RecordTemplate> {

private Class<ASPECT> _aspectClass;
private final Class<ASPECT> _aspectClass;

@Value
public class LocalRelationshipUpdates {
public static class LocalRelationshipUpdates {
List<? extends RecordTemplate> relationships;
BaseGraphWriterDAO.RemovalOption removalOption;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
Expand Down Expand Up @@ -84,8 +85,9 @@ public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull FooUrn u
}

@Override
public <ASPECT extends RecordTemplate> void backfillLocalRelationshipsFromEntityTables(@Nonnull FooUrn urn, @Nonnull Class<ASPECT> aspectClass) {

public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationshipsFromEntityTables(
@Nonnull FooUrn urn, @Nonnull Class<ASPECT> aspectClass) {
return null;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.metadata.aspect.AuditedAspect;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.scsi.EmptyPathExtractor;
import com.linkedin.metadata.dao.scsi.UrnPathExtractor;
Expand All @@ -29,6 +29,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -134,13 +135,18 @@ public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPEC
}

@Override
public <ASPECT extends RecordTemplate> void addRelationships(@Nonnull URN urn, @Nonnull ASPECT aspect, @Nonnull Class<ASPECT> aspectClass) {
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates>
addRelationships(@Nonnull URN urn, @Nonnull ASPECT aspect, @Nonnull Class<ASPECT> aspectClass) {
if (_localRelationshipBuilderRegistry != null && _localRelationshipBuilderRegistry.isRegistered(aspectClass)) {
List<BaseLocalRelationshipBuilder<ASPECT>.LocalRelationshipUpdates> localRelationshipUpdates =
List<LocalRelationshipUpdates> localRelationshipUpdates =
_localRelationshipBuilderRegistry.getLocalRelationshipBuilder(aspect).buildRelationships(urn, aspect);

_localRelationshipWriterDAO.processLocalRelationshipUpdates(localRelationshipUpdates);

return localRelationshipUpdates;
}

return new ArrayList<>();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.exception.ModelConversionException;
import com.linkedin.metadata.dao.exception.RetryLimitReached;
Expand Down Expand Up @@ -578,19 +579,19 @@ public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull URN urn,
}, 1);
}

public <ASPECT extends RecordTemplate> void backfillLocalRelationshipsFromEntityTables(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationshipsFromEntityTables(
@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {
if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
throw new UnsupportedOperationException("Local relationship tables cannot be used in OLD_SCHEMA_ONLY mode, so they cannot be backfilled.");
}
AspectKey<URN, ASPECT> key = new AspectKey<>(aspectClass, urn, LATEST_VERSION);
runInTransactionWithRetry(() -> {
return runInTransactionWithRetry(() -> {
List<EbeanMetadataAspect> results = _localAccess.batchGetUnion(Collections.singletonList(key), 1, 0);
if (results.size() == 0) {
return null; // unused
return new ArrayList<>();
}
Optional<ASPECT> aspect = toRecordTemplate(aspectClass, results.get(0));
aspect.ifPresent(value -> _localAccess.addRelationships(urn, value, aspectClass));
return null; // unused
return aspect.map(value -> _localAccess.addRelationships(urn, value, aspectClass)).orElse(new ArrayList<>());
}, 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import com.linkedin.metadata.dao.utils.GraphUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
Expand Down Expand Up @@ -48,9 +48,9 @@ public EbeanLocalRelationshipWriterDAO(EbeanServer server) {
*/
@Transactional
public <ASPECT extends RecordTemplate> void processLocalRelationshipUpdates(
@Nonnull List<BaseLocalRelationshipBuilder<ASPECT>.LocalRelationshipUpdates> relationshipUpdates) {
@Nonnull List<LocalRelationshipUpdates> relationshipUpdates) {

for (BaseLocalRelationshipBuilder<ASPECT>.LocalRelationshipUpdates relationshipUpdate : relationshipUpdates) {
for (LocalRelationshipUpdates relationshipUpdate : relationshipUpdates) {
addRelationships(relationshipUpdate.getRelationships(), relationshipUpdate.getRemovalOption());
}
}
Expand Down Expand Up @@ -95,7 +95,7 @@ private <RELATIONSHIP extends RecordTemplate> void addRelationshipGroup(@Nonnull
RELATIONSHIP firstRelationship = relationshipGroup.get(0);
RelationshipValidator.validateRelationshipSchema(firstRelationship.getClass());

// Process remove option to delete some local relationships if nedded before adding new relationships.
// Process remove option to delete some local relationships if needed before adding new relationships.
processRemovalOption(SQLSchemaUtils.getRelationshipTableName(firstRelationship), firstRelationship, removalOption);

long now = Instant.now().toEpochMilli();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.scsi.UrnPathExtractor;
import com.linkedin.metadata.query.IndexFilter;
Expand Down Expand Up @@ -37,8 +38,11 @@ <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newVa
* @param urn urn associated with the relationships
* @param relationship aspect from which the relationships are derived from
* @param aspectClass class of the aspect
* @return relationship updates applied on relationship table
*/
<ASPECT extends RecordTemplate> void addRelationships(@Nonnull URN urn, @Nonnull ASPECT relationship, @Nonnull Class<ASPECT> aspectClass);
@Nonnull
<ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> addRelationships(@Nonnull URN urn,
@Nonnull ASPECT relationship, @Nonnull Class<ASPECT> aspectClass);

/**
* Get read aspects from entity table. This a new schema implementation for batchGetUnion() in {@link EbeanLocalDAO}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.metadata.dao.EbeanLocalDAO.FindMethodology;
import com.linkedin.metadata.dao.EbeanLocalDAO.SchemaConfig;
import com.linkedin.metadata.dao.EbeanMetadataAspect.PrimaryKey;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder;
import com.linkedin.metadata.dao.equality.AlwaysFalseEqualityTester;
import com.linkedin.metadata.dao.equality.DefaultEqualityTester;
import com.linkedin.metadata.dao.exception.InvalidMetadataType;
Expand Down Expand Up @@ -104,12 +105,14 @@
import org.testng.annotations.Test;

import static com.linkedin.common.AuditStamps.*;
import static com.linkedin.metadata.dao.internal.BaseGraphWriterDAO.RemovalOption.*;
import static com.linkedin.metadata.dao.utils.EBeanDAOUtils.*;
import static com.linkedin.metadata.dao.utils.SQLSchemaUtils.*;
import static com.linkedin.testing.TestUtils.*;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;


public class EbeanLocalDAOTest {
private long _now;
private EbeanServer _server;
Expand Down Expand Up @@ -3061,16 +3064,36 @@ public void testBackfillLocalRelationshipsFromEntityTables() throws URISyntaxExc
BarUrn barUrn1 = BarUrn.createFromString("urn:li:bar:1");
BarUrn barUrn2 = BarUrn.createFromString("urn:li:bar:2");
BarUrn barUrn3 = BarUrn.createFromString("urn:li:bar:3");
AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(barUrn1, barUrn2, barUrn3));
BarUrnArray barUrns = new BarUrnArray(barUrn1, barUrn2, barUrn3);
AspectFooBar aspectFooBar = new AspectFooBar().setBars(barUrns);
dao.add(fooUrn, aspectFooBar, _dummyAuditStamp);

// clear local relationship table
_server.createSqlUpdate("delete from metadata_relationship_belongsto").execute();

List<BaseLocalRelationshipBuilder.LocalRelationshipUpdates> relationshipUpdates =
dao.backfillLocalRelationshipsFromEntityTables(fooUrn, AspectFooBar.class);

List<SqlRow> results = _server.createSqlQuery("select * from metadata_relationship_belongsto").findList();
assertEquals(3, results.size());
assertEquals(results.size(), 3);
assertEquals(relationshipUpdates.size(), 1);
assertEquals(relationshipUpdates.get(0).getRemovalOption(), REMOVE_ALL_EDGES_TO_DESTINATION);

BarUrnArray sources = new BarUrnArray();
for (int i = 0; i < results.size(); i++) {
try {
RecordTemplate relationship = relationshipUpdates.get(0).getRelationships().get(i);
Urn source = (Urn) relationship.getClass().getMethod("getSource").invoke(relationship);
Urn dest = (Urn) relationship.getClass().getMethod("getDestination").invoke(relationship);
assertEquals(dest.toString(), "urn:li:foo:1");
sources.add(BarUrn.createFromString(source.toString()));
assertEquals(relationshipUpdates.get(0).getRelationships().get(i).getClass().getSimpleName(), "BelongsTo");
} catch (Exception e) {
throw new RuntimeException(e);
}
}

assertEquals(sources, barUrns);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.linkedin.metadata.dao.localrelationship.builder.ReportsToLocalRelationshipBuilder;
import com.linkedin.metadata.dao.localrelationship.builder.VersionOfLocalRelationshipBuilder;
import com.linkedin.metadata.dao.utils.EmbeddedMariaInstance;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.testing.BarUrnArray;
import com.linkedin.testing.localrelationship.AspectFooBar;
import com.linkedin.testing.urn.BarUrn;
Expand Down Expand Up @@ -47,7 +47,7 @@ public void testAddRelationshipWithRemoveAllEdgesToDestination() throws URISynta
BarUrn.createFromString("urn:li:bar:456"),
BarUrn.createFromString("urn:li:bar:789")));

List<BaseLocalRelationshipBuilder<AspectFooBar>.LocalRelationshipUpdates> updates = new BelongsToLocalRelationshipBuilder(AspectFooBar.class)
List<LocalRelationshipUpdates> updates = new BelongsToLocalRelationshipBuilder(AspectFooBar.class)
.buildRelationships(FooUrn.createFromString("urn:li:foo:123"), aspectFooBar);

// Before processing
Expand Down Expand Up @@ -82,7 +82,7 @@ public void testAddRelationshipWithRemoveNone() throws URISyntaxException {
BarUrn.createFromString("urn:li:bar:456"),
BarUrn.createFromString("urn:li:bar:789")));

List<BaseLocalRelationshipBuilder<AspectFooBar>.LocalRelationshipUpdates> updates = new ReportsToLocalRelationshipBuilder(AspectFooBar.class)
List<LocalRelationshipUpdates> updates = new ReportsToLocalRelationshipBuilder(AspectFooBar.class)
.buildRelationships(FooUrn.createFromString("urn:li:foo:123"), aspectFooBar);

// Before processing
Expand Down Expand Up @@ -114,7 +114,7 @@ public void testAddRelationshipWithRemoveAllEdgesFromSourceToDestination() throw

AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(BarUrn.createFromString("urn:li:bar:123")));

List<BaseLocalRelationshipBuilder<AspectFooBar>.LocalRelationshipUpdates> updates = new PairsWithLocalRelationshipBuilder(AspectFooBar.class)
List<LocalRelationshipUpdates> updates = new PairsWithLocalRelationshipBuilder(AspectFooBar.class)
.buildRelationships(FooUrn.createFromString("urn:li:foo:123"), aspectFooBar);

// Before processing
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testAddRelationshipWithRemoveAllEdgesFromSource() throws URISyntaxEx

AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(BarUrn.createFromString("urn:li:bar:123")));

List<BaseLocalRelationshipBuilder<AspectFooBar>.LocalRelationshipUpdates> updates = new VersionOfLocalRelationshipBuilder(AspectFooBar.class)
List<LocalRelationshipUpdates> updates = new VersionOfLocalRelationshipBuilder(AspectFooBar.class)
.buildRelationships(FooUrn.createFromString("urn:li:foo:123"), aspectFooBar);

// Before processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public <URN extends Urn> List<LocalRelationshipUpdates> buildRelationships(@Nonn
reportsToRelationships.add(new ReportsTo().setSource(barUrn).setDestination(urn));
}

LocalRelationshipUpdates localRelationshipUpdates = new LocalRelationshipUpdates(reportsToRelationships,
BaseGraphWriterDAO.RemovalOption.REMOVE_NONE);
LocalRelationshipUpdates localRelationshipUpdates =
new LocalRelationshipUpdates(reportsToRelationships, BaseGraphWriterDAO.RemovalOption.REMOVE_NONE);

return Collections.singletonList(localRelationshipUpdates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,42 @@ public Task<BackfillResult> backfillEntityTables(@ActionParam(PARAM_URNS) @Nonnu
});
}

/**
* Backfill the relationship tables from entity table.
*/
@Action(name = ACTION_BACKFILL_RELATIONSHIP_TABLES)
@Nonnull
public Task<BackfillResult> backfillRelationshipTables(@ActionParam(PARAM_URNS) @Nonnull String[] urns,
@ActionParam(PARAM_ASPECTS) @Nonnull String[] aspectNames) {
final BackfillResult backfillResult = new BackfillResult()
.setEntities(new BackfillResultEntityArray())
.setRelationships(new BackfillResultRelationshipArray());

for (String urn : urns) {
for (Class<? extends RecordTemplate> aspect : parseAspectsParam(aspectNames)) {
getLocalDAO().backfillLocalRelationshipsFromEntityTables(parseUrnParam(urn), aspect).forEach(relationshipUpdates -> {
relationshipUpdates.getRelationships().forEach(relationship -> {
try {
Urn source = (Urn) relationship.getClass().getMethod("getSource").invoke(relationship);
Urn dest = (Urn) relationship.getClass().getMethod("getDestination").invoke(relationship);
BackfillResultRelationship backfillResultRelationship = new BackfillResultRelationship()
.setSource(source)
.setDestination(dest)
.setRemovalOption(relationshipUpdates.getRemovalOption().name())
.setRelationship(relationship.getClass().getSimpleName());

backfillResult.getRelationships().add(backfillResultRelationship);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
});
});
}
}

return RestliUtils.toTask(() -> backfillResult);
}

/**
* An action method for emitting MAE backfill messages for a set of entities using SCSI.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ private RestliConstants() { }
public static final String ACTION_AUTOCOMPLETE = "autocomplete";
public static final String ACTION_BACKFILL = "backfill";
public static final String ACTION_BACKFILL_ENTITY_TABLES = "backfillEntityTables";
public static final String ACTION_BACKFILL_RELATIONSHIP_TABLES = "backfillRelationshipTables";
public static final String ACTION_BACKFILL_WITH_URNS = "backfillWithUrns";
public static final String ACTION_BACKFILL_WITH_NEW_VALUE = "backfillWithNewValue";
public static final String ACTION_BACKFILL_LEGACY = "backfillLegacy";
Expand Down
Loading