feat(ACI): Send historical data to seer on update #102934
First file — the metric detector serializer (file path not shown in this capture):

```diff
@@ -1,5 +1,5 @@
 from datetime import timedelta
-from typing import Any
+from typing import Any, cast

 from rest_framework import serializers

@@ -8,7 +8,10 @@
 from sentry.incidents.logic import enable_disable_subscriptions
 from sentry.incidents.models.alert_rule import AlertRuleDetectionType
 from sentry.relay.config.metric_extraction import on_demand_metrics_feature_flags
-from sentry.seer.anomaly_detection.store_data_workflow_engine import send_new_detector_data
+from sentry.seer.anomaly_detection.store_data_workflow_engine import (
+    send_new_detector_data,
+    update_detector_data,
+)
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.metrics.extraction import should_use_on_demand_metrics
 from sentry.snuba.models import (

@@ -270,6 +273,20 @@ def update_data_source(self, instance: Detector, data_source: SnubaQueryDataSour
                 "Invalid extrapolation mode for this detector type."
             )

+        # Handle a dynamic detector's snuba query changing
+        if instance.config.get("detection_type") == AlertRuleDetectionType.DYNAMIC:
+            if snuba_query.query != data_source.get(
+                "query"
+            ) or snuba_query.aggregate != data_source.get("aggregate"):
```
**Contributor** commented on the block above:

> **Bug: Prevent Unnecessary Seer API Calls** — The condition checking if a dynamic detector's query or aggregate changed compares against …
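The comment body is cut off in this capture, but the hazard it most likely points at is visible in the code: `data_source.get("query")` returns `None` when a partial update omits the key, so the inequality can be true even though nothing Seer-relevant changed. A small self-contained demonstration, with a stand-in object in place of the real model (a guard along the lines sketched at the end of this page would avoid it):

```python
# Demonstration of the hazard, using a fake object in place of SnubaQuery:
class FakeSnubaQuery:
    query = "event.type:error"
    aggregate = "count()"


snuba_query = FakeSnubaQuery()
data_source = {"aggregate": "count()"}  # partial update that omits "query"

# .get("query") returns None, so the comparison reports a "change"
# even though the caller never touched the query.
assert snuba_query.query != data_source.get("query")
```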
```diff
+                try:
+                    validated_data_source = cast(dict[str, Any], data_source)
+                    update_detector_data(instance, validated_data_source)
+                except Exception:
+                    # don't update the snuba query if we failed to send data to Seer
+                    raise serializers.ValidationError(
+                        "Failed to send data to Seer, cannot update detector"
+                    )
+
         update_snuba_query(
             snuba_query=snuba_query,
             query_type=data_source.get("query_type", snuba_query.type),

@@ -286,6 +303,21 @@ def update_data_source(self, instance: Detector, data_source: SnubaQueryDataSour
         )
```
```diff
     def update(self, instance: Detector, validated_data: dict[str, Any]):
+        # Handle anomaly detection changes first in case we need to exit before saving
+        if (
+            not instance.config.get("detection_type") == AlertRuleDetectionType.DYNAMIC
+            and validated_data.get("config", {}).get("detection_type")
+            == AlertRuleDetectionType.DYNAMIC
+        ):
+            # Detector has been changed to become a dynamic detector
+            try:
+                update_detector_data(instance, validated_data)
+            except Exception:
+                # Don't update if we failed to send data to Seer
+                raise serializers.ValidationError(
+                    "Failed to send data to Seer, cannot update detector"
+                )
+
         super().update(instance, validated_data)

         # Handle enable/disable query subscriptions
```
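For orientation, a hypothetical `validated_data` payload that would take the new branch above — an existing non-dynamic detector being switched to dynamic. The payload shape and config keys are assumptions for illustration, not taken from the PR:

```python
# Hypothetical update payload: the stored instance.config still has a
# non-dynamic detection_type, so only the incoming config is DYNAMIC and
# update_detector_data() runs before super().update() persists anything.
validated_data = {
    "name": "High error volume",
    "config": {
        "detection_type": "dynamic",  # AlertRuleDetectionType.DYNAMIC
        "sensitivity": "medium",
    },
}
```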
```diff
@@ -299,6 +331,7 @@ def update(self, instance: Detector, validated_data: dict[str, Any]):
         if query_subscriptions:
             enable_disable_subscriptions(query_subscriptions, enabled)

         # Handle data sources
         data_source: SnubaQueryDataSourceType | None = None

         if "data_sources" in validated_data:

@@ -324,10 +357,9 @@ def create(self, validated_data: dict[str, Any]):
         try:
             send_new_detector_data(detector)
         except Exception:
-            # Sending historical data failed; Detector won't be save, but we
+            # Sending historical data failed; Detector won't be saved, but we
            # need to clean up database state that has already been created.
            detector.workflow_condition_group.delete()

            raise

        schedule_update_project_config(detector)
```
Second file — `sentry/seer/anomaly_detection/store_data_workflow_engine.py` (path inferred from the import in the first file):

```diff
@@ -1,4 +1,5 @@
 import logging
+from typing import Any

 import sentry_sdk
 from django.conf import settings

@@ -30,7 +31,7 @@
 from sentry.utils import json, metrics
 from sentry.utils.json import JSONDecodeError
 from sentry.workflow_engine.models import DataCondition, DataSource, DataSourceDetector, Detector
-from sentry.workflow_engine.types import DetectorException
+from sentry.workflow_engine.types import DetectorException, DetectorPriorityLevel

 logger = logging.getLogger(__name__)

@@ -40,42 +41,104 @@
 )


-def send_new_detector_data(detector: Detector) -> None:
-    """
-    Send historical data for a new Detector to Seer.
-    """
+def _fetch_related_models(
+    detector: Detector, method: str
+) -> tuple[DataSource, DataCondition, SnubaQuery]:
     # XXX: it is technically possible (though not used today) that a detector could have multiple data sources
     data_source_detector = DataSourceDetector.objects.filter(detector_id=detector.id).first()
     if not data_source_detector:
-        raise DetectorException("Could not create detector, data source not found.")
+        raise DetectorException(f"Could not {method} detector, data source not found.")
     data_source = data_source_detector.data_source

     try:
         query_subscription = QuerySubscription.objects.get(id=int(data_source.source_id))
     except QuerySubscription.DoesNotExist:
         raise DetectorException(
-            f"Could not create detector, query subscription {data_source.source_id} not found."
+            f"Could not {method} detector, query subscription {data_source.source_id} not found."
         )
     try:
         snuba_query = SnubaQuery.objects.get(id=query_subscription.snuba_query_id)
     except SnubaQuery.DoesNotExist:
         raise DetectorException(
-            f"Could not create detector, snuba query {query_subscription.snuba_query_id} not found."
+            f"Could not {method} detector, snuba query {query_subscription.snuba_query_id} not found."
         )
     try:
         data_condition = DataCondition.objects.get(
-            condition_group=detector.workflow_condition_group
+            condition_group=detector.workflow_condition_group,
+            condition_result__in=[
+                DetectorPriorityLevel.HIGH,
+                DetectorPriorityLevel.MEDIUM,
+            ],
         )
     except (DataCondition.DoesNotExist, DataCondition.MultipleObjectsReturned):
-        # there should only ever be one data condition for a dynamic metric detector, we dont actually expect a MultipleObjectsReturned
+        # there should only ever be one non-resolution data condition for a dynamic metric detector, we dont actually expect a MultipleObjectsReturned
         dcg_id = (
             detector.workflow_condition_group.id
             if detector.workflow_condition_group is not None
             else None
         )
         raise DetectorException(
-            f"Could not create detector, data condition {dcg_id} not found or too many found."
+            f"Could not {method} detector, data condition {dcg_id} not found or too many found."
         )
+    return data_source, data_condition, snuba_query


+def update_detector_data(
+    detector: Detector,
+    updated_fields: dict[str, Any],
+) -> None:
+    data_source, data_condition, snuba_query = _fetch_related_models(detector, "update")
+
+    # use setattr to avoid saving the models until the Seer call has successfully finished,
```
**Member (PR author)** replied:

> This is the same as what we do for alert rules: https://github.com/getsentry/sentry/blob/master/src/sentry/seer/anomaly_detection/store_data.py#L122
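The pattern being referenced — apply updates to the in-memory model instances, make the external call, and persist only afterwards — reduced to a sketch. Function names here are illustrative; the PR itself never calls `.save()` in this helper because the serializer layer persists later:

```python
def apply_unsaved(model, updates: dict) -> None:
    # setattr mutates only the in-memory instance; nothing is written to
    # the database until someone calls .save() on it.
    for field, value in updates.items():
        setattr(model, field, value)


def update_after_external_call(model, updates: dict, send_to_seer) -> None:
    apply_unsaved(model, updates)
    # If this raises, the database row is untouched and the mutated
    # instance is simply discarded, so no "bad state" is persisted.
    send_to_seer(model)
    model.save()
```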
```diff
+    # otherwise they would be in a bad state
+    updated_data_condition_data = updated_fields.get("condition_group", {}).get("conditions")
+    if updated_data_condition_data:
+        for k, v in updated_data_condition_data[0].items():
+            setattr(data_condition, k, v)
+
+    event_types = snuba_query.event_types
+    updated_data_source_data = updated_fields.get("data_sources")
+    if updated_data_source_data:
+        data_source_data = updated_data_source_data[0]
+        event_types = data_source_data.get("eventTypes")
```
**Contributor** commented on the line above:

> **Bug: Casing Conflict Invalidates Event Data** — The code attempts to retrieve …
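This comment is also truncated, but the casing mismatch it names is visible in the diff: the lookup uses camelCase (`eventTypes`) while the loop below compares snake_case (`k == "event_types"`), so the two cannot both match the same payload. One defensive option is to normalize keys up front — a hypothetical helper, not part of the PR:

```python
import re


def to_snake_case(key: str) -> str:
    """Convert a camelCase key to snake_case, e.g. 'eventTypes' -> 'event_types'."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", key).lower()


# Normalize once so every later lookup agrees on one convention.
raw = {"eventTypes": ["error"], "timeWindow": 3600}
data_source_data = {to_snake_case(k): v for k, v in raw.items()}
assert data_source_data == {"event_types": ["error"], "time_window": 3600}
```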
```diff
+
+        for k, v in data_source_data.items():
+            if k == "dataset":
+                v = v.value
+            elif k == "time_window":
+                time_window = data_source_data.get("time_window")
+                v = time_window if time_window is not None else snuba_query.time_window
+            elif k == "event_types":
+                continue
+            setattr(snuba_query, k, v)
+
+    try:
+        handle_send_historical_data_to_seer(
+            detector,
+            data_source,
+            data_condition,
+            snuba_query,
+            detector.project,
+            SeerMethod.UPDATE,
+            event_types,
+        )
+    except TimeoutError:
+        raise ValidationError("Timed out sending data to Seer, unable to update detector")
+    except MaxRetryError:
+        raise ValidationError("Hit max retries sending data to Seer, unable to update detector")
+    except ParseError:
+        raise ValidationError("Couldn't parse response from Seer, unable to update detector")
+    except ValidationError:
+        raise ValidationError("Hit validation error, unable to update detector")
+    metrics.incr("anomaly_detection_monitor.updated")
+
+
+def send_new_detector_data(detector: Detector) -> None:
+    """
+    Send historical data for a new Detector to Seer.
+    """
+    data_source, data_condition, snuba_query = _fetch_related_models(detector, "create")

     try:
         handle_send_historical_data_to_seer(
             detector, data_source, data_condition, snuba_query, detector.project, SeerMethod.CREATE
```
A reviewer asked:

> Do we need to check other snuba query fields as well (like timeWindow)? Should we just resend the data every time we update a dynamic detector?
The author replied:

> Good question, I will look into this for a follow-up. I don't think resending every time we change the detector is necessary (e.g. it could just be the detector's name changing), but we might be missing some cases where we should be updating.
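If the follow-up lands on "resend only when it matters," one shape it could take is diffing the update payload against just the Seer-relevant snuba query fields, so that a rename never triggers a call. A sketch; the field list is an assumption, not from the PR:

```python
# Assumed list of fields that affect the historical data Seer trains on.
SEER_SENSITIVE_FIELDS = ("query", "aggregate", "time_window", "dataset", "event_types")


def needs_seer_resend(snuba_query, updates: dict) -> bool:
    """True if any submitted, Seer-relevant field differs from the stored value."""
    return any(
        field in updates and updates[field] != getattr(snuba_query, field, None)
        for field in SEER_SENSITIVE_FIELDS
    )
```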