Skip to content

Commit

Permalink
[LI-HOTFIX] Catch InternalError in Fetcher Thread (#476)
Browse files Browse the repository at this point in the history
TICKET = LIKAFKA-54326
LI_DESCRIPTION =
Change FetcherEventThread to catch InternalError in addition to Exceptions to prevent the thread terminating when InternalError is thrown. The thread can continue processing other events. FetcherEventThread is responsible to complete Futures associated with events including RemovePartitions. In previous GCNs that were caused by disk corruptions, InternalError (unsafe memory access) was thrown due to accessing memory-mapped log files when processing TruncateAndFetch events. Without catching this InternalError, the fetcher thread is terminated, which causes handleLogDirFailure being stuck waiting future.get() for futures associated with unprocessed RemovePartitions events. We catch InternalError here to prevent the thread terminating right away and let handleLogDirFailure handle broker shutdown if necessary. There have been no issues other than disk failure caused InternalError so far.

EXIT_CRITERIA = N/A
  • Loading branch information
hshi2022 authored Aug 31, 2023
1 parent cc778ba commit 3475c3a
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 4 deletions.
14 changes: 11 additions & 3 deletions core/src/main/scala/kafka/server/FetcherEventManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
package kafka.server

import java.util.concurrent.TimeUnit

import com.yammer.metrics.core.Gauge
import kafka.cluster.BrokerEndPoint
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.utils.ShutdownableThread
import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.internals.{FatalExitError, KafkaFutureImpl}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{KafkaFuture, TopicPartition}

Expand Down Expand Up @@ -169,7 +168,16 @@ class FetcherEventManager(name: String,
processor.process(fetcherEvent)
}
} catch {
case e: Exception => error(s"Uncaught error processing event $fetcherEvent", e)
case e@(_: Exception |
// Catches InternalError here. In previous GCNs that were caused by disk corruptions,
// InternalError (unsafe memory access) was thrown here due to accessing memory-mapped log files when processing
// TruncateAndFetch events.
// Without catching this InternalError, this thread is terminated, which causes handleLogDirFailure being stuck
// waiting future.get() for futures associated with unprocessed RemovePartitions events.
// We catch InternalError here to prevent the thread terminating right away and let handleLogDirFailure
// handle broker shutdown if necessary. There have been no issues other than disk failure caused InternalError so far.
_: InternalError) =>
error(s"Uncaught error processing event $fetcherEvent", e)
}

_state = FetcherState.Idle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package integration.kafka.server

import kafka.cluster.BrokerEndPoint
import kafka.server._
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Time
import org.easymock.EasyMock.{createMock, expect, replay, verify}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test


Expand Down Expand Up @@ -98,5 +99,108 @@ class FetcherEventManagerTest {
fetcherEventManager.close()
}

@Test
def testEventExecutionThrowsFatalError(): Unit = {
val time = Time.SYSTEM
val fetcherEventBus = new FetcherEventBus(time)
@volatile var truncateAndFetchProcessed = 0
val processor : FetcherEventProcessor = new FetcherEventProcessor {
override def process(event: FetcherEvent): Unit = {
event match {
case TruncateAndFetch =>
truncateAndFetchProcessed += 1
throw new StackOverflowError()
}

}

override def fetcherStats: FetcherStats = ???

override def fetcherLagStats: FetcherLagStats = ???

override def sourceBroker: BrokerEndPoint = ???

override def close(): Unit = {}
}

val fetcherEventManager = new FetcherEventManager("thread-1", fetcherEventBus, processor, time)
fetcherEventManager.start()
// TruncateAndFetch should run once and then the thread is stopped.
TestUtils.waitUntilTrue(() => {
truncateAndFetchProcessed == 1
},
s"No TruncateAndFetch event is processed.", 5000)

TestUtils.waitUntilTrue(() => {
fetcherEventManager.isThreadFailed
},
s"The thread is not failed.", 5000)
fetcherEventManager.close()
}

@Test
def testEventExecutionThrowsJavaInternalError(): Unit = {
val time = Time.SYSTEM
val fetcherEventBus = new FetcherEventBus(time)

@volatile var addPartitionsProcessed = 0
@volatile var removePartitionsProcessed = 0
@volatile var getPartitionsProcessed = 0
@volatile var truncateAndFetchProcessed = 0
val processor : FetcherEventProcessor = new FetcherEventProcessor {
override def process(event: FetcherEvent): Unit = {
event match {
case AddPartitions(initialFetchStates, future) =>
addPartitionsProcessed += 1
future.complete(null)
case RemovePartitions(topicPartitions, future) =>
removePartitionsProcessed += 1
future.complete(null)
case GetPartitionCount(future) =>
getPartitionsProcessed += 1
future.complete(1)
case TruncateAndFetch =>
truncateAndFetchProcessed += 1
// InternalError should not terminate the thread.
throw new InternalError()
case GetPartitionState(_, future) =>
// ignore
}

}

override def fetcherStats: FetcherStats = ???

override def fetcherLagStats: FetcherLagStats = ???

override def sourceBroker: BrokerEndPoint = ???

override def close(): Unit = {}
}

val fetcherEventManager = new FetcherEventManager("thread-1", fetcherEventBus, processor, time)
fetcherEventManager.start()
val addPartitionsFuture = fetcherEventManager.addPartitions(Map.empty)
val removePartitionsFuture = fetcherEventManager.removePartitions(Set.empty)
val getPartitionCountFuture = fetcherEventManager.getPartitionsCount()

addPartitionsFuture.get()
removePartitionsFuture.get()
getPartitionCountFuture.get()

// The thread should not be terminated.
TestUtils.waitUntilTrue(() => {
truncateAndFetchProcessed == 1
},
s"No TruncateAndFetch event is processed.", 5000)
assertFalse(fetcherEventManager.isThreadFailed)

assertEquals(1, addPartitionsProcessed)
assertEquals(1, removePartitionsProcessed)
assertEquals(1, getPartitionsProcessed)

fetcherEventManager.close()
}

}

0 comments on commit 3475c3a

Please sign in to comment.