Skip to content

Commit 1904a35

Browse files
committed
fix: make empty bucket use queue to remove underlying objects asynchronously
1 parent d82ebec commit 1904a35

File tree

5 files changed

+132
-5
lines changed

5 files changed

+132
-5
lines changed

src/storage/events/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export * from './object-created'
44
export * from './object-updated'
55
export * from './object-removed'
66
export * from './object-admin-delete'
7+
export * from './object-admin-delete-batch'
78
export * from './backup-object'
89
export * from './run-migrations'
910
export * from './reset-migrations'
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import { BaseEvent } from './base-event'
2+
import { getConfig } from '../../config'
3+
import { Job, SendOptions, WorkOptions } from 'pg-boss'
4+
import { logger, logSchema } from '@internal/monitoring'
5+
import { Storage } from '../index'
6+
import { BasePayload } from '@internal/queue'
7+
8+
export interface ObjectDeleteBatchEvent extends BasePayload {
9+
prefixes: string[]
10+
bucketId: string
11+
}
12+
13+
const { storageS3Bucket, adminDeleteQueueTeamSize, adminDeleteConcurrency } = getConfig()
14+
15+
export class ObjectAdminDeleteBatch extends BaseEvent<ObjectDeleteBatchEvent> {
16+
static queueName = 'object:admin:delete-batch'
17+
18+
static getWorkerOptions(): WorkOptions {
19+
return {
20+
teamSize: adminDeleteQueueTeamSize,
21+
teamConcurrency: adminDeleteConcurrency,
22+
}
23+
}
24+
25+
static getQueueOptions(): SendOptions {
26+
return {
27+
priority: 10,
28+
}
29+
}
30+
31+
static async handle(job: Job<ObjectDeleteBatchEvent>) {
32+
console.log('in ObjectAdminDeleteBatch.handle()')
33+
let storage: Storage | undefined = undefined
34+
35+
const { prefixes, bucketId } = job.data
36+
if (prefixes.length < 1) {
37+
return
38+
}
39+
40+
try {
41+
storage = await this.createStorage(job.data)
42+
43+
logSchema.event(logger, `[Admin]: ObjectAdminDeleteBatch ${bucketId} ${prefixes.length}`, {
44+
jodId: job.id,
45+
type: 'event',
46+
event: 'ObjectAdminDeleteBatch',
47+
payload: JSON.stringify(job.data),
48+
objectPath: bucketId,
49+
resources: prefixes,
50+
tenantId: job.data.tenant.ref,
51+
project: job.data.tenant.ref,
52+
reqId: job.data.reqId,
53+
})
54+
55+
await storage.backend.deleteObjects(storageS3Bucket, prefixes)
56+
} catch (e) {
57+
logger.error(
58+
{
59+
error: e,
60+
jodId: job.id,
61+
type: 'event',
62+
event: 'ObjectAdminDeleteBatch',
63+
payload: JSON.stringify(job.data),
64+
objectPath: bucketId,
65+
resources: prefixes,
66+
tenantId: job.data.tenant.ref,
67+
project: job.data.tenant.ref,
68+
reqId: job.data.reqId,
69+
},
70+
`[Admin]: ObjectAdminDeleteBatch ${bucketId} ${prefixes.length} - FAILED`
71+
)
72+
throw e
73+
} finally {
74+
if (storage) {
75+
const tenant = storage.db.tenant()
76+
storage.db
77+
.destroyConnection()
78+
.then(() => {
79+
// no-op
80+
})
81+
.catch((e) => {
82+
logger.error(
83+
{ error: e },
84+
`[Admin]: ObjectAdminDeleteBatch ${tenant.ref} - FAILED DISPOSING CONNECTION`
85+
)
86+
})
87+
}
88+
}
89+
}
90+
}

src/storage/events/workers.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Queue } from '@internal/queue'
22
import {
33
ObjectAdminDelete,
4+
ObjectAdminDeleteBatch,
45
Webhook,
56
RunMigrationsOnTenants,
67
BackupObjectEvent,
@@ -11,6 +12,7 @@ import {
1112
export function registerWorkers() {
1213
Queue.register(Webhook)
1314
Queue.register(ObjectAdminDelete)
15+
Queue.register(ObjectAdminDeleteBatch)
1416
Queue.register(RunMigrationsOnTenants)
1517
Queue.register(BackupObjectEvent)
1618
Queue.register(ResetMigrationsOnTenant)

src/storage/storage.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import { getFileSizeLimit, mustBeValidBucketName, parseFileSizeToBytes } from '.
66
import { getConfig } from '../config'
77
import { ObjectStorage } from './object'
88
import { InfoRenderer } from '@storage/renderer/info'
9-
import { logger, logSchema } from '@internal/monitoring'
9+
import { ObjectAdminDeleteBatch } from './events'
1010

11-
const { requestUrlLengthLimit, storageS3Bucket } = getConfig()
11+
const { requestUrlLengthLimit } = getConfig()
1212

1313
/**
1414
* Storage
@@ -200,15 +200,18 @@ export class Storage {
200200
)
201201

202202
if (deleted && deleted.length > 0) {
203-
const params = deleted.reduce((all, { name, version }) => {
203+
const prefixes = deleted.reduce((all, { name, version }) => {
204204
const fileName = withOptionalVersion(`${this.db.tenantId}/${bucketId}/${name}`, version)
205205
all.push(fileName)
206206
all.push(fileName + '.info')
207207
return all
208208
}, [] as string[])
209209
// delete files from s3 asynchronously
210-
this.backend.deleteObjects(storageS3Bucket, params).catch((e) => {
211-
logSchema.error(logger, 'Failed to delete objects from s3', { type: 's3', error: e })
210+
await ObjectAdminDeleteBatch.send({
211+
prefixes,
212+
bucketId,
213+
tenant: this.db.tenant(),
214+
reqId: this.db.reqId,
212215
})
213216
}
214217

src/test/bucket.test.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,23 @@ describe('testing EMPTY bucket', () => {
423423

424424
test('user is able to empty a bucket with a service key', async () => {
425425
const bucketId = 'bucket3'
426+
427+
// confirm there are items in the bucket before empty
428+
const responseList = await appInstance.inject({
429+
method: 'POST',
430+
url: '/object/list/' + bucketId,
431+
headers: {
432+
authorization: `Bearer ${process.env.SERVICE_KEY}`,
433+
},
434+
payload: {
435+
prefix: '',
436+
limit: 10,
437+
offset: 0,
438+
},
439+
})
440+
expect(responseList.statusCode).toBe(200)
441+
expect(responseList.json()).toHaveLength(2)
442+
426443
const response = await appInstance.inject({
427444
method: 'POST',
428445
url: `/bucket/${bucketId}/empty`,
@@ -433,6 +450,20 @@ describe('testing EMPTY bucket', () => {
433450
expect(response.statusCode).toBe(200)
434451
const responseJSON = JSON.parse(response.body)
435452
expect(responseJSON.message).toBe('Successfully emptied')
453+
454+
// confirm the bucket is actually empty after
455+
const responseList2 = await appInstance.inject({
456+
method: 'POST',
457+
url: '/object/list/' + bucketId,
458+
headers: {
459+
authorization: `Bearer ${process.env.SERVICE_KEY}`,
460+
},
461+
payload: {
462+
prefix: '',
463+
},
464+
})
465+
expect(responseList2.statusCode).toBe(200)
466+
expect(responseList2.json()).toHaveLength(0)
436467
})
437468

438469
test('user is able to delete a bucket', async () => {

0 commit comments

Comments
 (0)