Skip to content

Commit 987481f

Browse files
committed
list objects in worker when emptying bucket
1 parent 6cbff77 commit 987481f

File tree

8 files changed

+182
-146
lines changed

8 files changed

+182
-146
lines changed

src/http/routes/bucket/emptyBucket.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ const emptyBucketParamsSchema = {
1414
const successResponseSchema = {
1515
type: 'object',
1616
properties: {
17-
message: { type: 'string', examples: ['Successfully emptied'] },
17+
message: {
18+
type: 'string',
19+
examples: ['Empty bucket has been queued. Completion may take up to an hour.'],
20+
},
1821
},
1922
}
2023
interface emptyBucketRequestInterface extends AuthenticatedRequest {
@@ -41,7 +44,9 @@ export default async function routes(fastify: FastifyInstance) {
4144

4245
await request.storage.emptyBucket(bucketId)
4346

44-
return response.status(200).send(createResponse('Successfully emptied'))
47+
return response
48+
.status(200)
49+
.send(createResponse('Empty bucket has been queued. Completion may take up to an hour.'))
4550
}
4651
)
4752
}

src/storage/events/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ export * from './base-event'
33
export * from './lifecycle/object-created'
44
export * from './lifecycle/object-updated'
55
export * from './lifecycle/object-removed'
6-
export * from './lifecycle/object-admin-delete'
7-
export * from './lifecycle/object-admin-delete-batch'
6+
export * from './objects/object-admin-delete'
7+
export * from './objects/object-admin-delete-all-before'
88
export * from './objects/backup-object'
99
export * from './migrations/run-migrations'
1010
export * from './migrations/reset-migrations'

src/storage/events/lifecycle/object-admin-delete-batch.ts

