Skip to content

Commit

Permalink
kafka fixup when caching only if the coordinator is the source of the…
Browse files Browse the repository at this point in the history
… event
  • Loading branch information
Vincent Royer committed May 25, 2021
1 parent 67be4ad commit 5678d87
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,15 @@ public List<SourceRecord> poll() {
pk,
UUID.fromString(nodeId),
new ArrayList<>(consistencyLevels),
getSelectStatement()
getSelectStatement(),
md5Digest
);
Object value = null;
if (tuple._1 != null) {
value = cassandraConverterAndStatementFinal.getConverter().buildStruct(tuple._1);
}
log.debug("Record partition={} key={} value={}", consumerRecord.partition(), mutationKey, value);
log.debug("Record partition={} mutationNodeId={} coordinatorId={} md5Digest={} key={} value={}",
consumerRecord.partition(), nodeId, tuple._3, md5Digest, mutationKey, value);
SourceRecord sourceRecord = new SourceRecord(
ImmutableMap.of(),
ImmutableMap.of(),
Expand All @@ -292,7 +294,7 @@ public List<SourceRecord> poll() {
cassandraConverterAndStatementFinal.getConverter().getSchema(),
value);
sourceRecords.add(sourceRecord);
if (tuple._3 != null && tuple._3.equals(md5Digest)) {
if (tuple._3 != null && tuple._3.equals(UUID.fromString(nodeId))) {
// cache the mutation digest if the coordinator is the source of this event.
mutationCache.addMutationMd5(mutationKey, md5Digest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ public Record<GenericRecord> read() throws Exception {
pk,
mutationValue.getNodeId(),
Lists.newArrayList(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.LOCAL_ONE),
getSelectStatement());
getSelectStatement(),
mutationValue.getMd5Digest());

Object value = tuple._1 == null ? null : converterAndQueryFinal.getConverter().toConnectData(tuple._1);
KeyValue<Object, Object> keyValue = new KeyValue(mutationKey, value);
Expand Down
20 changes: 11 additions & 9 deletions source/src/main/java/com/datastax/oss/cdc/CassandraClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,32 +264,34 @@ public Tuple2<KeyspaceMetadata, TableMetadata> getTableMetadata(String keyspace,
}

public Tuple3<Row, ConsistencyLevel, UUID> selectRow(List<Object> pkValues,
UUID nodeId,
List<ConsistencyLevel> consistencyLevels,
PreparedStatement preparedStatement)
UUID nodeId,
List<ConsistencyLevel> consistencyLevels,
PreparedStatement preparedStatement,
String md5Digest)
throws ExecutionException, InterruptedException {
return selectRowAsync(pkValues, nodeId, consistencyLevels, preparedStatement)
return selectRowAsync(pkValues, nodeId, consistencyLevels, preparedStatement, md5Digest)
.toCompletableFuture().get();
}

/**
* Try to read CL=ALL (could be LOCAL_ALL), retry LOCAL_QUORUM, retry LOCAL_ONE.
*/
public CompletionStage<Tuple3<Row, ConsistencyLevel, UUID>> selectRowAsync(List<Object> pkValues,
UUID nodeId,
List<ConsistencyLevel> consistencyLevels,
PreparedStatement preparedStatement) {
UUID nodeId,
List<ConsistencyLevel> consistencyLevels,
PreparedStatement preparedStatement,
String md5Digest) {
BoundStatement statement = preparedStatement.bind(pkValues.toArray(new Object[pkValues.size()]));

// set the coordinator node
Node node = null;
if (nodeId != null) {
node = cqlSession.getMetadata().getNodes().get(nodeId);
if (node != null) {
statement.setNode(node);
statement = statement.setNode(node);
}
}
log.info("Fetching query={} pk={} coordinator={}", preparedStatement.getQuery(), pkValues, node);
log.debug("Fetching md5Digest={} coordinator={} query={} pk={} ", md5Digest, node, preparedStatement.getQuery(), pkValues);

return executeWithDowngradeConsistencyRetry(cqlSession, statement, consistencyLevels)
.thenApply(tuple -> {
Expand Down

0 comments on commit 5678d87

Please sign in to comment.