|
| 1 | +[[kafka-queues]] |
| 2 | += Kafka Queues (Share Consumer) |
| 3 | + |
| 4 | +Starting with version 4.0, Spring for Apache Kafka provides support for Kafka Queues through share consumers, which are part of Apache Kafka 4.0.0 and implement https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka[KIP-932 (Queues for Kafka)]. |
| 5 | +This feature is currently in early access. |
| 6 | + |
| 7 | +Kafka Queues enable a different consumption model compared to traditional consumer groups. |
| 8 | +Instead of the partition-based assignment model where each partition is exclusively assigned to one consumer, share consumers can cooperatively consume from the same partitions, with records being distributed among the consumers in the share group. |
| 9 | + |
| 10 | +[[share-consumer-factory]] |
| 11 | +== Share Consumer Factory |
| 12 | + |
| 13 | +The `ShareConsumerFactory` is responsible for creating share consumer instances. |
| 14 | +Spring Kafka provides the `DefaultShareConsumerFactory` implementation. |
| 15 | + |
| 16 | +[[share-consumer-factory-configuration]] |
| 17 | +=== Configuration |
| 18 | + |
| 19 | +You can configure a `DefaultShareConsumerFactory` similar to how you configure a regular `ConsumerFactory`: |
| 20 | + |
| 21 | +[source,java] |
| 22 | +---- |
| 23 | +@Bean |
| 24 | +public ShareConsumerFactory<String, String> shareConsumerFactory() { |
| 25 | + Map<String, Object> props = new HashMap<>(); |
| 26 | + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); |
| 27 | + props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group"); |
| 28 | + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| 29 | + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| 30 | + return new DefaultShareConsumerFactory<>(props); |
| 31 | +} |
| 32 | +---- |
| 33 | + |
| 34 | +[[share-consumer-factory-constructors]] |
| 35 | +=== Constructor Options |
| 36 | + |
| 37 | +The `DefaultShareConsumerFactory` provides several constructor options: |
| 38 | + |
| 39 | +[source,java] |
| 40 | +---- |
| 41 | +// Basic configuration |
| 42 | +new DefaultShareConsumerFactory<>(configs); |
| 43 | +
|
| 44 | +// With deserializer suppliers |
| 45 | +new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier); |
| 46 | +
|
| 47 | +// With deserializer instances |
| 48 | +new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers); |
| 49 | +---- |
| 50 | + |
| 51 | +[[share-consumer-factory-deserializers]] |
| 52 | +=== Deserializer Configuration |
| 53 | + |
| 54 | +You can configure deserializers in several ways: |
| 55 | + |
| 56 | +1. **Via Configuration Properties** (recommended for simple cases): |
| 57 | ++ |
| 58 | +[source,java] |
| 59 | +---- |
| 60 | +props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| 61 | +props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| 62 | +---- |
| 63 | + |
| 64 | +2. **Via Setters**: |
| 65 | ++ |
| 66 | +[source,java] |
| 67 | +---- |
| 68 | +factory.setKeyDeserializer(new StringDeserializer()); |
| 69 | +factory.setValueDeserializer(new StringDeserializer()); |
| 70 | +---- |
| 71 | + |
| 72 | +3. **Via Suppliers** (for cases where deserializers need to be created per consumer): |
| 73 | ++ |
| 74 | +[source,java] |
| 75 | +---- |
| 76 | +factory.setKeyDeserializerSupplier(() -> new StringDeserializer()); |
| 77 | +factory.setValueDeserializerSupplier(() -> new StringDeserializer()); |
| 78 | +---- |
| 79 | + |
| 80 | +Set `configureDeserializers` to `false` if your deserializers are already fully configured and should not be reconfigured by the factory. |
| 81 | + |
| 82 | +[[share-consumer-factory-listeners]] |
| 83 | +=== Lifecycle Listeners |
| 84 | + |
| 85 | +You can add listeners to monitor the lifecycle of share consumers: |
| 86 | + |
| 87 | +[source,java] |
| 88 | +---- |
| 89 | +factory.addListener(new ShareConsumerFactory.Listener<String, String>() { |
| 90 | + @Override |
| 91 | + public void consumerAdded(String id, ShareConsumer<String, String> consumer) { |
| 92 | + // Called when a new consumer is created |
| 93 | + System.out.println("Consumer added: " + id); |
| 94 | + } |
| 95 | +
|
| 96 | + @Override |
| 97 | + public void consumerRemoved(String id, ShareConsumer<String, String> consumer) { |
| 98 | + // Called when a consumer is closed |
| 99 | + System.out.println("Consumer removed: " + id); |
| 100 | + } |
| 101 | +}); |
| 102 | +---- |
| 103 | + |
| 104 | +[[share-message-listener-containers]] |
| 105 | +== Share Message Listener Containers |
| 106 | + |
| 107 | +[[share-kafka-message-listener-container]] |
| 108 | +=== ShareKafkaMessageListenerContainer |
| 109 | + |
| 110 | +The `ShareKafkaMessageListenerContainer` provides a simple, single-threaded container for share consumers: |
| 111 | + |
| 112 | +[source,java] |
| 113 | +---- |
| 114 | +@Bean |
| 115 | +public ShareKafkaMessageListenerContainer<String, String> container( |
| 116 | + ShareConsumerFactory<String, String> shareConsumerFactory) { |
| 117 | +
|
| 118 | + ContainerProperties containerProps = new ContainerProperties("my-topic"); |
| 119 | + containerProps.setGroupId("my-share-group"); |
| 120 | +
|
| 121 | + ShareKafkaMessageListenerContainer<String, String> container = |
| 122 | + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps); |
| 123 | +
|
| 124 | + container.setupMessageListener(new MessageListener<String, String>() { |
| 125 | + @Override |
| 126 | + public void onMessage(ConsumerRecord<String, String> record) { |
| 127 | + System.out.println("Received: " + record.value()); |
| 128 | + } |
| 129 | + }); |
| 130 | +
|
| 131 | + return container; |
| 132 | +} |
| 133 | +---- |
| 134 | + |
| 135 | +[[share-container-properties]] |
| 136 | +=== Container Properties |
| 137 | + |
| 138 | +Share containers support a subset of the container properties available for regular consumers: |
| 139 | + |
| 140 | +* `topics`: Array of topic names to subscribe to |
| 141 | +* `groupId`: The share group ID |
| 142 | +* `clientId`: The client ID for the consumer |
| 143 | +* `kafkaConsumerProperties`: Additional consumer properties |
| 144 | + |
| 145 | +[IMPORTANT] |
| 146 | +==== |
| 147 | +Share consumers do not support: |
| 148 | +
|
| 149 | +* Explicit partition assignment (`TopicPartitionOffset`) |
| 150 | +* Topic patterns |
| 151 | +* Manual offset management |
| 152 | +==== |
| 153 | + |
| 154 | +[[share-annotation-driven-listeners]] |
| 155 | +== Annotation-Driven Listeners |
| 156 | + |
| 157 | +[[share-kafka-listener]] |
| 158 | +=== @KafkaListener with Share Consumers |
| 159 | + |
| 160 | +You can use `@KafkaListener` with share consumers by configuring a `ShareKafkaListenerContainerFactory`: |
| 161 | + |
| 162 | +[source,java] |
| 163 | +---- |
| 164 | +@Configuration |
| 165 | +@EnableKafka |
| 166 | +public class ShareConsumerConfig { |
| 167 | +
|
| 168 | + @Bean |
| 169 | + public ShareConsumerFactory<String, String> shareConsumerFactory() { |
| 170 | + Map<String, Object> props = new HashMap<>(); |
| 171 | + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); |
| 172 | + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| 173 | + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| 174 | + return new DefaultShareConsumerFactory<>(props); |
| 175 | + } |
| 176 | +
|
| 177 | + @Bean |
| 178 | + public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory( |
| 179 | + ShareConsumerFactory<String, String> shareConsumerFactory) { |
| 180 | + return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); |
| 181 | + } |
| 182 | +} |
| 183 | +---- |
| 184 | + |
| 185 | +Then use it in your listener: |
| 186 | + |
| 187 | +[source,java] |
| 188 | +---- |
| 189 | +@Component |
| 190 | +public class ShareMessageListener { |
| 191 | +
|
| 192 | + @KafkaListener( |
| 193 | + topics = "my-queue-topic", |
| 194 | + containerFactory = "shareKafkaListenerContainerFactory", |
| 195 | + groupId = "my-share-group" |
| 196 | + ) |
| 197 | + public void listen(ConsumerRecord<String, String> record) { |
| 198 | + System.out.println("Received from queue: " + record.value()); |
| 199 | + // Record is automatically acknowledged with ACCEPT |
| 200 | + } |
| 201 | +} |
| 202 | +---- |
| 203 | + |
| 204 | +[[share-group-configuration]] |
| 205 | +== Share Group Configuration |
| 206 | + |
| 207 | +Share groups require specific broker configuration to function properly. |
| 208 | +For testing with embedded Kafka, use: |
| 209 | + |
| 210 | +[source,java] |
| 211 | +---- |
| 212 | +@EmbeddedKafka( |
| 213 | + topics = {"my-queue-topic"}, |
| 214 | + brokerProperties = { |
| 215 | + "unstable.api.versions.enable=true", |
| 216 | + "group.coordinator.rebalance.protocols=classic,share", |
| 217 | + "share.coordinator.state.topic.replication.factor=1", |
| 218 | + "share.coordinator.state.topic.min.isr=1" |
| 219 | + } |
| 220 | +) |
| 221 | +---- |
| 222 | + |
| 223 | +[[share-group-offset-reset]] |
| 224 | +=== Share Group Offset Reset |
| 225 | + |
| 226 | +Unlike regular consumer groups, share groups use a different configuration for offset reset behavior. |
| 227 | +You can configure this programmatically: |
| 228 | + |
| 229 | +[source,java] |
| 230 | +---- |
| 231 | +private void configureShareGroup(String bootstrapServers, String groupId) throws Exception { |
| 232 | + Map<String, Object> adminProps = new HashMap<>(); |
| 233 | + adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| 234 | +
|
| 235 | + try (Admin admin = Admin.create(adminProps)) { |
| 236 | + ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId); |
| 237 | + ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest"); |
| 238 | +
|
| 239 | + Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of( |
| 240 | + configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET)) |
| 241 | + ); |
| 242 | +
|
| 243 | + admin.incrementalAlterConfigs(configs).all().get(); |
| 244 | + } |
| 245 | +} |
| 246 | +---- |
| 247 | + |
| 248 | +[[share-record-acknowledgment]] |
| 249 | +== Record Acknowledgment |
| 250 | + |
| 251 | +Currently, share consumers automatically acknowledge records with `AcknowledgeType.ACCEPT` after successful processing. |
| 252 | +More sophisticated acknowledgment patterns will be added in future versions. |
| 253 | + |
| 254 | +[[share-differences-from-regular-consumers]] |
| 255 | +== Differences from Regular Consumers |
| 256 | + |
| 257 | +Share consumers differ from regular consumers in several key ways: |
| 258 | + |
| 259 | +1. **No Partition Assignment**: Share consumers cannot be assigned specific partitions |
| 260 | +2. **No Topic Patterns**: Share consumers do not support subscribing to topic patterns |
| 261 | +3. **Cooperative Consumption**: Multiple consumers in the same share group can consume from the same partitions simultaneously |
| 262 | +4. **Automatic Acknowledgment**: Records are automatically acknowledged after processing |
| 263 | +5. **Different Group Management**: Share groups use different coordinator protocols |
| 264 | + |
| 265 | +[[share-limitations-and-considerations]] |
| 266 | +== Limitations and Considerations |
| 267 | + |
| 268 | +[[share-current-limitations]] |
| 269 | +=== Current Limitations |
| 270 | + |
| 271 | +* **Early Access**: This feature is in early access and may change in future versions |
| 272 | +* **Limited Acknowledgment Options**: Only automatic `ACCEPT` acknowledgment is currently supported |
| 273 | +* **No Message Converters**: Message converters are not yet supported for share consumers |
| 274 | +* **Single-Threaded**: Share consumer containers currently run in single-threaded mode |
0 commit comments