Lines changed: 0 additions & 87 deletions
This file was deleted.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { BaseEvent } from '../base-event'
2+
import { getConfig } from '../../../config'
3+
import { Job, SendOptions, WorkOptions } from 'pg-boss'
4+
import { logger, logSchema } from '@internal/monitoring'
5+
import { Storage } from '../../index'
6+
import { BasePayload } from '@internal/queue'
7+
import { withOptionalVersion } from '@storage/backend'
8+
9+
const DELETE_JOB_TIME_LIMIT_MS = 10_000
10+
11+
export interface ObjectDeleteAllBeforeEvent extends BasePayload {
12+
before: string
13+
bucketId: string
14+
}
15+
16+
const { storageS3Bucket, requestUrlLengthLimit } = getConfig()
17+
18+
export class ObjectAdminDeleteAllBefore extends BaseEvent<ObjectDeleteAllBeforeEvent> {
19+
static queueName = 'object:admin:delete-all-before'
20+
21+
static getWorkerOptions(): WorkOptions {
22+
return {}
23+
}
24+
25+
static getSendOptions(payload: ObjectDeleteAllBeforeEvent): SendOptions {
26+
return {
27+
singletonKey: `${payload.tenant.ref}/${payload.bucketId}`,
28+
priority: 10,
29+
expireInSeconds: 30,
30+
}
31+
}
32+
33+
static async handle(job: Job<ObjectDeleteAllBeforeEvent>) {
34+
let storage: Storage | undefined = undefined
35+
36+
const tenantId = job.data.tenant.ref
37+
const bucketId = job.data.bucketId
38+
const before = new Date(job.data.before)
39+
40+
try {
41+
storage = await this.createStorage(job.data)
42+
43+
logSchema.event(
44+
logger,
45+
`[Admin]: ObjectAdminDeleteAllBefore ${bucketId} ${before.toUTCString()}`,
46+
{
47+
jodId: job.id,
48+
type: 'event',
49+
event: 'ObjectAdminDeleteAllBefore',
50+
payload: JSON.stringify(job.data),
51+
objectPath: bucketId,
52+
tenantId,
53+
project: tenantId,
54+
reqId: job.data.reqId,
55+
}
56+
)
57+
58+
const batchLimit = Math.floor(requestUrlLengthLimit / (36 + 3))
59+
60+
let moreObjectsToDelete = false
61+
const start = Date.now()
62+
while (Date.now() - start < DELETE_JOB_TIME_LIMIT_MS) {
63+
moreObjectsToDelete = false
64+
const objects = await storage.db.listObjects(bucketId, 'id, name', batchLimit + 1, before)
65+
66+
const backend = storage.backend
67+
if (objects && objects.length > 0) {
68+
if (objects.length > batchLimit) {
69+
objects.pop()
70+
moreObjectsToDelete = true
71+
}
72+
73+
await storage.db.withTransaction(async (trx) => {
74+
const deleted = await trx.deleteObjects(
75+
bucketId,
76+
objects.map(({ id }) => id!),
77+
'id'
78+
)
79+
80+
if (deleted && deleted.length > 0) {
81+
const prefixes: string[] = []
82+
83+
for (const { name, version } of deleted) {
84+
const fileName = withOptionalVersion(`${tenantId}/${bucketId}/${name}`, version)
85+
prefixes.push(fileName)
86+
prefixes.push(fileName + '.info')
87+
}
88+
89+
await backend.deleteObjects(storageS3Bucket, prefixes)
90+
}
91+
})
92+
}
93+
94+
if (!moreObjectsToDelete) {
95+
break
96+
}
97+
}
98+
99+
if (moreObjectsToDelete) {
100+
// delete next batch
101+
await ObjectAdminDeleteAllBefore.send({
102+
before,
103+
bucketId,
104+
tenant: job.data.tenant,
105+
reqId: job.data.reqId,
106+
})
107+
}
108+
} catch (e) {
109+
logger.error(
110+
{
111+
error: e,
112+
jodId: job.id,
113+
type: 'event',
114+
event: 'ObjectAdminDeleteAllBefore',
115+
payload: JSON.stringify(job.data),
116+
objectPath: bucketId,
117+
tenantId,
118+
project: tenantId,
119+
reqId: job.data.reqId,
120+
},
121+
`[Admin]: ObjectAdminDeleteAllBefore ${bucketId} ${before.toUTCString()} - FAILED`
122+
)
123+
throw e
124+
} finally {
125+
if (storage) {
126+
const tenant = storage.db.tenant()
127+
storage.db
128+
.destroyConnection()
129+
.then(() => {
130+
// no-op
131+
})
132+
.catch((e) => {
133+
logger.error(
134+
{ error: e },
135+
`[Admin]: ObjectAdminDeleteAllBefore ${tenant.ref} - FAILED DISPOSING CONNECTION`
136+
)
137+
})
138+
}
139+
}
140+
}
141+
}

src/storage/events/workers.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
import { Queue } from '@internal/queue'
22
import { Webhook } from './lifecycle/webhook'
3-
import { ObjectAdminDelete } from './lifecycle/object-admin-delete'
3+
import { ObjectAdminDelete } from './objects/object-admin-delete'
44
import { RunMigrationsOnTenants } from './migrations/run-migrations'
55
import { BackupObjectEvent } from './objects/backup-object'
66
import { ResetMigrationsOnTenant } from './migrations/reset-migrations'
77
import { JwksCreateSigningSecret } from './jwks/jwks-create-signing-secret'
88
import { UpgradePgBossV10 } from './pgboss/upgrade-v10'
9-
import { ObjectAdminDeleteBatch } from './lifecycle/object-admin-delete-batch'
9+
import { ObjectAdminDeleteAllBefore } from './objects/object-admin-delete-all-before'
1010

