Skip to content

Commit 25fda8f

Browse files
committed
list objects in worker when emptying bucket
1 parent 569778a commit 25fda8f

File tree

10 files changed

+186
-152
lines changed

10 files changed

+186
-152
lines changed

src/http/routes/bucket/emptyBucket.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ const emptyBucketParamsSchema = {
1414
const successResponseSchema = {
1515
type: 'object',
1616
properties: {
17-
message: { type: 'string', examples: ['Successfully emptied'] },
17+
message: {
18+
type: 'string',
19+
examples: ['Empty bucket has been queued. Completion may take up to an hour.'],
20+
},
1821
},
1922
}
2023
interface emptyBucketRequestInterface extends AuthenticatedRequest {
@@ -41,7 +44,9 @@ export default async function routes(fastify: FastifyInstance) {
4144

4245
await request.storage.emptyBucket(bucketId)
4346

44-
return response.status(200).send(createResponse('Successfully emptied'))
47+
return response
48+
.status(200)
49+
.send(createResponse('Empty bucket has been queued. Completion may take up to an hour.'))
4550
}
4651
)
4752
}

src/internal/errors/codes.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ export const ERRORS = {
7474
code: ErrorCode.InvalidRequest,
7575
httpStatusCode: 409,
7676
message: `The feature ${feature} is not enabled for this resource`,
77+
originalError: e,
7778
}),
78-
UnableToEmptyBucket: (bucket: string, e?: Error) =>
79+
UnableToEmptyBucket: (bucket: string) =>
7980
new StorageBackendError({
8081
code: ErrorCode.InvalidRequest,
8182
resource: bucket,
8283
httpStatusCode: 409,
8384
message: `Unable to empty the bucket because it contains too many objects`,
84-
originalError: e,
8585
}),
8686
NoSuchBucket: (bucket: string, e?: Error) =>
8787
new StorageBackendError({

src/storage/events/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ export * from './lifecycle/bucket-deleted'
55
export * from './lifecycle/object-created'
66
export * from './lifecycle/object-updated'
77
export * from './lifecycle/object-removed'
8-
export * from './lifecycle/object-admin-delete'
9-
export * from './lifecycle/object-admin-delete-batch'
8+
export * from './objects/object-admin-delete'
9+
export * from './objects/object-admin-delete-all-before'
1010
export * from './objects/backup-object'
1111
export * from './migrations/run-migrations'
1212
export * from './migrations/reset-migrations'

src/storage/events/lifecycle/object-admin-delete-batch.ts

Lines changed: 0 additions & 87 deletions
This file was deleted.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { BaseEvent } from '../base-event'
2+
import { getConfig } from '../../../config'
3+
import { Job, SendOptions, WorkOptions } from 'pg-boss'
4+
import { logger, logSchema } from '@internal/monitoring'
5+
import { Storage } from '../../index'
6+
import { BasePayload } from '@internal/queue'
7+
import { withOptionalVersion } from '@storage/backend'
8+
9+
/**
 * Wall-clock budget (ms) during which a single job keeps starting new delete
 * batches. Once exceeded, remaining work is handed to a follow-up job.
 * This is below the 30s `expireInSeconds` set in `getSendOptions`.
 */
const DELETE_JOB_TIME_LIMIT_MS = 10_000

/**
 * Payload of the `object:admin:delete-all-before` job.
 */
export interface ObjectDeleteAllBeforeEvent extends BasePayload {
  /** Cutoff timestamp in a form `new Date(...)` can parse (the handler parses it that way). */
  before: string
  /** Bucket whose objects are removed. */
  bucketId: string
}

const { storageS3Bucket, requestUrlLengthLimit } = getConfig()

/**
 * Queue event that empties a bucket in batches.
 *
 * Each run lists objects of the bucket (filtered by the `before` cutoff),
 * deletes their database rows and the matching backend files, and repeats
 * until nothing is left or the time budget is used up. When objects remain,
 * the job enqueues itself again with the same payload.
 */
export class ObjectAdminDeleteAllBefore extends BaseEvent<ObjectDeleteAllBeforeEvent> {
  static queueName = 'object:admin:delete-all-before'

  /** No worker-specific options; queue defaults apply. */
  static getWorkerOptions(): WorkOptions {
    return {}
  }

  /**
   * Send options for a job.
   *
   * The singleton key is built from the tenant ref and the bucket id.
   * NOTE(review): presumably meant to collapse concurrent requests for the same
   * bucket — confirm this does not also suppress the follow-up job that
   * `handle` enqueues while the current job is still active.
   *
   * @param payload job payload the options are derived from
   */
  static getSendOptions(payload: ObjectDeleteAllBeforeEvent): SendOptions {
    return {
      singletonKey: `${payload.tenant.ref}/${payload.bucketId}`,
      priority: 10,
      expireInSeconds: 30,
    }
  }

  /**
   * Deletes objects of the bucket in batches and re-enqueues itself when more remain.
   *
   * @param job queue job carrying the tenant, bucket id and cutoff timestamp
   * @throws rethrows any error raised while listing or deleting, after logging it
   */
  static async handle(job: Job<ObjectDeleteAllBeforeEvent>) {
    let storage: Storage | undefined = undefined

    const tenantId = job.data.tenant.ref
    const bucketId = job.data.bucketId
    const before = new Date(job.data.before)

    try {
      storage = await this.createStorage(job.data)

      logSchema.event(
        logger,
        `[Admin]: ObjectAdminDeleteAllBefore ${bucketId} ${before.toUTCString()}`,
        {
          // NOTE(review): `jodId` looks like a typo of `jobId`; kept as-is because
          // log consumers may rely on this key — confirm before renaming.
          jodId: job.id,
          type: 'event',
          event: 'ObjectAdminDeleteAllBefore',
          payload: JSON.stringify(job.data),
          objectPath: bucketId,
          tenantId,
          project: tenantId,
          reqId: job.data.reqId,
        }
      )

      // Ids per batch, derived from the request URL length limit.
      // NOTE(review): 36 + 3 is presumably a UUID plus per-id URL overhead — confirm.
      // Clamped to at least 1: with a limit of 0 every pass would pop its only
      // row and the loop would never make progress.
      const batchLimit = Math.max(1, Math.floor(requestUrlLengthLimit / (36 + 3)))

      // Captured once so the transaction callback does not depend on the
      // narrowing of the mutable `storage` binding.
      const backend = storage.backend
      const db = storage.db

      let moreObjectsToDelete = false
      const start = Date.now()
      while (Date.now() - start < DELETE_JOB_TIME_LIMIT_MS) {
        moreObjectsToDelete = false

        // Ask for one row more than a batch holds; the extra row only signals
        // that another batch exists and is not deleted in this pass.
        const objects = await db.listObjects(bucketId, 'id, name', batchLimit + 1, before)

        if (objects && objects.length > 0) {
          if (objects.length > batchLimit) {
            objects.pop()
            moreObjectsToDelete = true
          }

          await db.withTransaction(async (trx) => {
            const deleted = await trx.deleteObjects(
              bucketId,
              objects.map(({ id }) => id!),
              'id'
            )

            if (deleted && deleted.length > 0) {
              const prefixes: string[] = []

              // Every deleted row maps to two backend keys: the file and its `.info` sidecar.
              for (const { name, version } of deleted) {
                const fileName = withOptionalVersion(`${tenantId}/${bucketId}/${name}`, version)
                prefixes.push(fileName)
                prefixes.push(fileName + '.info')
              }

              // Runs inside the transaction callback, so a backend failure
              // propagates out of `withTransaction`.
              await backend.deleteObjects(storageS3Bucket, prefixes)
            }
          })
        }

        if (!moreObjectsToDelete) {
          break
        }
      }

      if (moreObjectsToDelete) {
        // Time budget used up with objects remaining: continue in a new job.
        // Forward the original serialized cutoff so the payload matches its
        // declared `before: string` type and the cutoff stays exactly the same.
        await ObjectAdminDeleteAllBefore.send({
          before: job.data.before,
          bucketId,
          tenant: job.data.tenant,
          reqId: job.data.reqId,
        })
      }
    } catch (e) {
      logger.error(
        {
          error: e,
          jodId: job.id,
          type: 'event',
          event: 'ObjectAdminDeleteAllBefore',
          payload: JSON.stringify(job.data),
          objectPath: bucketId,
          tenantId,
          project: tenantId,
          reqId: job.data.reqId,
        },
        `[Admin]: ObjectAdminDeleteAllBefore ${bucketId} ${before.toUTCString()} - FAILED`
      )
      throw e
    } finally {
      if (storage) {
        const tenant = storage.db.tenant()
        // Not awaited: disposing the connection must not delay or fail the job;
        // a failure here is only logged.
        storage.db.destroyConnection().catch((e) => {
          logger.error(
            { error: e },
            `[Admin]: ObjectAdminDeleteAllBefore ${tenant.ref} - FAILED DISPOSING CONNECTION`
          )
        })
      }
    }
  }
}

src/storage/events/workers.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
import { Queue } from '@internal/queue'
22
import { Webhook } from './lifecycle/webhook'
3-
import { ObjectAdminDelete } from './lifecycle/object-admin-delete'
3+
import { ObjectAdminDelete } from './objects/object-admin-delete'
44
import { RunMigrationsOnTenants } from './migrations/run-migrations'
55
import { BackupObjectEvent } from './objects/backup-object'
66
import { ResetMigrationsOnTenant } from './migrations/reset-migrations'
77
import { JwksCreateSigningSecret } from './jwks/jwks-create-signing-secret'
88
import { UpgradePgBossV10 } from './pgboss/upgrade-v10'
9-
import { ObjectAdminDeleteBatch } from './lifecycle/object-admin-delete-batch'
9+
import { ObjectAdminDeleteAllBefore } from './objects/object-admin-delete-all-before'
1010

1111
export function registerWorkers() {
1212
Queue.register(Webhook)
1313
Queue.register(ObjectAdminDelete)
14-
Queue.register(ObjectAdminDeleteBatch)
14+
Queue.register(ObjectAdminDeleteAllBefore)
1515
Queue.register(RunMigrationsOnTenants)
1616
Queue.register(BackupObjectEvent)
1717
Queue.register(ResetMigrationsOnTenant)

src/storage/storage.ts

Lines changed: 20 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import { StorageObjectLocator } from '@storage/locator'
1717
import { BucketCreatedEvent, BucketDeleted } from '@storage/events'
1818
import { tenantHasMigrations } from '@internal/database/migrations'
1919
import { tenantHasFeature } from '@internal/database'
20-
import { ObjectAdminDeleteBatch } from './events'
20+
import { ObjectAdminDeleteAllBefore } from './events'
2121

22-
const { requestUrlLengthLimit, emptyBucketMax } = getConfig()
22+
const { emptyBucketMax } = getConfig()
2323

2424
/**
2525
* Storage
@@ -256,66 +256,34 @@ export class Storage {
256256
/**
257257
* Deletes all files in a bucket
258258
* @param bucketId
259+
* @param before only objects created before this time are deleted (defaults to now)
259260
*/
260-
async emptyBucket(bucketId: string) {
261+
async emptyBucket(bucketId: string, before: Date = new Date()) {
261262
await this.findBucket(bucketId, 'name')
262263

263264
const count = await this.db.countObjectsInBucket(bucketId, emptyBucketMax + 1)
264265
if (count > emptyBucketMax) {
265266
throw ERRORS.UnableToEmptyBucket(bucketId)
266267
}
267268

268-
while (true) {
269-
const objects = await this.db.listObjects(
270-
bucketId,
271-
'id, name',
272-
Math.floor(requestUrlLengthLimit / (36 + 3))
273-
)
274-
275-
if (!(objects && objects.length > 0)) {
276-
break
277-
}
278-
279-
const deleted = await this.db.deleteObjects(
280-
bucketId,
281-
objects.map(({ id }) => id!),
282-
'id'
283-
)
284-
285-
if (deleted && deleted.length > 0) {
286-
const prefixes = deleted.reduce((all, { name, version }) => {
287-
const fileName = this.location.getKeyLocation({
288-
tenantId: this.db.tenantId,
289-
bucketId,
290-
objectName: name,
291-
version,
292-
})
293-
all.push(fileName)
294-
all.push(fileName + '.info')
295-
return all
296-
}, [] as string[])
297-
// delete files from s3 asynchronously
298-
await ObjectAdminDeleteBatch.send({
299-
prefixes,
300-
bucketId,
301-
tenant: this.db.tenant(),
302-
reqId: this.db.reqId,
303-
})
304-
}
269+
const objects = await this.db.listObjects(bucketId, 'id, name', 1, before)
270+
if (!objects || objects.length < 1) {
271+
// the bucket is already empty
272+
return
273+
}
305274

306-
if (deleted?.length !== objects.length) {
307-
const deletedNames = new Set(deleted?.map(({ name }) => name))
308-
const remainingNames = objects
309-
.filter(({ name }) => !deletedNames.has(name))
310-
.map(({ name }) => name)
275+
// ensure delete permissions
276+
await this.db.testPermission((db) => {
277+
return db.deleteObject(bucketId, objects[0].id!)
278+
})
311279

312-
throw ERRORS.AccessDenied(
313-
`Cannot delete: ${remainingNames.join(
314-
' ,'
315-
)}, you may have SELECT but not DELETE permissions`
316-
)
317-
}
318-
}
280+
// use queue to recursively delete all objects created before the specified time
281+
await ObjectAdminDeleteAllBefore.send({
282+
before,
283+
bucketId,
284+
tenant: this.db.tenant(),
285+
reqId: this.db.reqId,
286+
})
319287
}
320288

321289
validateMimeType(mimeType: string[]) {

0 commit comments

Comments
 (0)