-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[da-vinci][dvc] Dropping unassigned partitions #1332
base: main
Are you sure you want to change the base?
Conversation
clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java
Outdated
Show resolved
Hide resolved
clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
Outdated
Show resolved
Hide resolved
2055b04
to
52a557c
Compare
b94b49a
to
262bbc6
Compare
…iption. Enabled config flag over bootstrap subscription
…iption. Enabled config flag over bootstrap subscription
clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
Outdated
Show resolved
Hide resolved
internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
Outdated
Show resolved
Hide resolved
a163e56
to
9dbffc9
Compare
…cally. Integration Test [In Writing]
9dbffc9
to
fd9fa02
Compare
…cally. Integration Test [In Writing]
...ice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java
Outdated
Show resolved
Hide resolved
…cally. Integration Test [In Writing]
…cally. Integration Test [In Writing]
clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java
Show resolved
Hide resolved
StoreBackend storeBackend = getStoreOrThrow(storeName); | ||
storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); | ||
}); | ||
if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a good start, but do we need to do clean up? @sixpluszero @kvargha ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we add a method to the interface which allows users to clean up unsubscribed state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gaojie proposed that we keep the state on disk to play it on the safe side, as with a new version it will automatically get cleaned up.
...ice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java
Outdated
Show resolved
Hide resolved
...ice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java
Show resolved
Hide resolved
…cally. Integration Test [In Writing]
// Test automatic new version ingestion | ||
for (int i = 0; i < 2; ++i) { | ||
// Test per-version partitioning parameters | ||
int partitionCount = i + 1; | ||
String iString = String.valueOf(i); | ||
cluster.useControllerClient(controllerClient -> { | ||
ControllerResponse response = controllerClient.updateStore( | ||
storeName1, | ||
new UpdateStoreQueryParams().setPartitionerClass(ConstantVenicePartitioner.class.getName()) | ||
.setPartitionCount(partitionCount) | ||
.setPartitionerParams( | ||
Collections.singletonMap(ConstantVenicePartitioner.CONSTANT_PARTITION, iString))); | ||
assertFalse(response.isError(), response.getError()); | ||
}); | ||
|
||
Integer expectedValue = cluster.createVersion(storeName1, KEY_COUNT); | ||
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { | ||
for (int k = 0; k < KEY_COUNT; ++k) { | ||
Object readValue = client1.get(k).get(); | ||
assertEquals(readValue, expectedValue); | ||
} | ||
}); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this for this test. We can focus on one version to determine whether or not we're not automatically subscribing to partitions already on disk.
client1.subscribe(partitions); | ||
client1.unsubscribeAll(); | ||
|
||
DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); | ||
if (daVinciBackend != null) { | ||
StoreBackend storeBackend = daVinciBackend.getStoreOrThrow(storeName1); | ||
ComplementSet<Integer> subscription = storeBackend.getSubscription(); | ||
assertTrue(subscription.isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What we can do here instead is we can stick with the default partition count of 3 instead of modifying it, and in the new client we only subscribe to 2 partitions and validate that subscription
only has 2 in the ComplementSet
.
…ally. Integration Test [In Writing]
Summary, imperative, start upper case, don't end with a period
The objective is to remove unassigned partitions.
Applying this issue to Da Vinci Client which has information over partitions in storage.
This PR is an extension of the PR #1196 [https://github.com//pull/1196] that checks which partitions should be kept [in
StorageService
] and applies the check inVeniceServer
.Resolves #650
How was this PR tested?
A corresponding test will be written.
Does this PR introduce any user-facing changes?