1111
export function registerWorkers() {
1212
Queue.register(Webhook)
1313
Queue.register(ObjectAdminDelete)
14-
Queue.register(ObjectAdminDeleteBatch)
14+
Queue.register(ObjectAdminDeleteAllBefore)
1515
Queue.register(RunMigrationsOnTenants)
1616
Queue.register(BackupObjectEvent)
1717
Queue.register(ResetMigrationsOnTenant)

src/storage/storage.ts

Lines changed: 21 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import { StorageBackendAdapter, withOptionalVersion } from './backend'
1+
import { StorageBackendAdapter } from './backend'
22
import { Database, FindBucketFilters } from './database'
33
import { ERRORS } from '@internal/errors'
44
import { AssetRenderer, HeadRenderer, ImageRenderer } from './renderer'
55
import { getFileSizeLimit, mustBeValidBucketName, parseFileSizeToBytes } from './limits'
66
import { getConfig } from '../config'
77
import { ObjectStorage } from './object'
88
import { InfoRenderer } from '@storage/renderer/info'
9-
import { ObjectAdminDeleteBatch } from './events'
9+
import { ObjectAdminDeleteAllBefore } from './events'
1010

11-
const { requestUrlLengthLimit, emptyBucketMax } = getConfig()
11+
const { emptyBucketMax } = getConfig()
1212

1313
/**
1414
* Storage
@@ -175,61 +175,34 @@ export class Storage {
175175
/**
176176
* Deletes all files in a bucket
177177
* @param bucketId
178+
* @param before limit to files before the specified time (defaults to now)
178179
*/
179-
async emptyBucket(bucketId: string) {
180+
async emptyBucket(bucketId: string, before: Date = new Date()) {
180181
await this.findBucket(bucketId, 'name')
181182

182183
const count = await this.db.countObjectsInBucket(bucketId, emptyBucketMax + 1)
183184
if (count > emptyBucketMax) {
184185
throw ERRORS.UnableToEmptyBucket(bucketId)
185186
}
186187

187-
while (true) {
188-
const objects = await this.db.listObjects(
189-
bucketId,
190-
'id, name',
191-
Math.floor(requestUrlLengthLimit / (36 + 3))
192-
)
193-
194-
if (!(objects && objects.length > 0)) {
195-
break
196-
}
188+
const objects = await this.db.listObjects(bucketId, 'id, name', 1, before)
189+
if (!objects || objects.length < 1) {
190+
// the bucket is already empty
191+
return
192+
}
197193

198-
const deleted = await this.db.deleteObjects(
199-
bucketId,
200-
objects.map(({ id }) => id!),
201-
'id'
202-
)
203-
204-
if (deleted && deleted.length > 0) {
205-
const prefixes = deleted.reduce((all, { name, version }) => {
206-
const fileName = withOptionalVersion(`${this.db.tenantId}/${bucketId}/${name}`, version)
207-
all.push(fileName)
208-
all.push(fileName + '.info')
209-
return all
210-
}, [] as string[])
211-
// delete files from s3 asynchronously
212-
await ObjectAdminDeleteBatch.send({
213-
prefixes,
214-
bucketId,
215-
tenant: this.db.tenant(),
216-
reqId: this.db.reqId,
217-
})
218-
}
194+
// ensure delete permissions
195+
await this.db.testPermission((db) => {
196+
return db.deleteObject(bucketId, objects[0].id!)
197+
})
219198

220-
if (deleted?.length !== objects.length) {
221-
const deletedNames = new Set(deleted?.map(({ name }) => name))
222-
const remainingNames = objects
223-
.filter(({ name }) => !deletedNames.has(name))
224-
.map(({ name }) => name)
225-
226-
throw ERRORS.AccessDenied(
227-
`Cannot delete: ${remainingNames.join(
228-
' ,'
229-
)}, you may have SELECT but not DELETE permissions`
230-
)
231-
}
232-
}
199+
// use queue to recursively delete all objects created before the specified time
200+
await ObjectAdminDeleteAllBefore.send({
201+
before,
202+
bucketId,
203+
tenant: this.db.tenant(),
204+
reqId: this.db.reqId,
205+
})
233206
}
234207

235208
validateMimeType(mimeType: string[]) {

0 commit comments

Comments
 (0)