Skip to content

Commit 958155c

Browse files
feat(exports): add celery task to delete old anonymous exports DEV-1358 (#6509)
### 📣 Summary Automatically removes old anonymous exports to prevent outdated files from being accessible and to keep storage usage under control. ### 📖 Description Anonymous exports are now cleaned up automatically after they expire. A new recurring background task deletes export files and their corresponding records once they are older than the value defined in `ANONYMOUS_EXPORTS_GRACE_PERIOD` (configured in Constance, default: 30 minutes). To ensure system stability, the cleanup runs in small batches of up to 200 exports per execution and takes a non-blocking row-level lock on each export so that multiple workers do not delete the same exports simultaneously. This keeps storage usage under control and prevents outdated anonymous export files from remaining accessible. Authenticated exports and other project data are not affected. --------- Co-authored-by: Olivier Léger <[email protected]>
1 parent 5c1ba9b commit 958155c

File tree

3 files changed

+174
-1
lines changed

3 files changed

+174
-1
lines changed

kobo/settings/base.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,10 @@
475475
'Enable automatic deletion of attachments for users who have exceeded '
476476
'their storage limits.'
477477
),
478+
'ANONYMOUS_EXPORTS_GRACE_PERIOD': (
479+
30,
480+
'Number of minutes after which anonymous export tasks are cleaned up.',
481+
),
478482
'LIMIT_ATTACHMENT_REMOVAL_GRACE_PERIOD': (
479483
90,
480484
'Number of days to keep attachments after the user has exceeded their '
@@ -729,6 +733,7 @@
729733
'MASS_EMAIL_ENQUEUED_RECORD_EXPIRY',
730734
'MASS_EMAIL_TEST_EMAILS',
731735
'USAGE_LIMIT_ENFORCEMENT',
736+
'ANONYMOUS_EXPORTS_GRACE_PERIOD',
732737
),
733738
'Rest Services': (
734739
'ALLOW_UNSECURED_HOOK_ENDPOINTS',
@@ -1445,6 +1450,12 @@ def dj_stripe_request_callback_method():
14451450
'options': {'queue': 'kpi_low_priority_queue'}
14461451
},
14471452
# Schedule every 5 minutes
1453+
'cleanup-anonymous-exports': {
1454+
'task': 'kpi.tasks.cleanup_anonymous_exports',
1455+
'schedule': crontab(minute='*/5'),
1456+
'options': {'queue': 'kpi_low_priority_queue'},
1457+
},
1458+
# Schedule every 15 minutes
14481459
'refresh-user-report-snapshot': {
14491460
'task': 'kobo.apps.user_reports.tasks.refresh_user_report_snapshots',
14501461
'schedule': crontab(minute='*/15'),

kpi/tasks.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,29 @@
11
import time
2+
from datetime import timedelta
23

34
import requests
5+
from constance import config
46
from django.apps import apps
57
from django.conf import settings
68
from django.core import mail
79
from django.core.exceptions import ObjectDoesNotExist
810
from django.core.management import call_command
11+
from django.db import DatabaseError, transaction
12+
from django.utils import timezone
913

1014
from kobo.apps.kobo_auth.shortcuts import User
1115
from kobo.apps.markdownx_uploader.tasks import remove_unused_markdown_files
1216
from kobo.celery import celery_app
1317
from kpi.constants import LIMIT_HOURS_23
1418
from kpi.maintenance_tasks import remove_old_asset_snapshots, remove_old_import_tasks
1519
from kpi.models.asset import Asset
16-
from kpi.models.import_export_task import ImportTask, SubmissionExportTask
20+
from kpi.models.import_export_task import (
21+
ImportExportStatusChoices,
22+
ImportTask,
23+
SubmissionExportTask,
24+
)
25+
from kpi.utils.log import logging
26+
from kpi.utils.object_permission import get_anonymous_user
1727

1828

1929
@celery_app.task(
@@ -68,6 +78,56 @@ def export_task_in_background(
6878
)
6979

7080

81+
@celery_app.task
def cleanup_anonymous_exports(**kwargs):
    """
    Delete export tasks created by the AnonymousUser that are older than
    `ANONYMOUS_EXPORTS_GRACE_PERIOD` (Constance setting, in minutes), together
    with their result files. Exports that are still processing are left alone.

    At most `BATCH_SIZE` exports are handled per run, oldest first. Each
    export is removed in its own transaction after taking a non-blocking
    row-level lock, so a row that is locked elsewhere, already deleted, or
    that started processing in the meantime is skipped instead of aborting
    the rest of the batch.

    `kwargs` are accepted but not used.
    """
    BATCH_SIZE = 200
    cutoff_time = timezone.now() - timedelta(
        minutes=config.ANONYMOUS_EXPORTS_GRACE_PERIOD
    )

    # Materialize the ids once so the emptiness check and the loop below
    # explicitly share the result of a single query.
    old_export_ids = list(
        SubmissionExportTask.objects.filter(
            user=get_anonymous_user(),
            date_created__lt=cutoff_time,
        )
        .exclude(status=ImportExportStatusChoices.PROCESSING)
        .order_by('date_created')
        .values_list('pk', flat=True)[:BATCH_SIZE]
    )

    if not old_export_ids:
        logging.info('No old anonymous exports to clean up.')
        return

    deleted_count = 0
    for export_id in old_export_ids:
        try:
            with transaction.atomic():
                # Acquire a row-level lock without waiting. The status is
                # checked again here because the row may have changed since
                # the id list above was read.
                export = (
                    SubmissionExportTask.objects.only('pk', 'uid', 'result')
                    .exclude(status=ImportExportStatusChoices.PROCESSING)
                    .select_for_update(nowait=True)
                    .get(pk=export_id)
                )

                if export.result:
                    # Best effort: a failure to remove the file must not
                    # prevent the record itself from being deleted.
                    try:
                        export.result.delete(save=False)
                    except Exception as e:
                        logging.error(
                            f'Error deleting file for export {export.uid}: {e}'
                        )
                export.delete()
            # Only count the export once its transaction has been committed
            deleted_count += 1
        except SubmissionExportTask.DoesNotExist:
            # Deleted by another worker, or picked up for processing, after
            # the id list was read.
            logging.info(
                f'Export {export_id} is no longer eligible for cleanup. Skipping.'
            )
        except DatabaseError:
            # Raised by `select_for_update(nowait=True)` when the row is
            # already locked by another transaction.
            logging.info(f'Export {export_id} is currently being processed. Skipping.')
    logging.info(f'Cleaned up {deleted_count} old anonymous exports.')
129+
130+
71131
@celery_app.task
72132
def sync_kobocat_xforms(
73133
username=None,
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import threading
2+
from datetime import timedelta
3+
4+
from django.db import transaction
5+
from django.test import TransactionTestCase
6+
from django.utils import timezone
7+
8+
from kpi.models.import_export_task import (
9+
ImportExportStatusChoices,
10+
SubmissionExportTask,
11+
)
12+
from kpi.tasks import cleanup_anonymous_exports
13+
from kpi.utils.object_permission import get_anonymous_user
14+
15+
16+
class AnonymousExportCleanupTestCase(TransactionTestCase):
    """
    Tests for the `cleanup_anonymous_exports` task.

    `TransactionTestCase` is used because the concurrency test needs a second
    thread to hold a real row-level lock in its own committed-data view.
    """

    def _create_export_task(
        self, status=ImportExportStatusChoices.COMPLETE, minutes_old=60
    ):
        """
        Create an export task owned by the AnonymousUser with the given
        `status`, back-dated by `minutes_old` minutes when that is positive.
        """
        export = SubmissionExportTask()
        export.user = get_anonymous_user()
        export.status = status
        export.data = {'type': 'xls', 'source': 'test'}
        export.save()

        if minutes_old > 0:
            past_time = timezone.now() - timedelta(minutes=minutes_old)
            # Use a queryset update to back-date `date_created` directly in
            # the database instead of going through `save()`.
            SubmissionExportTask.objects.filter(uid=export.uid).update(
                date_created=past_time
            )
            export.refresh_from_db()
        return export

    def test_exports_older_than_30_minutes_are_deleted(self):
        # Export older than 30 min - should be deleted
        old_export = self._create_export_task(minutes_old=31)

        # Export newer than 30 min - should be kept
        recent_export = self._create_export_task(minutes_old=29)

        cleanup_anonymous_exports()
        self.assertFalse(
            SubmissionExportTask.objects.filter(uid=old_export.uid).exists()
        )
        self.assertTrue(
            SubmissionExportTask.objects.filter(uid=recent_export.uid).exists()
        )

    def test_processing_exports_are_not_deleted(self):
        """
        Test that exports with PROCESSING status are never deleted
        """
        processing_export = self._create_export_task(
            status=ImportExportStatusChoices.PROCESSING, minutes_old=100
        )

        cleanup_anonymous_exports()
        self.assertTrue(
            SubmissionExportTask.objects.filter(uid=processing_export.uid).exists()
        )

    def test_concurrency_locked_rows_are_skipped(self):
        """
        Test that a row locked by another transaction is skipped while the
        other eligible rows are still deleted
        """
        # Imported locally: the module only imports `transaction` from
        # `django.db`.
        from django.db import connection

        exports = [self._create_export_task(minutes_old=120) for _ in range(5)]
        locked_export = exports[0]
        not_locked_exports = exports[1:]

        lock_acquired = threading.Event()
        release_lock = threading.Event()

        def lock_row():
            try:
                with transaction.atomic():
                    # Lock the row
                    SubmissionExportTask.objects.select_for_update().get(
                        pk=locked_export.pk
                    )

                    # Signal that the lock is active
                    lock_acquired.set()

                    # Wait until the main thread signals we can release the lock
                    release_lock.wait()
            finally:
                # Each thread gets its own database connection; close it so
                # it does not linger after the thread ends.
                connection.close()

        t = threading.Thread(target=lock_row)
        t.start()

        try:
            # Wait for the row-level lock to actually be acquired; fail
            # instead of hanging forever if the thread never gets there.
            self.assertTrue(
                lock_acquired.wait(timeout=10),
                'Row lock was not acquired by the helper thread',
            )

            # Now cleanup should try select_for_update(nowait=True), causing
            # DatabaseError
            cleanup_anonymous_exports()
        finally:
            # Always let the locking thread finish its transaction, even if
            # the cleanup task raised, so the thread is never left blocked.
            release_lock.set()
            t.join()

        # Verify the locked row was not deleted
        self.assertTrue(
            SubmissionExportTask.objects.filter(pk=locked_export.pk).exists()
        )

        # Verify unlocked rows were deleted
        self.assertFalse(
            SubmissionExportTask.objects.filter(
                pk__in=[e.pk for e in not_locked_exports]
            ).exists()
        )

0 commit comments

Comments
 (0)