11 changes: 11 additions & 0 deletions kobo/settings/base.py
@@ -475,6 +475,10 @@
'Enable automatic deletion of attachments for users who have exceeded '
'their storage limits.'
),
'ANONYMOUS_EXPORTS_GRACE_PERIOD': (
30,
'Number of minutes after which anonymous export tasks are cleaned up.',
),
'LIMIT_ATTACHMENT_REMOVAL_GRACE_PERIOD': (
90,
'Number of days to keep attachments after the user has exceeded their '
@@ -729,6 +733,7 @@
'MASS_EMAIL_ENQUEUED_RECORD_EXPIRY',
'MASS_EMAIL_TEST_EMAILS',
'USAGE_LIMIT_ENFORCEMENT',
'ANONYMOUS_EXPORTS_GRACE_PERIOD',
),
'Rest Services': (
'ALLOW_UNSECURED_HOOK_ENDPOINTS',
@@ -1445,6 +1450,12 @@ def dj_stripe_request_callback_method():
'options': {'queue': 'kpi_low_priority_queue'}
},
    # Schedule every 5 minutes
'cleanup-anonymous-exports': {
'task': 'kpi.tasks.cleanup_anonymous_exports',
'schedule': crontab(minute='*/5'),
'options': {'queue': 'kpi_low_priority_queue'},
},
# Schedule every 15 minutes
'refresh-user-report-snapshot': {
'task': 'kobo.apps.user_reports.tasks.refresh_user_report_snapshots',
'schedule': crontab(minute='*/15'),
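Since the grace period is a Constance setting, it can be overridden per test rather than monkeypatched. A minimal sketch, assuming django-constance's standard `override_config` test helper (the test class and method names here are illustrative):

from constance import config
from constance.test import override_config
from django.test import TestCase


class GracePeriodOverrideExample(TestCase):
    @override_config(ANONYMOUS_EXPORTS_GRACE_PERIOD=5)
    def test_short_grace_period(self):
        # Inside the decorated test, the cleanup cutoff is 5 minutes
        self.assertEqual(config.ANONYMOUS_EXPORTS_GRACE_PERIOD, 5)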
62 changes: 61 additions & 1 deletion kpi/tasks.py
@@ -1,19 +1,29 @@
import time
from datetime import timedelta

import requests
from constance import config
from django.apps import apps
from django.conf import settings
from django.core import mail
from django.core.exceptions import ObjectDoesNotExist
from django.core.management import call_command
from django.db import DatabaseError, transaction
from django.utils import timezone

from kobo.apps.kobo_auth.shortcuts import User
from kobo.apps.markdownx_uploader.tasks import remove_unused_markdown_files
from kobo.celery import celery_app
from kpi.constants import LIMIT_HOURS_23
from kpi.maintenance_tasks import remove_old_asset_snapshots, remove_old_import_tasks
from kpi.models.asset import Asset
from kpi.models.import_export_task import ImportTask, SubmissionExportTask
from kpi.models.import_export_task import (
ImportExportStatusChoices,
ImportTask,
SubmissionExportTask,
)
from kpi.utils.log import logging
from kpi.utils.object_permission import get_anonymous_user


@celery_app.task(
@@ -68,6 +78,56 @@ def export_task_in_background(
)


@celery_app.task
def cleanup_anonymous_exports(**kwargs):
"""
Task to clean up export tasks created by the AnonymousUser that are older
than `ANONYMOUS_EXPORTS_GRACE_PERIOD`, excluding those that are still processing
"""
BATCH_SIZE = 200
cutoff_time = timezone.now() - timedelta(
minutes=config.ANONYMOUS_EXPORTS_GRACE_PERIOD
)

old_export_ids = (
SubmissionExportTask.objects.filter(
user=get_anonymous_user(),
date_created__lt=cutoff_time,
)
.exclude(status=ImportExportStatusChoices.PROCESSING)
.order_by('date_created')
.values_list('pk', flat=True)[:BATCH_SIZE]
)

if not old_export_ids:
logging.info('No old anonymous exports to clean up.')
return

deleted_count = 0
for export_id in old_export_ids:
try:
with transaction.atomic():
# Acquire a row-level lock without waiting
export = (
SubmissionExportTask.objects.only('pk', 'uid', 'result')
.select_for_update(nowait=True)
.get(pk=export_id)
)

if export.result:
try:
export.result.delete(save=False)
except Exception as e:
logging.error(
f'Error deleting file for export {export.uid}: {e}'
)
export.delete()
deleted_count += 1
        except DatabaseError:
            logging.info(
                f'Export {export_id} is locked by another process; skipping.'
            )
logging.info(f'Cleaned up {deleted_count} old anonymous exports.')


@celery_app.task
def sync_kobocat_xforms(
username=None,
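The task is driven by the beat schedule above, but it can also be exercised on demand. A minimal sketch, assuming a running Celery worker for the asynchronous variant (the queue name is taken from the schedule entry in kobo/settings/base.py):

from kpi.tasks import cleanup_anonymous_exports

# Run synchronously (e.g. from `./manage.py shell`) for debugging
cleanup_anonymous_exports()

# Or enqueue it on the same low-priority queue the beat schedule uses
cleanup_anonymous_exports.apply_async(queue='kpi_low_priority_queue')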
102 changes: 102 additions & 0 deletions kpi/tests/test_cleanup_anonymous_exports.py
@@ -0,0 +1,102 @@
import threading
from datetime import timedelta

from django.db import connection, transaction
from django.test import TransactionTestCase
from django.utils import timezone

from kpi.models.import_export_task import (
ImportExportStatusChoices,
SubmissionExportTask,
)
from kpi.tasks import cleanup_anonymous_exports
from kpi.utils.object_permission import get_anonymous_user


class AnonymousExportCleanupTestCase(TransactionTestCase):
def _create_export_task(
self, status=ImportExportStatusChoices.COMPLETE, minutes_old=60
):
export = SubmissionExportTask()
export.user = get_anonymous_user()
export.status = status
export.data = {'type': 'xls', 'source': 'test'}
export.save()

if minutes_old > 0:
past_time = timezone.now() - timedelta(minutes=minutes_old)
SubmissionExportTask.objects.filter(uid=export.uid).update(
date_created=past_time
)
export.refresh_from_db()
return export

def test_exports_older_than_30_minutes_are_deleted(self):
# Export older than 30 min - should be deleted
old_export = self._create_export_task(minutes_old=31)

# Export newer than 30 min - should be kept
recent_export = self._create_export_task(minutes_old=29)

cleanup_anonymous_exports()
self.assertFalse(
SubmissionExportTask.objects.filter(uid=old_export.uid).exists()
)
self.assertTrue(
SubmissionExportTask.objects.filter(uid=recent_export.uid).exists()
)

def test_processing_exports_are_not_deleted(self):
"""
Test that exports with PROCESSING status are never deleted
"""
processing_export = self._create_export_task(
status=ImportExportStatusChoices.PROCESSING, minutes_old=100
)

cleanup_anonymous_exports()
self.assertTrue(
SubmissionExportTask.objects.filter(uid=processing_export.uid).exists()
)

def test_concurrency_locked_rows_are_skipped(self):
exports = [self._create_export_task(minutes_old=120) for _ in range(5)]
locked_export = exports[0]
not_locked_exports = exports[1:]

lock_acquired = threading.Event()
release_lock = threading.Event()

        def lock_row():
            try:
                with transaction.atomic():
                    # Lock the row
                    SubmissionExportTask.objects.select_for_update().get(
                        pk=locked_export.pk
                    )

                    # Signal that the lock is active
                    lock_acquired.set()

                    # Wait until the main thread signals we can release it
                    release_lock.wait()
            finally:
                # Close this thread's database connection so it does not
                # linger after the test and block teardown
                connection.close()

t = threading.Thread(target=lock_row)
t.start()

# Wait for the row-level lock to actually be acquired
lock_acquired.wait()

# Now cleanup should try select_for_update(nowait=True), causing DatabaseError
cleanup_anonymous_exports()

# Let the locking thread finish its transaction
release_lock.set()
t.join()

        # Verify the locked row was not deleted
        self.assertTrue(
            SubmissionExportTask.objects.filter(pk=locked_export.pk).exists()
        )

        # Verify unlocked rows were deleted
        self.assertFalse(
            SubmissionExportTask.objects.filter(
                pk__in=[e.pk for e in not_locked_exports]
            ).exists()
        )
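A note on the test design: these tests inherit from TransactionTestCase rather than TestCase because Django's TestCase wraps each test in a single transaction on one connection, so a second thread could never hold a competing row lock. TransactionTestCase uses real commits, which lets the locking thread contend with cleanup_anonymous_exports over an actual select_for_update() lock.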