fix: make empty bucket use queue to remove underlying objects asynchronously #701
Merged: 4 commits, Jul 22, 2025
2 changes: 2 additions & 0 deletions src/config.ts
@@ -98,6 +98,7 @@ type StorageConfigType = {
responseSMaxAge: number
anonKeyAsync: Promise<string>
serviceKeyAsync: Promise<string>
emptyBucketMax: number
storageBackendType: StorageBackendType
tenantId: string
requestUrlLengthLimit: number
@@ -325,6 +326,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
),
// Storage
storageBackendType: getOptionalConfigFromEnv('STORAGE_BACKEND') as StorageBackendType,
emptyBucketMax: parseInt(getOptionalConfigFromEnv('STORAGE_EMPTY_BUCKET_MAX') || '200000', 10),

// Storage - File
storageFilePath: getOptionalConfigFromEnv(
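The new `emptyBucketMax` option is parsed from `STORAGE_EMPTY_BUCKET_MAX` with a 200000 default. A minimal sketch of consuming it (the relative import path is assumed):

```ts
import { getConfig } from './config'

// Falls back to 200000 objects unless STORAGE_EMPTY_BUCKET_MAX is set.
const { emptyBucketMax } = getConfig()
console.log(`empty-bucket cap: ${emptyBucketMax} objects`)
```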
9 changes: 7 additions & 2 deletions src/http/routes/bucket/emptyBucket.ts
@@ -14,7 +14,10 @@ const emptyBucketParamsSchema = {
const successResponseSchema = {
type: 'object',
properties: {
message: { type: 'string', examples: ['Successfully emptied'] },
message: {
type: 'string',
examples: ['Empty bucket has been queued. Completion may take up to an hour.'],
},
},
}
interface emptyBucketRequestInterface extends AuthenticatedRequest {
@@ -41,7 +44,9 @@ export default async function routes(fastify: FastifyInstance) {

await request.storage.emptyBucket(bucketId)

return response.status(200).send(createResponse('Successfully emptied'))
return response
.status(200)
.send(createResponse('Empty bucket has been queued. Completion may take up to an hour.'))
}
)
}
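The route now responds immediately while deletion continues in the background; the `storage.emptyBucket` internals are not part of this excerpt. A plausible sketch of the guard it pairs with, assuming the `emptyBucketMax` config above and the `UnableToEmptyBucket` error added below:

```ts
// Sketch only: refuse to queue the job when the bucket exceeds the cap.
const { emptyBucketMax } = getConfig()
// Scan at most emptyBucketMax + 1 rows, just enough to detect overflow.
const count = await db.countObjectsInBucket(bucketId, emptyBucketMax + 1)
if (count > emptyBucketMax) {
  throw ERRORS.UnableToEmptyBucket(bucketId)
}
```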
8 changes: 7 additions & 1 deletion src/internal/errors/codes.ts
@@ -61,7 +61,6 @@ export const ERRORS = {
message: `The maximum number of this resource ${limit} is reached`,
originalError: e,
}),

FeatureNotEnabled: (resource: string, feature: string, e?: Error) =>
new StorageBackendError({
code: ErrorCode.InvalidRequest,
@@ -77,6 +76,13 @@
message: `The feature ${feature} is not enabled for this resource`,
originalError: e,
}),
UnableToEmptyBucket: (bucket: string) =>
new StorageBackendError({
code: ErrorCode.InvalidRequest,
resource: bucket,
httpStatusCode: 409,
message: `Unable to empty the bucket because it contains too many objects`,
}),
NoSuchBucket: (bucket: string, e?: Error) =>
new StorageBackendError({
code: ErrorCode.NoSuchBucket,
2 changes: 1 addition & 1 deletion src/storage/database/adapter.ts
@@ -85,7 +85,7 @@ export interface Database {
filters?: Filters
): Promise<Filters['dontErrorOnEmpty'] extends true ? Bucket | undefined : Bucket>

countObjectsInBucket(bucketId: string): Promise<number>
countObjectsInBucket(bucketId: string, limit?: number): Promise<number>

deleteBucket(bucketId: string | string[]): Promise<Bucket[]>

56 changes: 33 additions & 23 deletions src/storage/database/knex.ts
@@ -206,17 +206,21 @@ export class StorageKnexDB implements Database {
return result
}

async countObjectsInBucket(bucketId: string) {
async countObjectsInBucket(bucketId: string, limit?: number): Promise<number> {
// if we have a limit, use a select so we only scan up to that limit
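// (knex renders roughly: select 1 from "objects" where "bucket_id" = ? limit ?,
// so at most `limit` rows are scanned and result.length is capped at `limit`)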
if (limit !== undefined) {
const result = await this.runQuery('CountObjectsInBucketWithLimit', (knex) => {
return knex.from('objects').where('bucket_id', bucketId).limit(limit).select(knex.raw('1'))
})
return result.length
}

// do full count if there is no limit
const result = await this.runQuery('CountObjectsInBucket', (knex) => {
return knex
.from<{ count: number }>('objects')
.where('bucket_id', bucketId)
.limit(10)
.count()
.first()
return knex.from('objects').where('bucket_id', bucketId).count().first<{ count: number }>()
})

return (result?.count as number) || 0
return result?.count || 0
}

async deleteBucket(bucketId: string | string[]) {
@@ -745,7 +749,10 @@
async mustLockObject(bucketId: string, objectName: string, version?: string) {
return this.runQuery('MustLockObject', async (knex) => {
const hash = hashStringToInt(`${bucketId}/${objectName}${version ? `/${version}` : ''}`)
const result = await knex.raw<any>(`SELECT pg_try_advisory_xact_lock(?);`, [hash])
const result = await knex.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(
`SELECT pg_try_advisory_xact_lock(?);`,
[hash]
)
const lockAcquired = result.rows.shift()?.pg_try_advisory_xact_lock || false

if (!lockAcquired) {
@@ -764,7 +771,7 @@
) {
return this.runQuery('WaitObjectLock', async (knex) => {
const hash = hashStringToInt(`${bucketId}/${objectName}${version ? `/${version}` : ''}`)
const query = knex.raw<any>(`SELECT pg_advisory_xact_lock(?)`, [hash])
const query = knex.raw(`SELECT pg_advisory_xact_lock(?)`, [hash])

if (opts?.timeout) {
let timeoutInterval: undefined | NodeJS.Timeout
@@ -794,18 +801,21 @@

async searchObjects(bucketId: string, prefix: string, options: SearchObjectOption) {
return this.runQuery('SearchObjects', async (knex) => {
const result = await knex.raw('select * from storage.search(?,?,?,?,?,?,?,?)', [
prefix,
bucketId,
options.limit || 100,
prefix.split('/').length,
options.offset || 0,
options.search || '',
options.sortBy?.column ?? 'name',
options.sortBy?.order ?? 'asc',
])

return (result as any).rows
const result = await knex.raw<{ rows: Obj[] }>(
'select * from storage.search(?,?,?,?,?,?,?,?)',
[
prefix,
bucketId,
options.limit || 100,
prefix.split('/').length,
options.offset || 0,
options.search || '',
options.sortBy?.column ?? 'name',
options.sortBy?.order ?? 'asc',
]
)

return result.rows
})
}

@@ -940,7 +950,7 @@

if (typeof columns === 'object') {
value.forEach((column: string) => {
delete (columns as Record<any, any>)[column]
delete (columns as Record<string, object>)[column]
})
}
}
3 changes: 2 additions & 1 deletion src/storage/events/index.ts
@@ -5,7 +5,8 @@ export * from './lifecycle/bucket-deleted'
export * from './lifecycle/object-created'
export * from './lifecycle/object-updated'
export * from './lifecycle/object-removed'
export * from './lifecycle/object-admin-delete'
export * from './objects/object-admin-delete'
export * from './objects/object-admin-delete-all-before'
export * from './objects/backup-object'
export * from './migrations/run-migrations'
export * from './migrations/reset-migrations'
141 changes: 141 additions & 0 deletions src/storage/events/objects/object-admin-delete-all-before.ts
@@ -0,0 +1,141 @@
import { BaseEvent } from '../base-event'
import { getConfig } from '../../../config'
import { Job, SendOptions, WorkOptions } from 'pg-boss'
import { logger, logSchema } from '@internal/monitoring'
import { Storage } from '../../index'
import { BasePayload } from '@internal/queue'
import { withOptionalVersion } from '@storage/backend'

const DELETE_JOB_TIME_LIMIT_MS = 10_000

export interface ObjectDeleteAllBeforeEvent extends BasePayload {
before: string
bucketId: string
}

const { storageS3Bucket, requestUrlLengthLimit } = getConfig()

export class ObjectAdminDeleteAllBefore extends BaseEvent<ObjectDeleteAllBeforeEvent> {
static queueName = 'object:admin:delete-all-before'

static getWorkerOptions(): WorkOptions {
return {}
}

static getSendOptions(payload: ObjectDeleteAllBeforeEvent): SendOptions {
return {
singletonKey: `${payload.tenant.ref}/${payload.bucketId}`,
priority: 10,
expireInSeconds: 30,
}
}

static async handle(job: Job<ObjectDeleteAllBeforeEvent>) {
let storage: Storage | undefined = undefined

const tenantId = job.data.tenant.ref
const bucketId = job.data.bucketId
const before = new Date(job.data.before)

try {
storage = await this.createStorage(job.data)

logSchema.event(
logger,
`[Admin]: ObjectAdminDeleteAllBefore ${bucketId} ${before.toUTCString()}`,
{
jobId: job.id,
type: 'event',
event: 'ObjectAdminDeleteAllBefore',
payload: JSON.stringify(job.data),
objectPath: bucketId,
tenantId,
project: tenantId,
reqId: job.data.reqId,
}
)

const batchLimit = Math.floor(requestUrlLengthLimit / (36 + 3))
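// budget: one 36-char object UUID plus ~3 chars of separator/overhead per id,
// so each delete batch's key list stays within requestUrlLengthLimit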

let moreObjectsToDelete = false
const start = Date.now()
while (Date.now() - start < DELETE_JOB_TIME_LIMIT_MS) {
moreObjectsToDelete = false
const objects = await storage.db.listObjects(bucketId, 'id, name', batchLimit + 1, before)

const backend = storage.backend
if (objects && objects.length > 0) {
if (objects.length > batchLimit) {
objects.pop()
moreObjectsToDelete = true
}

await storage.db.withTransaction(async (trx) => {
const deleted = await trx.deleteObjects(
bucketId,
objects.map(({ id }) => id!),
'id'
)

if (deleted && deleted.length > 0) {
const prefixes: string[] = []

for (const { name, version } of deleted) {
const fileName = withOptionalVersion(`${tenantId}/${bucketId}/${name}`, version)
prefixes.push(fileName)
prefixes.push(fileName + '.info')
}

await backend.deleteObjects(storageS3Bucket, prefixes)
}
})
}

if (!moreObjectsToDelete) {
break
}
}

if (moreObjectsToDelete) {
// delete next batch
await ObjectAdminDeleteAllBefore.send({
before,
bucketId,
tenant: job.data.tenant,
reqId: job.data.reqId,
})
}
} catch (e) {
logger.error(
{
error: e,
jobId: job.id,
type: 'event',
event: 'ObjectAdminDeleteAllBefore',
payload: JSON.stringify(job.data),
objectPath: bucketId,
tenantId,
project: tenantId,
reqId: job.data.reqId,
},
`[Admin]: ObjectAdminDeleteAllBefore ${bucketId} ${before.toUTCString()} - FAILED`
)
throw e
} finally {
if (storage) {
const tenant = storage.db.tenant()
storage.db
.destroyConnection()
.then(() => {
// no-op
})
.catch((e) => {
logger.error(
{ error: e },
`[Admin]: ObjectAdminDeleteAllBefore ${tenant.ref} - FAILED DISPOSING CONNECTION`
)
})
}
}
}
}
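A single producer-side send is enough, since the handler re-enqueues itself while objects remain. A hedged usage sketch, reusing only names visible in this diff:

```ts
// Hypothetical caller with a Storage instance in scope:
const tenant = storage.db.tenant() // exposes at least `ref`, as used above
await ObjectAdminDeleteAllBefore.send({
  before: new Date().toISOString(), // handler re-parses this with new Date(...)
  bucketId: 'avatars', // hypothetical bucket
  tenant,
  reqId: 'req-123', // hypothetical request id
})
```

Because `getSendOptions` sets `singletonKey` to `${tenant.ref}/${bucketId}`, repeated empty-bucket requests for the same bucket collapse into a single pending job.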
4 changes: 3 additions & 1 deletion src/storage/events/workers.ts
@@ -1,15 +1,17 @@
import { Queue } from '@internal/queue'
import { Webhook } from './lifecycle/webhook'
import { ObjectAdminDelete } from './lifecycle/object-admin-delete'
import { ObjectAdminDelete } from './objects/object-admin-delete'
import { RunMigrationsOnTenants } from './migrations/run-migrations'
import { BackupObjectEvent } from './objects/backup-object'
import { ResetMigrationsOnTenant } from './migrations/reset-migrations'
import { JwksCreateSigningSecret } from './jwks/jwks-create-signing-secret'
import { UpgradePgBossV10 } from './pgboss/upgrade-v10'
import { ObjectAdminDeleteAllBefore } from './objects/object-admin-delete-all-before'

export function registerWorkers() {
Queue.register(Webhook)
Queue.register(ObjectAdminDelete)
Queue.register(ObjectAdminDeleteAllBefore)
Queue.register(RunMigrationsOnTenants)
Queue.register(BackupObjectEvent)
Queue.register(ResetMigrationsOnTenant)