@@ -187,6 +187,7 @@ public void setClientIdSuffix(String clientIdSuffix) {
187
187
* either explicitly or by Kafka; may be null if not assigned yet.
188
188
*/
189
189
@ Override
190
+ @ Nullable
190
191
public Collection <TopicPartition > getAssignedPartitions () {
191
192
ListenerConsumer listenerConsumer = this .listenerConsumer ;
192
193
if (listenerConsumer != null ) {
@@ -1402,17 +1403,19 @@ public String toString() {
1402
1403
+ "]" ;
1403
1404
}
1404
1405
1405
- private void closeProducers (Collection <TopicPartition > partitions ) {
1406
- ProducerFactory <?, ?> producerFactory = this .kafkaTxManager .getProducerFactory ();
1407
- partitions .forEach (tp -> {
1408
- try {
1409
- producerFactory .closeProducerFor (zombieFenceTxIdSuffix (tp .topic (), tp .partition ()));
1410
- }
1411
- catch (Exception e ) {
1412
- this .logger .error ("Failed to close producer with transaction id suffix: "
1413
- + zombieFenceTxIdSuffix (tp .topic (), tp .partition ()), e );
1414
- }
1415
- });
1406
+ private void closeProducers (@ Nullable Collection <TopicPartition > partitions ) {
1407
+ if (partitions != null ) {
1408
+ ProducerFactory <?, ?> producerFactory = this .kafkaTxManager .getProducerFactory ();
1409
+ partitions .forEach (tp -> {
1410
+ try {
1411
+ producerFactory .closeProducerFor (zombieFenceTxIdSuffix (tp .topic (), tp .partition ()));
1412
+ }
1413
+ catch (Exception e ) {
1414
+ this .logger .error ("Failed to close producer with transaction id suffix: "
1415
+ + zombieFenceTxIdSuffix (tp .topic (), tp .partition ()), e );
1416
+ }
1417
+ });
1418
+ }
1416
1419
}
1417
1420
1418
1421
private String zombieFenceTxIdSuffix (String topic , int partition ) {
0 commit comments