Refactor KafkaListenerConfig to remove unused ObjectMapper and simplify product event handling #1217
Conversation
Walkthrough

The Kafka listener now receives a validated ProductDto payload directly (via @Payload @Valid), removing ObjectMapper usage and its constructor dependency. The onSaveProductEvent signature and the constructor were updated, and the DTO is forwarded to the product management service without manual JSON deserialization.
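For orientation, here is a minimal sketch of the listener shape this walkthrough describes. It is trimmed to the product path only: the order-listener wiring, project-local imports, and the throws JsonProcessingException clause the PR still declares are omitted, and it assumes Spring Kafka hands the method an already-deserialized ProductDto (see the review discussion below).

```java
// Sketch only, not the full KafkaListenerConfig from the PR.
import jakarta.validation.Valid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;

class KafkaListenerConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaListenerConfig.class);

    private final ProductManageService productManageService;

    KafkaListenerConfig(ProductManageService productManageService) {
        this.productManageService = productManageService;
    }

    @KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product")
    public void onSaveProductEvent(@Payload @Valid ProductDto productDto) {
        // The payload arrives typed and validated; no manual ObjectMapper call remains.
        log.info("Received Product: {}", productDto);
        productManageService.manage(productDto);
    }
}
```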
Sequence Diagram(s)

```mermaid
sequenceDiagram
  autonumber
  actor Kafka as Kafka Broker
  participant Listener as KafkaListenerConfig
  participant Validator as Bean Validation
  participant ProductSvc as ProductManageService
  Kafka->>Listener: ProductDto message (deserialized by Kafka)
  Listener->>Validator: Validate ProductDto (@Valid)
  alt valid DTO
    Listener->>ProductSvc: productManageService.manage(productDto)
    ProductSvc-->>Listener: ack
  else invalid DTO
    Listener-->>Kafka: error handling / reject
  end
```

```mermaid
sequenceDiagram
  note over Listener: Old flow (before)
  actor Kafka as Kafka Broker
  participant Listener as KafkaListenerConfig
  participant Mapper as ObjectMapper
  participant ProductSvc as ProductManageService
  Kafka->>Listener: String payload (JSON)
  Listener->>Mapper: readValue(String, ProductDto)
  Mapper-->>Listener: ProductDto
  Listener->>ProductSvc: productManageService.manage(productDto)
```
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~10 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (2 warnings), ✅ Passed checks (1 passed)
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/src/{main,test}/java/**/*.java
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Always use imports rather than fully qualified class names in Java code
Files:
- inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java
🧠 Learnings (1)
📓 Common learnings
Learnt from: CR
PR: rajadilipkolli/spring-boot-microservices-series-v2#0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-09-28T10:03:05.049Z
Learning: If event payloads change, update all affected Kafka consumers/producers and prefer backward-compatible migrations
🪛 GitHub Actions: inventory-service
inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java
[error] 59-59: Cannot find symbol: class Valid. This indicates that the 'Valid' annotation is missing an import statement or dependency.
[error] 59-59: Compilation failure due to missing import or dependency for 'Valid' annotation.
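Both failures point at the same root cause: the newly added @Valid annotation compiles only if it is imported and the validation API is on the classpath. A minimal sketch of the likely fix, assuming a Spring Boot 3.x / Jakarta setup (package names are an assumption, not confirmed by the pipeline log):

```java
// Assumed fix for the compile error above: import the annotation from the
// jakarta namespace (and ensure spring-boot-starter-validation is a dependency).
import jakarta.validation.Valid;
import org.springframework.messaging.handler.annotation.Payload;

// ...
public void onSaveProductEvent(@Payload @Valid ProductDto productDto)
        throws JsonProcessingException {
    // method body unchanged
}
```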
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Codacy Static Code Analysis
🔇 Additional comments (1)
inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java (1)
37-42: Constructor cleanup looks good. Dropping the unused ObjectMapper dependency keeps this bean focused on the two services it actually needs. Nice simplification.
        
          
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java (3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/src/{main,test}/java/**/*.java
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Always use imports rather than fully qualified class names in Java code
Files:
- inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java
🧠 Learnings (1)
📚 Learning: 2025-09-28T10:03:05.049Z
Learnt from: CR
PR: rajadilipkolli/spring-boot-microservices-series-v2#0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-09-28T10:03:05.049Z
Learning: If event payloads change, update all affected Kafka consumers/producers and prefer backward-compatible migrations
Applied to files:
- inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Codacy Static Code Analysis
- GitHub Check: Inventory Service with jdk 25
```diff
 @KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product")
-public void onSaveProductEvent(@Payload String productDto) throws JsonProcessingException {
+public void onSaveProductEvent(@Payload @Valid ProductDto productDto)
+        throws JsonProcessingException {
     log.info("Received Product: {}", productDto);
-    productManageService.manage(objectMapper.readValue(productDto, ProductDto.class));
+    productManageService.manage(productDto);
 }
```
Restore a conversion step (or add a JsonDeserializer) before switching to ProductDto.
The listener still receives raw JSON strings from Kafka (the consumer config in this service uses the default StringDeserializer, and this PR doesn’t introduce a JsonDeserializer or message converter). With the new signature Spring will try to hand a String payload to a ProductDto parameter and throw MessageConversionException, so every product event will now fail. Please either keep the string payload (and deserialize with ObjectMapper) or wire up a Kafka consumer factory / message converter that materialises ProductDto instances before they reach this listener.
Apply this diff if you opt to restore the manual mapping:
```diff
@@
-    private final ProductManageService productManageService;
+    private final ProductManageService productManageService;
+    private final ObjectMapper objectMapper;
@@
-            InventoryOrderManageService orderManageService,
-            ProductManageService productManageService) {
+            InventoryOrderManageService orderManageService,
+            ProductManageService productManageService,
+            ObjectMapper objectMapper) {
         this.orderManageService = orderManageService;
         this.productManageService = productManageService;
+        this.objectMapper = objectMapper;
@@
-    public void onSaveProductEvent(@Payload @Valid ProductDto productDto)
+    public void onSaveProductEvent(@Payload String productDtoJson)
             throws JsonProcessingException {
-        log.info("Received Product: {}", productDto);
-        productManageService.manage(productDto);
+        var productDto = objectMapper.readValue(productDtoJson, ProductDto.class);
+        log.info("Received Product: {}", productDto);
+        productManageService.manage(productDto);
     }
```

Based on learnings
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```java
// Restore manual JSON deserialization for backward compatibility
private final ProductManageService productManageService;
private final ObjectMapper objectMapper;

public KafkaListenerConfig(
        InventoryOrderManageService orderManageService,
        ProductManageService productManageService,
        ObjectMapper objectMapper) {
    this.orderManageService = orderManageService;
    this.productManageService = productManageService;
    this.objectMapper = objectMapper;
}

@KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product")
public void onSaveProductEvent(@Payload String productDtoJson)
        throws JsonProcessingException {
    var productDto = objectMapper.readValue(productDtoJson, ProductDto.class);
    log.info("Received Product: {}", productDto);
    productManageService.manage(productDto);
}
```
🤖 Prompt for AI Agents
In inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java around lines 59-64, the listener signature was changed to accept ProductDto but
the consumer still uses StringDeserializer so Spring will receive raw JSON and
throw MessageConversionException; either revert the method to accept a String
and manually map to ProductDto using ObjectMapper (catch/handle
JsonProcessingException) before calling productManageService.manage(productDto),
or configure Kafka to deliver ProductDto by wiring a
ConsumerFactory/JsonDeserializer or a MessageConverter (configure
KafkaListenerContainerFactory with a RecordMessageConverter or set value
deserializer to JsonDeserializer<ProductDto> and trust packages), then restore
the ProductDto parameter so messages are deserialized before the listener.
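For the second route the prompt describes (typed deserialization before the listener runs), a hedged sketch of explicit Java wiring follows. The bean names, bootstrap address, group id, and trusted package below are placeholder assumptions, and the pure-properties approach shown later in this thread achieves the same effect.

```java
// Sketch only: a dedicated container factory backed by JsonDeserializer<ProductDto>,
// so @KafkaListener methods that reference it receive typed payloads.
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
class ProductKafkaConsumerConfig {

    @Bean
    ConsumerFactory<Long, ProductDto> productConsumerFactory() {
        // Connection settings here are assumptions; a real setup would reuse
        // the Spring Boot spring.kafka.* properties instead of hard-coding them.
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "product");
        JsonDeserializer<ProductDto> valueDeserializer = new JsonDeserializer<>(ProductDto.class);
        valueDeserializer.addTrustedPackages("com.example.inventoryservice.model.payload");
        return new DefaultKafkaConsumerFactory<>(props, new LongDeserializer(), valueDeserializer);
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<Long, ProductDto> productKafkaListenerContainerFactory(
            ConsumerFactory<Long, ProductDto> productConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Long, ProductDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(productConsumerFactory);
        return factory;
    }
}
```

With a dedicated factory like this, the product listener would opt in via the containerFactory attribute (e.g. @KafkaListener(..., containerFactory = "productKafkaListenerContainerFactory")) rather than replacing the default factory shared with the order listener.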
@coderabbitai, can you please wire up a Kafka consumer factory / message converter that materialises ProductDto instances before they reach this listener.
🧩 Analysis chain
I'll help you configure a Kafka consumer factory with proper JSON deserialization for ProductDto. Let me first examine the existing Kafka configuration in your codebase.
🏁 Script executed:
```bash
#!/bin/bash
# Find existing Kafka configuration files
fd -e java -e yaml -e yml -e properties | xargs rg -l -i "kafka.*config|consumer.*factory|deserializer" | head -10

# Look for existing consumer configurations
rg -n -C3 "ConsumerFactory|KafkaListenerContainerFactory|JsonDeserializer" --type=java

# Check for existing Kafka properties or application configuration
find . -name "application*.yml" -o -name "application*.properties" -o -name "bootstrap*.yml" -o -name "bootstrap*.properties" | head -5 | xargs cat 2>/dev/null | grep -i kafka || echo "No Kafka properties found"
```
Length of output: 4079
Let me check the inventory service's current Kafka configuration:
🏁 Script executed:
```bash
#!/bin/bash
# Check inventory service's application properties
find inventory-service -name "application*.yml" -o -name "application*.properties" -o -name "bootstrap*.yml" -o -name "bootstrap*.properties" | xargs cat 2>/dev/null

# Look for any existing Kafka configuration classes in inventory service
fd -e java . inventory-service | xargs rg -l -i "kafka.*config|consumer.*factory" | head -5

# Check what other Kafka configurations exist in the project
rg -n -C5 "JsonDeserializer|ConsumerFactory" inventory-service --type=java
```
Length of output: 1748
Perfect! I can see that the inventory service doesn't have any Kafka consumer configuration in its application.properties, which means it's using the default StringDeserializer. Let me configure the Kafka consumer factory to use JsonDeserializer for ProductDto.
Here's the solution - you need to add Kafka consumer configuration to your inventory-service/src/main/resources/application.properties:
🏁 Script executed:
```bash
#!/bin/bash
# Check the current application.properties content and show where to add Kafka config
echo "=== Current inventory-service application.properties ==="
cat inventory-service/src/main/resources/application.properties | tail -10

echo -e "\n=== Kafka Configuration to Add ==="
cat << 'EOF'
######## Kafka Configuration  #########
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.common.dtos,com.example.inventoryservice.model.payload
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
EOF
```
Length of output: 789
Let me find the actual configuration file location for the inventory service:
🏁 Script executed:
```bash
#!/bin/bash
# Find all configuration files in inventory service
echo "=== Finding configuration files in inventory-service ==="
find inventory-service -name "application*" -o -name "bootstrap*" | sort

# Check if there are any resources directories
echo -e "\n=== Resources directories in inventory-service ==="
find inventory-service -type d -name resources

# List contents of resources directories if they exist
echo -e "\n=== Contents of resources directories ==="
find inventory-service -type d -name resources -exec ls -la {} \; 2>/dev/null
```
Length of output: 730
Now I can see the inventory service uses profile-specific properties files. Let me check their contents and provide the proper Kafka configuration:
🏁 Script executed:
```bash
#!/bin/bash
# Check the contents of both properties files
echo "=== Contents of application-local.properties ==="
cat inventory-service/src/main/resources/application-local.properties

echo -e "\n=== Contents of application-test.properties ==="
cat inventory-service/src/test/resources/application-test.properties

# Also check what the earlier output was from (it might have been concatenated from multiple files)
echo -e "\n=== Checking if there are other config files we missed ==="
find . -name "*.properties" -path "*/inventory-service/*" -exec echo "File: {}" \; -exec head -5 {} \; -exec echo "" \;
```
Length of output: 2658
@rajadilipkolli Perfect! I can see that the inventory service uses profile-specific properties files. To configure Kafka to properly deserialize ProductDto instances, you have two options:
Option 1: Create a base application.properties file (Recommended)
Create inventory-service/src/main/resources/application.properties with the following Kafka configuration:
```properties
######## Kafka Configuration  #########
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.common.dtos,com.example.inventoryservice.model.payload
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
```

Option 2: Add to existing application-local.properties
Alternatively, you can add the same Kafka configuration to your existing inventory-service/src/main/resources/application-local.properties file.
Key Points:
- JsonDeserializer: This replaces the default StringDeserializer and will automatically convert JSON payloads to ProductDto objects
- Trusted Packages: I've configured both com.example.common.dtos (for OrderDto) and com.example.inventoryservice.model.payload (for ProductDto) as trusted packages for security
- Consistency: This follows the same pattern used by your payment-service and order-service
For Testing:
Also add the same configuration to inventory-service/src/test/resources/application-test.properties to ensure your integration tests work correctly.
With this configuration, your current KafkaListenerConfig code will work perfectly - Spring Kafka will automatically deserialize the JSON payload to ProductDto before passing it to your onSaveProductEvent method, and the @Valid annotation will trigger validation as expected.
The JsonProcessingException in the method signature can actually be removed now since the deserialization happens at the framework level, but keeping it won't cause any issues if you prefer to leave it for backward compatibility.
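If @Valid turns out not to be enforced on listener payloads in this particular setup, Spring Kafka's hook for payload validation is KafkaListenerConfigurer. A hedged sketch follows; whether it is needed at all depends on the Spring Boot version in use, so treat it as an optional fallback rather than a required step.

```java
// Optional fallback sketch: register a validator with the Kafka listener endpoint
// registrar so @Valid on @Payload parameters (e.g. ProductDto) is actually applied.
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;

@Configuration
class KafkaValidationConfig implements KafkaListenerConfigurer {

    private final LocalValidatorFactoryBean validator;

    KafkaValidationConfig(LocalValidatorFactoryBean validator) {
        this.validator = validator;
    }

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        // Reuse Spring's Bean Validation infrastructure for listener payloads.
        registrar.setValidator(this.validator);
    }
}
```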
🧠 Learnings used
Learnt from: CR
PR: rajadilipkolli/spring-boot-microservices-series-v2#0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-09-28T10:03:05.058Z
Learning: If event payloads change, update all affected Kafka consumers/producers and prefer backward-compatible migrations