Skip to content

Commit

Permalink
Always set lastCommittedPartitionsAndOffsets in scala consumer (#26)
Browse files Browse the repository at this point in the history
Always set lastCommittedPartitionsAndOffsets in old ZookeeperConsumerConnector.

Currently lastCommittedPartitionsAndOffsets is only set when offsetsToCommit is not empty and the commit succeeds. This causes problems during consumer migration when the commit fails or there is nothing to commit: during migration, the consumer first fetches lastCommittedPartitionsAndOffsets from ZooKeeper and then commits them to Kafka. If lastCommittedPartitionsAndOffsets is not updated when the commit fails or there is nothing to commit, the consumer keeps getting the old committed offsets and can therefore overwrite the correct offsets.
  • Loading branch information
kehuum authored Jun 12, 2019
1 parent 295b582 commit b82dc87
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
trace("OffsetMap: %s".format(offsetsToCommit))
var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
var done = false
var committed = false;
while (!done) {
val committed = offsetsChannelLock synchronized {
committed = offsetsChannelLock synchronized {
// committed when we receive either no error codes or only MetadataTooLarge errors
if (offsetsToCommit.size > 0) {
if (config.offsetsStorage == "zookeeper") {
offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) =>
commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
}
lastCommittedPartitionsAndOffsets = offsetsToCommit
true
} else {
val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId)
Expand All @@ -368,7 +368,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (error == Errors.NONE && config.dualCommitEnabled) {
val offset = offsetsToCommit(topicPartition).offset
commitOffsetToZooKeeper(topicPartition, offset)
lastCommittedPartitionsAndOffsets = offsetsToCommit
}

(folded._1 || // update commitFailed
Expand Down Expand Up @@ -421,6 +420,20 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
Thread.sleep(config.offsetsChannelBackoffMs)
}
}

// If the offsets were committed successfully, update lastCommittedPartitionsAndOffsets;
// otherwise the commit failed, so set lastCommittedPartitionsAndOffsets to null. This ensures that
// during consumer migration, if the commit to ZK failed, the consumer won't fetch the previously
// committed offsets and try to commit them to Kafka. Doing so would create a race condition where
// two consumers in the same group commit conflicting offsets: consumer A first commits offsets
// successfully, then a rebalance happens and A releases some of its partitions to B; B then commits
// offsets to both ZK and Kafka successfully; if A's next commit fails, A would fetch its previously
// committed offsets and commit them to Kafka again, overwriting the correct offsets committed by B
// for the partitions that moved to B.
if (committed)
lastCommittedPartitionsAndOffsets = offsetsToCommit
else
lastCommittedPartitionsAndOffsets = null
}

def getLastCommittedPartitionsAndOffsets = lastCommittedPartitionsAndOffsets
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ class LogManager(logDirs: Seq[File],
cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
removedLog.renameDir(Log.logDeleteDirName(topicPartition))
checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile, false)
checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
Expand Down

0 comments on commit b82dc87

Please sign in to comment.