Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: update backfillMAE API signature #330

Merged
merged 2 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import com.linkedin.metadata.dao.exception.InvalidMetadataType;
import com.linkedin.metadata.events.IngestionMode;
import com.linkedin.metadata.restli.dao.LocalDaoRegistry;
import com.linkedin.parseq.Task;
import com.linkedin.restli.server.annotations.Action;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -45,50 +47,51 @@ public abstract class BaseEntityAgnosticResource {
* Backfill MAE for the given {@link BackfillItem} and the ingestion mode. Only registered and present aspects
* in database table will be backfilled.
*
* @param backfillRequests a list of {@link BackfillItem} to be backfilled. Empty aspect list means backfill all aspects.
* @param backfillRequests an array of {@link BackfillItem} to be backfilled. Empty aspect list means backfill all aspects.
* @param ingestionMode {@link IngestionMode} to indicate the processing strategy. Live mode together with no-change
* should represent no-op, empty map will be returned. Backfill is to redo
* any metadata update that is missed or skipped in the past.
* Bootstrap indicates building the metadata from scratch.
* @return a list of {@link BackfillItem} that is backfilled, failed urns and aspects will be filtered out
* @return an array of {@link BackfillItem} that is backfilled, failed urns and aspects will be filtered out
*/
@Action(name = ACTION_BACKFILL_MAE)
@Nonnull
public List<BackfillItem> backfillMAE(@Nonnull List<BackfillItem> backfillRequests, @Nonnull IngestionMode ingestionMode) {
final List<BackfillItem> backfillResults = new ArrayList<>();
final BackfillMode backfillMode = ALLOWED_INGESTION_BACKFILL_BIMAP.get(ingestionMode);
if (backfillMode == null) {
return backfillResults;
}

// Group requests by entity type
final Map<String, List<BackfillItem>> entityTypeToRequestsMap = new HashMap<>();
backfillRequests.forEach(request -> {
try {
final String entityType = Urn.createFromString(request.getUrn()).getEntityType();
entityTypeToRequestsMap.computeIfAbsent(entityType, k -> new ArrayList<>()).add(request);
} catch (URISyntaxException e) {
log.warn("Failed casting string to Urn, request: " + request, e);
public Task<BackfillItem[]> backfillMAE(@Nonnull BackfillItem[] backfillRequests, @Nonnull IngestionMode ingestionMode) {
return RestliUtils.toTask(() -> {
final List<BackfillItem> backfillRequestList = Arrays.asList(backfillRequests);
final BackfillMode backfillMode = ALLOWED_INGESTION_BACKFILL_BIMAP.get(ingestionMode);
if (backfillMode == null) {
return new BackfillItem[0];
}
});

// for each entity type, backfill MAE for each urn in parallel
for (String entityType : entityTypeToRequestsMap.keySet()) {
final Optional<BaseLocalDAO<? extends UnionTemplate, ? extends Urn>> dao = getLocalDaoByEntity(entityType);
if (!dao.isPresent()) {
log.warn("LocalDAO not found for entity type: " + entityType);
continue;
// Group requests by entity type
final List<BackfillItem> backfillResults = new ArrayList<>();
final Map<String, List<BackfillItem>> entityTypeToRequestsMap = new HashMap<>();
backfillRequestList.forEach(request -> {
try {
final String entityType = Urn.createFromString(request.getUrn()).getEntityType();
entityTypeToRequestsMap.computeIfAbsent(entityType, k -> new ArrayList<>()).add(request);
} catch (URISyntaxException e) {
log.warn("Failed casting string to Urn, request: " + request, e);
}
});

// for each entity type, backfill MAE for each urn in parallel
for (String entityType : entityTypeToRequestsMap.keySet()) {
final Optional<BaseLocalDAO<? extends UnionTemplate, ? extends Urn>> dao = getLocalDaoByEntity(entityType);
if (!dao.isPresent()) {
log.warn("LocalDAO not found for entity type: " + entityType);
continue;
}
final List<BackfillItem> items = entityTypeToRequestsMap.get(entityType);
backfillResults.addAll(items.parallelStream()
// immutable dao, should be thread-safe
.map(item -> backfillMAEForUrn(item.getUrn(), item.getAspects(), backfillMode, dao.get()).orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}
final List<BackfillItem> items = entityTypeToRequestsMap.get(entityType);
backfillResults.addAll(
items.parallelStream()
// immutable dao, should be thread-safe
.map(item -> backfillMAEForUrn(item.getUrn(), item.getAspects(), backfillMode, dao.get()).orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toList())
);
}
return backfillResults; // insert order is not guaranteed the same as input
return backfillResults.toArray(new BackfillItem[0]); // insert order is not guaranteed the same as input
});
}

protected Optional<BackfillItem> backfillMAEForUrn(@Nonnull String urn, @Nonnull List<String> aspectSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import com.linkedin.testing.EntityAspectUnion;
import com.linkedin.testing.urn.BarUrn;
import com.linkedin.testing.urn.FooUrn;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -72,12 +72,12 @@ public void testBackfillMAESpecificAspectSuccess() {
.thenReturn(ImmutableMap.of(urn, singleAspectSet));
}

List<BackfillItem> result = testResource.backfillMAE(provideBackfillItems(fooUrnSet, multiAspectsSet), IngestionMode.BACKFILL);
BackfillItem[] result = runAndWait(testResource.backfillMAE(provideBackfillItems(fooUrnSet, multiAspectsSet), IngestionMode.BACKFILL));
for (String urn : fooUrnSet) {
verify(_fooLocalDAO, times(1)).backfillMAE(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX,
multiAspectsSet, Collections.singleton(urn));
}
assertEquals(ImmutableSet.of(result), ImmutableSet.of(provideBackfillItems(fooUrnSet, singleAspectSet)));
assertEqualBackfillItemArrays(result, provideBackfillItems(fooUrnSet, singleAspectSet));
}

@Test
Expand All @@ -88,12 +88,12 @@ public void testBackfillMAENullAspectSuccess() {
.thenReturn(ImmutableMap.of(urn, multiAspectsSet));
}

List<BackfillItem> result = testResource.backfillMAE(provideBackfillItems(fooUrnSet, null), IngestionMode.BACKFILL);
BackfillItem[] result = runAndWait(testResource.backfillMAE(provideBackfillItems(fooUrnSet, null), IngestionMode.BACKFILL));
for (String urn : fooUrnSet) {
verify(_fooLocalDAO, times(1)).backfillMAE(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX,
null, Collections.singleton(urn));
}
assertEquals(ImmutableSet.of(result), ImmutableSet.of(provideBackfillItems(fooUrnSet, multiAspectsSet)));
assertEqualBackfillItemArrays(result, provideBackfillItems(fooUrnSet, multiAspectsSet));
}

@Test
Expand All @@ -114,7 +114,7 @@ public void testBackfillMAEMultiEntitiesSuccess() {
allUrnSet.addAll(fooUrnSet);

TestResource testResource = new TestResource();
List<BackfillItem> result = testResource.backfillMAE(provideBackfillItems(allUrnSet, null), IngestionMode.BACKFILL);
BackfillItem[] result = runAndWait(testResource.backfillMAE(provideBackfillItems(allUrnSet, null), IngestionMode.BACKFILL));

// verify all aspects are backfilled for each urn
for (String urn : fooUrnSet) {
Expand All @@ -125,9 +125,8 @@ public void testBackfillMAEMultiEntitiesSuccess() {
verify(_barLocalDAO, times(1)).backfillMAE(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX,
null, Collections.singleton(urn));
}
List<BackfillItem> expectedItems = provideBackfillItems(allUrnSet, multiAspectsSet);
assertEquals(result.size(), expectedItems.size());
assertTrue(result.containsAll(expectedItems));
BackfillItem[] expectedItems = provideBackfillItems(allUrnSet, multiAspectsSet);
assertEqualBackfillItemArrays(result, expectedItems);
verify(_fooLocalDAO, times(1)).getUrnClass();
verify(_barLocalDAO, times(1)).getUrnClass();
verifyNoMoreInteractions(_fooLocalDAO);
Expand All @@ -139,8 +138,8 @@ public void testBackfillMAEEmptyBackfillResult() {
TestResource testResource = new TestResource();
// no mockito stubbing, so dao.backfillMAE will return null
assertEquals(
testResource.backfillMAE(provideBackfillItems(fooUrnSet, null), IngestionMode.BACKFILL),
Collections.emptyList()
runAndWait(testResource.backfillMAE(provideBackfillItems(fooUrnSet, null), IngestionMode.BACKFILL)),
new BackfillItem[0]
);
verify(_fooLocalDAO, times(3)).backfillMAE(any(), any(), any());
}
Expand All @@ -150,8 +149,8 @@ public void testBackfillMAENoSuchEntity() {
TestResource testResource = new TestResource();
Set<String> badUrnSet = ImmutableSet.of(makeBazUrn(1).toString(), makeBazUrn(2).toString(), makeBazUrn(3).toString());
assertEquals(
testResource.backfillMAE(provideBackfillItems(badUrnSet, null), IngestionMode.BACKFILL),
Collections.emptyList()
runAndWait(testResource.backfillMAE(provideBackfillItems(badUrnSet, null), IngestionMode.BACKFILL)),
new BackfillItem[0]
);
verify(_fooLocalDAO, times(0)).backfillMAE(any(), any(), any());
verify(_barLocalDAO, times(0)).backfillMAE(any(), any(), any());
Expand All @@ -161,8 +160,8 @@ public void testBackfillMAENoSuchEntity() {
public void testBackfillMAENoopMode() {
TestResource testResource = new TestResource();
assertEquals(
testResource.backfillMAE(provideBackfillItems(fooUrnSet, null), IngestionMode.LIVE),
Collections.emptyList()
runAndWait(testResource.backfillMAE(provideBackfillItems(fooUrnSet, null), IngestionMode.LIVE)),
new BackfillItem[0]
);
verify(_fooLocalDAO, times(0)).backfillMAE(any(), any(), any());
}
Expand All @@ -177,25 +176,31 @@ public void testBackfillMAEException() {
doThrow(IllegalArgumentException.class).when(_fooLocalDAO).backfillMAE(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX, multiAspectsSet,
Collections.singleton(makeFooUrn(1).toString()));

List<BackfillItem> result = testResource.backfillMAE(provideBackfillItems(fooUrnSet, multiAspectsSet), IngestionMode.BACKFILL);
BackfillItem[] result = runAndWait(testResource.backfillMAE(provideBackfillItems(fooUrnSet, multiAspectsSet), IngestionMode.BACKFILL));
for (String urn : fooUrnSet) {
verify(_fooLocalDAO, times(1)).backfillMAE(BackfillMode.BACKFILL_INCLUDING_LIVE_INDEX,
multiAspectsSet, Collections.singleton(urn));
}
List<BackfillItem> expectedItems =
BackfillItem[] expectedItems =
provideBackfillItems(ImmutableSet.of(makeFooUrn(2).toString(), makeFooUrn(3).toString()), multiAspectsSet);
assertEquals(result.size(), expectedItems.size());
assertTrue(result.containsAll(expectedItems));
assertEqualBackfillItemArrays(result, expectedItems);
}

private List<BackfillItem> provideBackfillItems(Set<String> urnSet, Set<String> aspects) {
private BackfillItem[] provideBackfillItems(Set<String> urnSet, Set<String> aspects) {
return urnSet.stream().map(urn -> {
BackfillItem item = new BackfillItem();
item.setUrn(urn);
if (aspects != null) {
item.setAspects(new StringArray(aspects));
}
return item;
}).collect(Collectors.toList());
}).toArray(BackfillItem[]::new);
}

private void assertEqualBackfillItemArrays(BackfillItem[] actual, BackfillItem[] expected) {
List<BackfillItem> expectedList = Arrays.asList(expected);
List<BackfillItem> actualList = Arrays.asList(actual);
assertEquals(actualList.size(), expectedList.size());
assertTrue(actualList.containsAll(expectedList));
}
}
Loading