
Rebalancing occurs for batch Kafka listener when a non-BatchListenerFailedException is thrown #3712

user734879 opened this issue Jan 17, 2025 · 13 comments



user734879 commented Jan 17, 2025

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3.1

Describe the bug

The intention was to process an entire batch of messages as a single entity and retry it several times, then skip/recover the entire batch once all retries are exhausted.

Throwing BatchListenerFailedException is not applicable in our case.

The documentation suggests throwing a non-BatchListenerFailedException, but that works only if the backoff time is less than max.poll.interval.ms. When delays are longer, a rebalance occurs because FallbackBatchErrorHandler sleeps for the backoff time between poll operations. Configuring a ContainerPausingBackOffHandler as the backoff handler in order to handle long delays has no effect.

To Reproduce

Use a batch Kafka listener that throws a non-BatchListenerFailedException to retry the entire batch of events, as in the sketch below.
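
A minimal sketch of such a listener (the topic name and record types are illustrative):

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

public class RetryingBatchListener {

	@KafkaListener(topics = "demo-topic", batch = "true")
	public void listen(List<ConsumerRecord<String, String>> records) {
		// Any exception other than BatchListenerFailedException signals the
		// error handler to retry the entire batch.
		throw new RuntimeException("force retry of the whole batch");
	}
}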

Expected behavior

ContainerPausingBackOffHandler is applied when a non-BatchListenerFailedException is thrown.

Sample

Configuration of ErrorHandler:

final ExponentialBackOff backOff = new ExponentialBackOff(A, B);
backOff.setMaxInterval(C); // longer than max.poll.interval.ms
backOff.setMaxElapsedTime(X);
return new DefaultErrorHandler(null, backOff,
		new ContainerPausingBackOffHandler(new ListenerContainerPauseService(null, taskScheduler)));
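
For context, max.poll.interval.ms is the consumer property the backoff is being compared against; a sketch of where it is set (300000 ms is just the Kafka default, shown for illustration):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Map<String, Object> consumerProps = new HashMap<>();
// If the gap between poll() calls exceeds this value, the consumer leaves the
// group and a rebalance is triggered.
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);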
@sobychacko (Contributor)

@user734879 Have you observed that it works as expected when maxInterval is less than max.poll.interval.ms? In that case, this is something we might have to look into in ContainerPausingBackOffHandler. Do you have a small sample where we can reproduce your scenario? Thanks!

@user734879 (Author)

@sobychacko yes, it works as expected when maxInterval is less than max.poll.interval.ms (at the same time, ContainerPausingBackOffHandler is not invoked at all, as far as I can see).

I added a sample project here: https://github.com/user734879/spring-kafka-retry-test

@sobychacko (Contributor)

Thanks for the sample. It looks like this SO thread is related (same problem there): https://stackoverflow.com/questions/79288249/containerpausingbackoffhandler-never-called

I think what's happening is this: even though you set the ContainerPausingBackOffHandler on your DefaultErrorHandler, its constructor
https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java#L106

super(recoverer, backOff, backOffHandler, createFallback(backOff, recoverer));

calls createFallback, which creates the FallbackBatchErrorHandler.

Here is what the super constructor looks like:

public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
		@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {

	super(recoverer, backOff, backOffHandler);
	this.fallbackBatchHandler = fallbackHandler;
}

So, on a quick look, it doesn't seem like the FallbackBatchErrorHandler considers the backoff handler when falling back to it.

See here also: https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java#L152

I don't see that the backoff handler is invoked in that path.

This may be by design. When batch records are retried, one option is to throw a BatchListenerFailedException, but you said you had limitations with that approach. The other option is to throw a non-BatchListenerFailedException, but then make sure that your backoff max interval is less than the max.poll.interval.ms value; that way you avoid this rebalance. The fix may involve much more invasive changes. Do you have the option of increasing the max.poll.interval.ms value?
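
For the first option, a minimal sketch of pinpointing the failing record from inside the batch listener (handle is a hypothetical per-record processing method):

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.BatchListenerFailedException;

void process(List<ConsumerRecord<String, String>> records) {
	for (int i = 0; i < records.size(); i++) {
		try {
			handle(records.get(i)); // hypothetical per-record processing
		}
		catch (Exception ex) {
			// Only the failed record (and the ones after it) are redelivered,
			// instead of retrying the entire batch.
			throw new BatchListenerFailedException("record " + i + " failed", ex, i);
		}
	}
}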

@user734879 (Author)

it doesn't seem like the FallbackBatchErrorHandler considers the backoff handler when falling back to it.

Yes, that's the behaviour I observed while debugging the issue.

Do you have the option of increasing the max.poll.interval.ms value?

I don't think so, unfortunately. Increasing it would impair detection of genuinely dead consumers, while we only want the ability to have longer retries and longer delays between retries.

Is there any workaround to tell the listener "this is the last retry attempt, so do not throw any exception and skip the records yourself", while BatchListenerFailedException is still thrown for the previous retries? Or a way to tell BatchListenerFailedException to skip the entire batch instead of a particular record and, as a result, commit the offsets for all of its messages? One possible direction is sketched below.
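
A sketch of that direction, under the assumption that the fallback batch handler invokes the configured recoverer for each record of the batch once retries are exhausted (instead of passing null as in the sample above):

import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

// Recover by logging and dropping each record of the exhausted batch; the
// offsets are then committed, effectively skipping the whole batch.
DefaultErrorHandler handler = new DefaultErrorHandler(
		(rec, ex) -> System.err.println("Skipping " + rec.topic() + "-"
				+ rec.partition() + "@" + rec.offset() + ": " + ex.getMessage()),
		new ExponentialBackOff(1_000, 2.0));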

@user734879 (Author)

@sobychacko based on the label that was added, is there anything else required from me as the reporter to proceed with the analysis of whether this can be fixed? And is there a workaround available to skip the entire batch after retries are exhausted?

@sobychacko (Contributor)

@user734879 Not at this point. We will have to triage it further and see what's going on. Possibly after the 3.3.2 release, which is happening today.

@artembilan (Member)

The DefaultErrorHandler does this:

	private static CommonErrorHandler createFallback(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
		return new FallbackBatchErrorHandler(backOff, recoverer);
	}

That FallbackBatchErrorHandler has this logic in its handleBatch():

ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
					this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners, getClassifier(),
					this.reclassifyOnExceptionChange);

That one, in turn:

		Set<TopicPartition> assignment = consumer.assignment();
		consumer.pause(assignment);

So, even if a ContainerPausingBackOffHandler is not involved, there is pause/resume logic in the batch retry algorithm.
Therefore it is not clear how a rebalance may happen.
I will look into your sample a bit later and try to reproduce.

Thank you!

@user734879 (Author)

So, even if a ContainerPausingBackOffHandler is not involved, there is pause/resume logic in the batch retry algorithm.
Therefore it is not clear how a rebalance may happen.

@artembilan the pause/resume by itself is not enough: the consumer must also keep issuing poll operations in parallel to be considered alive (receiving 0 records while paused).
But it performs only one poll operation and then puts the same execution thread to sleep for the backoff time (the delay between retries), which is already greater than max.poll.interval.ms (this happens in org.springframework.kafka.listener.ListenerUtils#conditionalSleep). As a result, no further poll operations are executed and the consumer is kicked out of the group. A simplified sketch of that path is below.
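
A simplified, illustrative sketch of that blocking path (not the actual Spring Kafka source; a single fixed backoff interval is assumed):

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;

void retryBatchBlocking(Consumer<?, ?> consumer, Runnable invokeListener, long backOffMs)
		throws InterruptedException {
	consumer.pause(consumer.assignment()); // paused: subsequent polls return no records
	consumer.poll(Duration.ZERO);          // a single poll before sleeping
	Thread.sleep(backOffMs);               // blocks the consumer thread; if backOffMs
	                                       // exceeds max.poll.interval.ms, the consumer
	                                       // is removed from the group and a rebalance follows
	invokeListener.run();                  // retry the entire batch
	consumer.resume(consumer.assignment());
}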

@artembilan (Member)

Got it!
So, we agree that the pause happens anyway.
The only problem is that we don't poll() within that batch retry loop.
Is that correct?

@user734879 (Author)

@artembilan yes, correct.
I believe ContainerPausingBackOffHandler does not use Thread.sleep, and poll operations continue while the consumer is paused (see the sketch below).
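
A conceptual sketch of that non-blocking approach (the container and backOffMs are assumed to be in scope): pause the container for the backoff duration and let the TaskScheduler resume it, so the consumer thread keeps polling in the meantime:

import java.time.Duration;
import org.springframework.kafka.listener.ListenerContainerPauseService;
import org.springframework.kafka.listener.MessageListenerContainer;

void pauseInsteadOfSleep(ListenerContainerPauseService pauser,
		MessageListenerContainer container, long backOffMs) {
	// Resume is scheduled on the TaskScheduler; the consumer thread is never
	// blocked, so poll() keeps running (returning no records) while paused.
	pauser.pause(container, Duration.ofMillis(backOffMs));
}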

@artembilan (Member)

(the delay between retries), which is already greater than max.poll.interval.ms.

I don't think this is OK, since the retry here is really blocking.
So, we definitely must sleep no more than max.poll.interval.ms to be able to loop back to that consumer.poll(Duration.ZERO).

@user734879 (Author)

@artembilan sorry, two questions then:

  • Does this mean that ContainerPausingBackOffHandler allows non-blocking retries? It solves exactly the problem of having delays longer than max.poll.interval.ms.
  • Why can't the same retry logic be used both for a batch listener that throws BatchListenerFailedException and for one that does not? (Apologies, I am not aware of the technical complexity of supporting that, but it sounds reasonable to me to have the same logic regardless of whether the entire batch or only one record from the batch will be skipped.)

@artembilan (Member)

Right, we have the same doubts.
We will investigate how ContainerPausingBackOffHandler works in record mode and get back to you.
