|
58 | 58 | # Enforce idempotent producing for the internal RowProducer
|
59 | 59 | _default_producer_extra_config = {"enable.idempotence": True}
|
60 | 60 |
|
| 61 | +# Force assignment strategy to be "range" for co-partitioning in internal Consumers |
| 62 | +consumer_extra_config_overrides = {"partition.assignment.strategy": "range"} |
| 63 | + |
61 | 64 | _default_max_poll_interval_ms = 300000
|
62 | 65 |
|
63 | 66 |
|
@@ -301,7 +304,10 @@ def __init__(
|
301 | 304 | self._on_message_processed = on_message_processed
|
302 | 305 | self._on_processing_error = on_processing_error or default_on_processing_error
|
303 | 306 |
|
304 |
| - self._consumer = self._get_rowconsumer(on_error=on_consumer_error) |
| 307 | + self._consumer = self._get_rowconsumer( |
| 308 | + on_error=on_consumer_error, |
| 309 | + extra_config_overrides=consumer_extra_config_overrides, |
| 310 | + ) |
305 | 311 | self._producer = self._get_rowproducer(on_error=on_producer_error)
|
306 | 312 | self._running = False
|
307 | 313 | self._failed = False
|
@@ -593,20 +599,27 @@ def get_producer(self) -> Producer:
|
593 | 599 | )
|
594 | 600 |
|
595 | 601 | def _get_rowconsumer(
|
596 |
| - self, on_error: Optional[ConsumerErrorCallback] = None |
| 602 | + self, |
| 603 | + on_error: Optional[ConsumerErrorCallback] = None, |
| 604 | + extra_config_overrides: Optional[dict] = None, |
597 | 605 | ) -> RowConsumer:
|
598 | 606 | """
|
599 | 607 | Create a RowConsumer using the application config
|
600 | 608 |
|
601 | 609 | Used to create the application consumer as well as the sources consumers
|
602 | 610 | """
|
603 |
| - |
| 611 | + extra_config_overrides = extra_config_overrides or {} |
| 612 | + # Override the existing "extra_config" with new values |
| 613 | + extra_config = { |
| 614 | + **self._config.consumer_extra_config, |
| 615 | + **extra_config_overrides, |
| 616 | + } |
604 | 617 | return RowConsumer(
|
605 | 618 | broker_address=self._config.broker_address,
|
606 | 619 | consumer_group=self._config.consumer_group,
|
607 | 620 | auto_offset_reset=self._config.auto_offset_reset,
|
608 | 621 | auto_commit_enable=False, # Disable auto commit and manage commits manually
|
609 |
| - extra_config=self._config.consumer_extra_config, |
| 622 | + extra_config=extra_config, |
610 | 623 | on_error=on_error,
|
611 | 624 | )
|
612 | 625 |
|
@@ -692,7 +705,9 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic
|
692 | 705 | source,
|
693 | 706 | topic,
|
694 | 707 | self._get_rowproducer(transactional=False),
|
695 |
| - self._get_rowconsumer(), |
| 708 | + self._get_rowconsumer( |
| 709 | + extra_config_overrides=consumer_extra_config_overrides |
| 710 | + ), |
696 | 711 | self._get_topic_manager(),
|
697 | 712 | )
|
698 | 713 | return topic
|
@@ -897,10 +912,6 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
|
897 | 912 | # get the source data.
|
898 | 913 | self._source_manager.start_sources()
|
899 | 914 |
|
900 |
| - # First commit everything processed so far because assignment can take a while |
901 |
| - # and fail |
902 |
| - self._processing_context.commit_checkpoint(force=True) |
903 |
| - |
904 | 915 | # Assign partitions manually to pause the changelog topics
|
905 | 916 | self._consumer.assign(topic_partitions)
|
906 | 917 | # Pause changelog topic+partitions immediately after assignment
|
|
0 commit comments