Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure if message goes to DLQ after retries conditionally, based on MessageHeaders and thrown Exception #3700

Open
santunioni opened this issue Jan 8, 2025 · 5 comments

Comments

@santunioni
Copy link

santunioni commented Jan 8, 2025

Expected Behavior

I should be able to decide if message goes to DLT based on MessageHeaders predicate. I want to provide classes that implement something like the interface

@FunctionalInterface
public interface DltStrategy {
	boolean shouldSendToDLTAfterRetries(MessageHeaders headers, Exception exception);
}

Current Behavior

I can only configure the DLT decisions as static:

  • NO_DLT
  • ALWAYS_RETRY_ON_ERROR
  • FAIL_ON_ERROR

Context

We do heavy testing on production here. In my case, I don't want messages with header isTest=true to go to DLT, because once in a while manual testing produce invalid messages, that we can't process.

Note I want messages to be retryed, but be discarded after, not going to DLTs. I am not aware of any workaround.

@sobychacko
Copy link
Contributor

@santunioni The DltStrategy you mention is an enum provided by the framework that is expected to be used with the @RetrableTopic for non-blocking retries. I want to ensure that this is the case and that you use non-blocking retries. In that case, have you looked into using a DltHandler method and tried to inject the @Header to retrieve the header value and make the DLT decision?

See these: https://docs.spring.io/spring-kafka/reference/retrytopic/dlt-strategies.html
https://stackoverflow.com/a/69302622/2070861

@santunioni
Copy link
Author

santunioni commented Jan 9, 2025

I want to ensure that this is the case and that you use non-blocking retries.

Yes. That is my case. I am using non-blocking retries.

However, after retrying N times by consuming and producing to topic <main-topic-name>-retry, the message goes to a DLQ topic not controlled by Spring. I reprocess DLQs by moving messages to the retry topic again, and this is fixed by my company tooling.

I don't know if the @DltHandler would work for me, when the system don't poll from DLT directly.

@santunioni
Copy link
Author

Could I implement the behavior with KafkaListenerErrorHandler? I think the documentation suggest it, but not sure.

@sobychacko
Copy link
Contributor

I guess we need to understand the issue a bit further. Do you have a small sample where the issue can be reproduced?

@artembilan
Copy link
Member

Well, sounds like you are over complicating.
So, what is to not send to DLT conditionally?
Mostly it means process the record as normal: if no exception, then nothing to send to retry topic and so on.
Perhaps you indeed can use that KafkaListenerErrorHandler to decide if to re-throw an exception for the record or not.
You may give it a chance to go through some retry topics cycle, but when it is done enough just swallow an exception for the record with that specific header.

I don't think we would need some extra contract to introduce if that is really possible with existing API.

Please, give it a chance and let us know!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants