Provide a RecordFilterStrategy that works in tandem with MessageConverter. #3482

@ioanngolovko

Expected Behavior

Provide a RecordFilterStrategy that works in tandem with MessageConverter.

Current Behavior

The MessageConverter is invoked after the RecordFilterStrategy logic.

Context

If we configure a MessageConverter (JSON, for example), we expect to filter the converted Message, not just the deserialized ConsumerRecord (which is deserialized with a String/Bytes/ByteArray SerDe). So, in the JSON case, we must convert the message inside our filter with our own logic, for example with ObjectMapper. It's dirty.

The filtering feature seems incomplete because it is not integrated with Spring Messaging.

As you can see, here we filter the original ConsumerRecord and then invoke the delegate logic.
https://github.com/spring-projects/spring-kafka/blob/3.2.x/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java#L71
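To make the ordering concrete, here is a minimal plain-Java model of it. It does not use any spring-kafka classes: `RawRecord`, `Order`, `CurrentChain`, and the toy `id:amount` payload format are all hypothetical stand-ins chosen for illustration. The sketch only shows that a filter placed before conversion sees the raw payload, so any filter that needs a converted field has to repeat the conversion itself.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for a raw ConsumerRecord<String, String>; not the Kafka class.
record RawRecord(String key, String value) {}

// Hypothetical domain type that a message converter would produce.
record Order(String id, int amount) {}

final class CurrentChain {

    // Models the order described above: the filter runs on the raw record,
    // and conversion happens only afterwards, on the way to the listener.
    // As with RecordFilterStrategy, a filter result of true means "discard".
    static Consumer<RawRecord> build(Predicate<RawRecord> discard,
                                     Function<RawRecord, Order> converter,
                                     Consumer<Order> listener) {
        return raw -> {
            if (discard.test(raw)) {
                return; // dropped before any conversion took place
            }
            listener.accept(converter.apply(raw));
        };
    }

    // Toy converter for "id:amount" payloads, standing in for JSON conversion.
    static Order parse(RawRecord raw) {
        String[] parts = raw.value().split(":");
        return new Order(parts[0], Integer.parseInt(parts[1]));
    }
}
```

With this model, a filter on the order amount has to be written as `raw -> CurrentChain.parse(raw).amount() < 100`, so every record that passes the filter is parsed twice: once in the filter and once by the converter.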

Possible Solution

  • Check whether a MessageConverter is configured for the current MessageListenerContainer and build the right adapter chain.
  • Refactor the chain to allow a new DelegatingFilterStrategy to decide what kind of data to filter: ConsumerRecord or Message.
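
The two points above could be sketched roughly as follows. This is plain Java with no spring-kafka dependency, and it is not an existing API: `SourceRecord`, `DelegatingFilter`, `onRecord`, `onMessage`, and `wrap` are hypothetical names used only to illustrate one way a filter could declare whether it targets the raw record or the converted payload.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for the raw ConsumerRecord; not the Kafka class.
record SourceRecord(String topic, String value) {}

// Hypothetical filter that targets either the raw record or the converted payload.
// As with RecordFilterStrategy, a predicate result of true means "discard".
final class DelegatingFilter<T> {

    private final Predicate<SourceRecord> rawFilter; // null when filtering converted data
    private final Predicate<T> convertedFilter;      // null when filtering raw records

    private DelegatingFilter(Predicate<SourceRecord> rawFilter, Predicate<T> convertedFilter) {
        this.rawFilter = rawFilter;
        this.convertedFilter = convertedFilter;
    }

    // Filter on the raw record, before conversion.
    static <T> DelegatingFilter<T> onRecord(Predicate<SourceRecord> discard) {
        return new DelegatingFilter<>(discard, null);
    }

    // Filter on the converted payload, after conversion.
    static <T> DelegatingFilter<T> onMessage(Predicate<T> discard) {
        return new DelegatingFilter<>(null, discard);
    }

    // Builds the chain: raw filter (if any), one conversion,
    // converted filter (if any), then the listener.
    Consumer<SourceRecord> wrap(Function<SourceRecord, T> converter, Consumer<T> listener) {
        return raw -> {
            if (rawFilter != null && rawFilter.test(raw)) {
                return; // discarded without paying for conversion
            }
            T converted = converter.apply(raw);
            if (convertedFilter != null && convertedFilter.test(converted)) {
                return; // discarded based on the converted payload
            }
            listener.accept(converted);
        };
    }
}
```

For example, `DelegatingFilter<Integer> f = DelegatingFilter.onMessage(n -> n < 100);` followed by `f.wrap(r -> Integer.parseInt(r.value()), listener)` converts each record once and drops payloads below 100 before the listener is called. A filter that only needs the topic or headers can still use `onRecord` and skip conversion for discarded records.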
