
Commit 580836b

feat(aci): Move workflow engine scheduling to new task (#99535)
Creates a new task that (if options allow it) schedules batch workflow processing. Workflow scheduling and delayed rule scheduling run on different Redis clusters with different client types, and thus somewhat different constraints. We mostly don't want to touch rule processing until we delete it, whereas the workflow engine is under active development. By splitting the scheduling, we can make changes for the workflow engine without needing to worry about maintaining or modifying rules processing behavior.

The deployment/cutover plan is:

1. Deploy the new task; verify it is running and doing nothing.
2. Set up a PR that sets `delayed_workflow.rollout` to False, and another that sets it back to True and `workflow_engine.use_process_pending_batch` to False.
3. Merge the first PR, then the second immediately after the first is deployed.

This should disable the workflow engine publishing, allow it to finish, then re-enable it in the new task.
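A minimal sketch of the invariant the cutover relies on (function names here are illustrative, not from the commit): for either value of the gating option, exactly one of the two schedulers owns the delayed_workflow type, so a fully deployed flag flip never double-publishes.

def old_scheduler_owns(processing_type: str, use_new_task: bool) -> bool:
    # process_pending_batch skips delayed_workflow once the new task is enabled
    return not (use_new_task and processing_type == "delayed_workflow")

def new_task_owns(use_new_task: bool) -> bool:
    # schedule_delayed_workflows no-ops until the option is enabled
    return use_new_task

# Exactly one owner for delayed_workflow, whichever way the flag is set:
assert all(
    old_scheduler_owns("delayed_workflow", flag) != new_task_owns(flag)
    for flag in (True, False)
)

The caveat, per the new option's own comment below, is that the two tasks may briefly run concurrently mid-deploy, which is why the plan disables `delayed_workflow.rollout` first rather than flipping the scheduler flag alone.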

5 files changed: +174, -37 lines

src/sentry/conf/server.py

Lines changed: 10 additions & 0 deletions
@@ -1208,6 +1208,12 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
         "schedule": crontab(minute="*/1"),
         "options": {"expires": 10, "queue": "buffers.process_pending_batch"},
     },
+    "flush-delayed-workflows": {
+        "task": "sentry.tasks.process_buffer.schedule_delayed_workflows",
+        # Run every 1 minute
+        "schedule": crontab(minute="*/1"),
+        "options": {"expires": 10, "queue": "workflow_engine.process_workflows"},
+    },
     "sync-options": {
         "task": "sentry.tasks.options.sync_options",
         # Run every 10 seconds
@@ -1634,6 +1640,10 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
         "task": "buffer:sentry.tasks.process_buffer.process_pending_batch",
         "schedule": task_crontab("*/1", "*", "*", "*", "*"),
     },
+    "flush-delayed-workflows": {
+        "task": "workflow_engine:sentry.tasks.process_buffer.schedule_delayed_workflows",
+        "schedule": task_crontab("*/1", "*", "*", "*", "*"),
+    },
     "sync-options": {
         "task": "options:sentry.tasks.options.sync_options",
         "schedule": timedelta(seconds=10),

src/sentry/options/defaults.py

Lines changed: 11 additions & 0 deletions
@@ -3143,6 +3143,17 @@
     default=True,
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
+# Control whether delayed workflow engine evaluation is done
+# via the old process_pending_batch task with rules delayed processing, or
+# via the new schedule_delayed_workflows task.
+# NB: These tasks are allowed to run concurrently, so a naive switch here
+# may result in duplicate processing.
+register(
+    "workflow_engine.use_new_scheduling_task",
+    type=Bool,
+    default=False,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
 
 # Restrict uptime issue creation for specific host provider identifiers. Items
 # in this list map to the `host_provider_id` column in the UptimeSubscription
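The flag defaults to False, so deploying this commit changes nothing by itself. A sketch of how the option is consulted at runtime and pinned in tests (`override_options` is Sentry's usual test helper, an assumption here rather than part of this commit):

from sentry import options
from sentry.testutils.helpers import override_options  # assumption: standard Sentry test helper

# Runtime check, mirroring buffer_processing.py and process_buffer.py below:
if options.get("workflow_engine.use_new_scheduling_task"):
    ...  # delayed_workflow is owned by schedule_delayed_workflows

# Pinning the flag for a test block:
with override_options({"workflow_engine.use_new_scheduling_task": True}):
    ...

Because the option carries FLAG_AUTOMATOR_MODIFIABLE, it can be flipped through Sentry's options automator without a code deploy, which is what makes the staged cutover in the commit message practical.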

src/sentry/rules/processing/buffer_processing.py

Lines changed: 51 additions & 37 deletions
@@ -159,45 +159,59 @@ def process_in_batches(
     )
 
 
-def process_buffer() -> None:
+def process_buffer_for_type(processing_type: str, handler: type[DelayedProcessingBase]) -> None:
+    """
+    Process buffers for a specific processing type and handler.
+    """
     should_emit_logs = options.get("delayed_processing.emit_logs")
 
-    for processing_type, handler in delayed_processing_registry.registrations.items():
-        if handler.option and not options.get(handler.option):
-            log_name = f"{processing_type}.disabled"
-            logger.info(log_name, extra={"option": handler.option})
-            continue
+    if handler.option and not options.get(handler.option):
+        log_name = f"{processing_type}.disabled"
+        logger.info(log_name, extra={"option": handler.option})
+        return
 
-        buffer = handler.buffer_backend()
-
-        with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
-            # We need to use a very fresh timestamp here; project scores (timestamps) are
-            # updated with each relevant event, and some can be updated every few milliseconds.
-            # The staler this timestamp, the more likely it'll miss some recently updated projects,
-            # and the more likely we'll have frequently updated projects that are never actually
-            # retrieved and processed here.
-            fetch_time = datetime.now(tz=timezone.utc).timestamp()
-            buffer_keys = handler.get_buffer_keys()
-            all_project_ids_and_timestamps = buffer.bulk_get_sorted_set(
-                buffer_keys,
-                min=0,
-                max=fetch_time,
-            )
+    buffer = handler.buffer_backend()
+
+    with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
+        # We need to use a very fresh timestamp here; project scores (timestamps) are
+        # updated with each relevant event, and some can be updated every few milliseconds.
+        # The staler this timestamp, the more likely it'll miss some recently updated projects,
+        # and the more likely we'll have frequently updated projects that are never actually
+        # retrieved and processed here.
+        fetch_time = datetime.now(tz=timezone.utc).timestamp()
+        buffer_keys = handler.get_buffer_keys()
+        all_project_ids_and_timestamps = buffer.bulk_get_sorted_set(
+            buffer_keys,
+            min=0,
+            max=fetch_time,
+        )
 
-            if should_emit_logs:
-                log_str = ", ".join(
-                    f"{project_id}: {timestamps}"
-                    for project_id, timestamps in all_project_ids_and_timestamps.items()
-                )
-                log_name = f"{processing_type}.project_id_list"
-                logger.info(log_name, extra={"project_ids": log_str})
-
-            project_ids = list(all_project_ids_and_timestamps.keys())
-            for project_id in project_ids:
-                process_in_batches(buffer, project_id, processing_type)
-
-            buffer.delete_keys(
-                buffer_keys,
-                min=0,
-                max=fetch_time,
-            )
+        if should_emit_logs:
+            log_str = ", ".join(
+                f"{project_id}: {timestamps}"
+                for project_id, timestamps in all_project_ids_and_timestamps.items()
+            )
+            log_name = f"{processing_type}.project_id_list"
+            logger.info(log_name, extra={"project_ids": log_str})
+
+        project_ids = list(all_project_ids_and_timestamps.keys())
+        for project_id in project_ids:
+            process_in_batches(buffer, project_id, processing_type)
+
+        buffer.delete_keys(
+            buffer_keys,
+            min=0,
+            max=fetch_time,
+        )
+
+
+def process_buffer() -> None:
+    """
+    Process all registered delayed processing types.
+    """
+    for processing_type, handler in delayed_processing_registry.registrations.items():
+        # If the new scheduling task is enabled and this is delayed_workflow, skip it
+        use_new_scheduling = options.get("workflow_engine.use_new_scheduling_task")
+        if use_new_scheduling and processing_type == "delayed_workflow":
+            continue
+        process_buffer_for_type(processing_type, handler)
src/sentry/tasks/process_buffer.py

Lines changed: 33 additions & 0 deletions
@@ -4,8 +4,10 @@
 import sentry_sdk
 from django.apps import apps
 
+from sentry import options
 from sentry.db.models.base import Model
 from sentry.tasks.base import instrumented_task
+from sentry.taskworker import namespaces
 from sentry.taskworker.config import TaskworkerConfig
 from sentry.taskworker.namespaces import buffer_tasks
 from sentry.utils.locking import UnableToAcquireLock
@@ -63,6 +65,37 @@ def process_pending_batch() -> None:
         logger.warning("process_pending_batch.fail", extra={"error": error})
 
 
+@instrumented_task(
+    name="sentry.tasks.process_buffer.schedule_delayed_workflows",
+    queue="workflow_engine.process_workflows",
+    taskworker_config=TaskworkerConfig(
+        namespace=namespaces.workflow_engine_tasks,
+        processing_deadline_duration=40,
+    ),
+)
+def schedule_delayed_workflows() -> None:
+    """
+    Schedule delayed workflow buffers in a batch.
+    """
+    from sentry.rules.processing.buffer_processing import process_buffer_for_type
+    from sentry.workflow_engine.tasks.delayed_workflows import DelayedWorkflow
+
+    lock = get_process_lock("schedule_delayed_workflows")
+
+    try:
+        with lock.acquire():
+            # Only process delayed_workflow type
+            use_new_scheduling = options.get("workflow_engine.use_new_scheduling_task")
+            if not use_new_scheduling:
+                logger.info(
+                    "Configured to use process_pending_batch for delayed_workflow; exiting."
+                )
+                return
+            process_buffer_for_type("delayed_workflow", DelayedWorkflow)
+    except UnableToAcquireLock as error:
+        logger.warning("schedule_delayed_workflows.fail", extra={"error": error})
+
+
 @instrumented_task(
     name="sentry.tasks.process_buffer.process_incr",
     queue="counters-0",
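The task mirrors process_pending_batch's structure: a process-wide lock so overlapping beat ticks cannot double-schedule, and a warning (not an error) when the lock is contended. Beyond the minutely beat entry, it can also be exercised by hand; a hedged sketch using the standard Celery task API exposed via instrumented_task:

from sentry.tasks.process_buffer import schedule_delayed_workflows

schedule_delayed_workflows.delay()  # enqueue onto workflow_engine.process_workflows
schedule_delayed_workflows()        # or run inline in the current process for debugging

When the flag is off, the task still acquires the lock once a minute, logs that process_pending_batch owns delayed_workflow, and exits; that keeps the disabled path cheap and observable during the cutover window.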

tests/sentry/tasks/test_process_buffer.py

Lines changed: 69 additions & 0 deletions
@@ -8,6 +8,7 @@
     process_incr,
     process_pending,
     process_pending_batch,
+    schedule_delayed_workflows,
 )
 from sentry.testutils.cases import TestCase
 
@@ -50,3 +51,71 @@ def test_process_pending_batch_locked_out(self, mock_process_buffer: mock.MagicMock) -> None:
         with self.assertNoLogs("sentry.tasks.process_buffer", level="WARNING"):
             process_pending_batch()
             assert len(mock_process_buffer.mock_calls) == 1
+
+
+class ScheduleDelayedWorkflowsTest(TestCase):
+    @mock.patch("sentry.options.get")
+    @mock.patch("sentry.rules.processing.buffer_processing.process_buffer_for_type")
+    @mock.patch("sentry.workflow_engine.tasks.delayed_workflows.DelayedWorkflow")
+    def test_schedule_delayed_workflows_locked_out(
+        self,
+        mock_delayed_workflow: mock.MagicMock,
+        mock_process_buffer_for_type: mock.MagicMock,
+        mock_options_get: mock.MagicMock,
+    ) -> None:
+        # Mock the config option to return True (using the new scheduling task)
+        mock_options_get.return_value = True
+
+        with self.assertLogs("sentry.tasks.process_buffer", level="WARNING") as logger:
+            lock = get_process_lock("schedule_delayed_workflows")
+            with lock.acquire():
+                schedule_delayed_workflows()
+            self.assertEqual(len(logger.output), 1)
+            assert len(mock_process_buffer_for_type.mock_calls) == 0
+
+        with self.assertNoLogs("sentry.tasks.process_buffer", level="WARNING"):
+            schedule_delayed_workflows()
+            assert len(mock_process_buffer_for_type.mock_calls) == 1
+            mock_process_buffer_for_type.assert_called_with(
+                "delayed_workflow", mock_delayed_workflow
+            )
+
+    @mock.patch("sentry.options.get")
+    @mock.patch("sentry.rules.processing.buffer_processing.process_buffer_for_type")
+    @mock.patch("sentry.workflow_engine.tasks.delayed_workflows.DelayedWorkflow")
+    def test_schedule_delayed_workflows_config_option_false(
+        self,
+        mock_delayed_workflow: mock.MagicMock,
+        mock_process_buffer_for_type: mock.MagicMock,
+        mock_options_get: mock.MagicMock,
+    ) -> None:
+        # Mock the config option to return False (not using the new scheduling task)
+        mock_options_get.return_value = False
+
+        with self.assertLogs("sentry.tasks.process_buffer", level="INFO") as logger:
+            schedule_delayed_workflows()
+            # Should log and exit without calling process_buffer_for_type
+            assert len(mock_process_buffer_for_type.mock_calls) == 0
+            assert any(
+                "Configured to use process_pending_batch" in output for output in logger.output
+            )
+
+    @mock.patch("sentry.options.get")
+    @mock.patch("sentry.rules.processing.buffer_processing.process_buffer_for_type")
+    @mock.patch("sentry.workflow_engine.tasks.delayed_workflows.DelayedWorkflow")
+    def test_schedule_delayed_workflows_normal_operation(
+        self,
+        mock_delayed_workflow: mock.MagicMock,
+        mock_process_buffer_for_type: mock.MagicMock,
+        mock_options_get: mock.MagicMock,
+    ) -> None:
+        # Mock the config option to return True (using the new scheduling task)
+        mock_options_get.return_value = True
+
+        schedule_delayed_workflows()
+
+        # Should only process delayed_workflow
+        assert len(mock_process_buffer_for_type.mock_calls) == 1
+        mock_process_buffer_for_type.assert_called_once_with(
+            "delayed_workflow", mock_delayed_workflow
+        )