-
Notifications
You must be signed in to change notification settings - Fork 9
refactor consumer option to validate the version #81
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -153,6 +153,14 @@ class ExchangeToExchangeBindingSpecification: | |||||
| binding_key: Optional[str] = None | ||||||
|
|
||||||
|
|
||||||
| class ConsumerOptions: | ||||||
| def validate(self, versions: Dict[str, bool]) -> None: | ||||||
| raise NotImplementedError("Subclasses should implement this method") | ||||||
|
|
||||||
| def filter_set(self) -> Dict[symbol, Described]: | ||||||
| raise NotImplementedError("Subclasses should implement this method") | ||||||
|
|
||||||
|
|
||||||
| @dataclass | ||||||
| class MessageProperties: | ||||||
| """ | ||||||
|
|
@@ -215,7 +223,7 @@ def __init__( | |||||
| self.sql = sql | ||||||
|
|
||||||
|
|
||||||
| class StreamConsumerOptions: | ||||||
| class StreamConsumerOptions(ConsumerOptions): | ||||||
| """ | ||||||
| Configuration options for stream queues. | ||||||
|
|
||||||
|
|
@@ -237,6 +245,7 @@ def __init__( | |||||
| ): | ||||||
|
|
||||||
| self._filter_set: Dict[symbol, Described] = {} | ||||||
| self._filter_option = filter_options | ||||||
|
|
||||||
| if offset_specification is None and filter_options is None: | ||||||
| raise ValidationCodeException( | ||||||
|
|
@@ -329,7 +338,6 @@ def _filter_message_properties( | |||||
| def _filter_application_properties( | ||||||
| self, application_properties: Optional[dict[str, Any]] | ||||||
| ) -> None: | ||||||
| app_prop = {} | ||||||
| if application_properties is not None: | ||||||
| app_prop = application_properties.copy() | ||||||
|
Comment on lines
341
to
342
|
||||||
|
|
||||||
|
|
@@ -356,6 +364,41 @@ def filter_set(self) -> Dict[symbol, Described]: | |||||
| """ | ||||||
| return self._filter_set | ||||||
|
|
||||||
| def validate(self, versions: Dict[str, bool]) -> None: | ||||||
| """ | ||||||
| Validates stream filter options against supported RabbitMQ server versions. | ||||||
|
|
||||||
| Args: | ||||||
| versions: Dictionary mapping version strings to boolean indicating support. | ||||||
|
|
||||||
| Raises: | ||||||
| ValidationCodeException: If a filter option requires a higher RabbitMQ version. | ||||||
| """ | ||||||
| if self._filter_option is None: | ||||||
| return | ||||||
| if self._filter_option.values and not versions.get("4.1.0", False): | ||||||
| raise ValidationCodeException( | ||||||
| "Stream filter by values requires RabbitMQ 4.1.0 or higher" | ||||||
| ) | ||||||
| if self._filter_option.match_unfiltered and not versions.get("4.1.0", False): | ||||||
| raise ValidationCodeException( | ||||||
| "Stream filter by match_unfiltered requires RabbitMQ 4.1.0 or higher" | ||||||
| ) | ||||||
| if self._filter_option.sql and not versions.get("4.2.0", False): | ||||||
| raise ValidationCodeException( | ||||||
| "Stream filter by SQL requires RabbitMQ 4.2.0 or higher" | ||||||
| ) | ||||||
| if self._filter_option.message_properties and not versions.get("4.1.0", False): | ||||||
| raise ValidationCodeException( | ||||||
| "Stream filter by SQL requires RabbitMQ 4.1.0 or higher" | ||||||
| ) | ||||||
| if self._filter_option.application_properties and not versions.get( | ||||||
| "4.1.0", False | ||||||
| ): | ||||||
| raise ValidationCodeException( | ||||||
| "Stream filter by SQL requires RabbitMQ 4.1.0 or higher" | ||||||
Gsantomaggio marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
| "Stream filter by SQL requires RabbitMQ 4.1.0 or higher" | |
| "Stream filter by application_properties requires RabbitMQ 4.1.0 or higher" |
Uh oh!
There was an error while loading. Please reload this page.