diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index f2102346a1661..b60ae433ae038 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1233,8 +1233,10 @@ object GroupMetadataManager {
       val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
       val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
       val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
-
-      OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+      if (expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
+        OffsetAndMetadata(offset, metadata, commitTimestamp)
+      else
+        OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
     } else if (version == 2) {
       val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
       val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 3e8a535946814..ab288af28bd29 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest}
+import kafka.api._
 import kafka.consumer.SimpleConsumer
 import kafka.common.{OffsetAndMetadata, OffsetMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.utils._
@@ -30,6 +30,8 @@
 import org.junit.Assert._
 import java.util.Properties
 import java.io.File
 
+import org.apache.kafka.common.requests.ApiVersionsResponse
+
 import scala.util.Random
 import scala.collection._
@@ -46,12 +48,16 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    val config: Properties = createBrokerConfig(1, zkConnect, enableDeleteTopic = true)
-    config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
-    config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
-    val logDirPath = config.getProperty("log.dir")
-    logDir = new File(logDirPath)
-    server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM)
+
+  }
+
+  @After
+  override def tearDown() {
+
+    super.tearDown()
+  }
+
+  def StartConsumer(): Unit = {
     simpleConsumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 1000000, 64*1024, "test-client")
     val consumerMetadataRequest = GroupCoordinatorRequest(group)
     Stream.continually {
@@ -63,15 +69,26 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     })
   }
 
-  @After
-  override def tearDown() {
+  def StartServerAndConsumer(protocolVersion: ApiVersion = ApiVersion.latestVersion): Unit = {
+    val config: Properties = createBrokerConfig(1, zkConnect, enableDeleteTopic = true)
+    config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+    config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
+    config.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, protocolVersion.toString)
+    val logDirPath = config.getProperty("log.dir")
+    logDir = new File(logDirPath)
+    server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM)
+    StartConsumer()
+  }
+
+  def ShutdownServerAndConsumer(): Unit = {
     simpleConsumer.close
     TestUtils.shutdownServers(Seq(server))
-    super.tearDown()
   }
 
   @Test
   def testUpdateOffsets() {
+    StartServerAndConsumer()
+
     val topic = "topic"
 
     // Commit an offset
@@ -117,10 +134,14 @@
 
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
     assertEquals(1, fetchResponse2.requestInfo.size)
+
+    ShutdownServerAndConsumer()
   }
 
   @Test
   def testCommitAndFetchOffsets() {
+    StartServerAndConsumer()
+
     val topic1 = "topic-1"
     val topic2 = "topic-2"
     val topic3 = "topic-3"
@@ -187,10 +208,14 @@
     assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
     assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
     assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset)
+
+    ShutdownServerAndConsumer()
   }
 
   @Test
   def testLargeMetadataPayload() {
+    StartServerAndConsumer()
+
     val topicAndPartition = TopicAndPartition("large-metadata", 0)
     val expectedReplicaAssignment = Map(0 -> List(1))
     createTopic(zkClient, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment,
@@ -211,10 +236,14 @@
 
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
     assertEquals(Errors.OFFSET_METADATA_TOO_LARGE, commitResponse1.commitStatus.get(topicAndPartition).get)
+
+    ShutdownServerAndConsumer()
   }
 
   @Test
   def testOffsetExpiration() {
+    StartServerAndConsumer()
+
     // set up topic partition
     val topic = "topic"
     val topicPartition = TopicAndPartition(topic, 0)
@@ -284,10 +313,43 @@
 
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
+    ShutdownServerAndConsumer()
+  }
+
+  // Verify whether the committed offset is still there after bouncing a broker
+  // This test case reproduces the issue in KAFKA-8069
+  @Test
+  def testOffsetExpirationInOldBrokerVersion() {
+    StartServerAndConsumer(KAFKA_1_1_IV0)
+
+    // set up topic partition
+    val topic = "topic"
+    val topicPartition = TopicAndPartition(topic, 0)
+    createTopic(zkClient, topic, servers = Seq(server), numPartitions = 1)
+
+    val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)))
+
+    val commitRequest = OffsetCommitRequest(
+      groupId = "test-group",
+      requestInfo = immutable.Map(TopicAndPartition(topic, 0) -> OffsetAndMetadata(5L, "metadata")),
+      versionId = 2)
+    assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest).commitStatus.get(topicPartition).get)
+
+    simpleConsumer.close()
+    server.shutdown()
+    server.startup()
+    StartConsumer()
+
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(5L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    ShutdownServerAndConsumer()
   }
 
   @Test
   def testNonExistingTopicOffsetCommit() {
+    StartServerAndConsumer()
+
     val topic1 = "topicDoesNotExists"
     val topic2 = "topic-2"
@@ -302,10 +364,14 @@
 
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
     assertEquals(Errors.NONE, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+
+    ShutdownServerAndConsumer()
  }
 
   @Test
   def testOffsetsDeleteAfterTopicDeletion() {
+    StartServerAndConsumer()
+
     // set up topic partition
     val topic = "topic"
     val topicPartition = TopicAndPartition(topic, 0)
@@ -326,6 +392,8 @@
 
     val offsetMetadataAndErrorMap = simpleConsumer.fetchOffsets(fetchRequest)
     val offsetMetadataAndError = offsetMetadataAndErrorMap.requestInfo(topicPartition)
    assertEquals(OffsetMetadataAndError.NoOffset, offsetMetadataAndError)
+
+    ShutdownServerAndConsumer()
   }
 }
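Reviewer note on why the GroupMetadataManager change fixes KAFKA-8069. With an old inter-broker protocol such as the KAFKA_1_1_IV0 used by the new test, offsets are persisted with the v1 schema, whose expireTimestamp field holds OffsetCommitRequest.DEFAULT_TIMESTAMP (-1) when the commit carried no explicit expiry. Before this patch the read path loaded that sentinel back as an absolute expire timestamp, so once the coordinator reloaded __consumer_offsets after a broker bounce the offset appeared already expired and was removed on the next retention sweep. The patched branch drops the sentinel so expiration is recomputed from the commit timestamp and the configured retention. Below is a minimal, hedged sketch of that decode rule, not the actual implementation: OffsetDecodeSketch, decodeV1, and the local OffsetAndMetadata case class are illustrative stand-ins (the case class only models the real class's optional expireTimestamp, and DEFAULT_TIMESTAMP stands in for OffsetCommitRequest.DEFAULT_TIMESTAMP).

object OffsetDecodeSketch {
  // Sentinel written under old inter-broker protocols when no explicit expiry was committed.
  val DEFAULT_TIMESTAMP: Long = -1L

  // Simplified stand-in for the coordinator's OffsetAndMetadata.
  case class OffsetAndMetadata(offset: Long,
                               metadata: String,
                               commitTimestamp: Long,
                               expireTimestamp: Option[Long])

  // Decode a v1 offset record: keep a real expiry, but drop the -1 sentinel so
  // the retention sweep later computes expiration from commitTimestamp plus the
  // broker's retention setting, instead of treating -1 as an absolute expiry
  // that is always in the past.
  def decodeV1(offset: Long, metadata: String, commitTs: Long, expireTs: Long): OffsetAndMetadata =
    if (expireTs == DEFAULT_TIMESTAMP)
      OffsetAndMetadata(offset, metadata, commitTs, None)
    else
      OffsetAndMetadata(offset, metadata, commitTs, Some(expireTs))
}

Under this rule a record written by an old-protocol broker (expireTs = -1) round-trips to expireTimestamp = None, which is what testOffsetExpirationInOldBrokerVersion asserts: the committed offset of 5L survives the retention check after the broker bounce.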