-
Notifications
You must be signed in to change notification settings - Fork 7
fix: subscriptions bookkeeping in pubsub clients #510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| def __del__(self) -> None: | ||
| if not self._decremented: | ||
| self._decrement_stream_count() | ||
| self._decremented = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we implementing __del__? Do we need this somewhere? If not then we should remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I implemented __del__ in order to be able to decrement the number of active streams when a subscription closes. Is there a better method to implement this? Or is there a super() equivalent I should call here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pattern for this is to use a context manager. I don't think we can do this now though since it would break the API and expectations existing users have.
| def __del__(self) -> None: | ||
| if not self._decremented: | ||
| self._decrement_stream_count() | ||
| self._decremented = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pattern for this is to use a context manager. I don't think we can do this now though since it would break the API and expectations existing users have.
| def unsubscribe(self) -> None: | ||
| if not self._decremented: | ||
| self._decrement_stream_count() | ||
| self._decremented = True | ||
| self._client_stream.cancel() # type: ignore[misc] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's called in the tests / can be called by end users to immediately free up a stream
Closes https://github.com/momentohq/dev-eco-issue-tracker/issues/1157
Pubsub clients should do some bookkeeping of number of active subscriptions to be able to gracefully reject new subscribe requests rather than letting them silently queue up on the client.
Updated the client to use a pool of 4 unary grpc managers by default to match the other SDKs.
The stream managers use the existing
with_max_subscriptionsconfiguration to statically set the number of grpc managers in the stream pool.Created a sync and async
subscription_initializationtest suite like in other SDKs. Unfortunately, it appears these tests cannot be run against momento-local. The credential provider must useinsecure_channel()for http connections to momento-local, but apparently under the hood that means only one underlying TCP connection is created, but the tests are written expecting to exercise multiple grpc channels. I've excluded these tests from CI because they cannot be run against momento-local or the live service since they require a subscriptions limit greater than the default.