Skip to content

Commit 36fd6e9

Browse files
perf(storage-s3): stream files and abort s3 request from static handler (#13430)
### What? Stream the S3 object directly to the response instead of creating a Buffer in memory, and wire up an abort controller to stop streaming if the user aborts the download. ### Why? To avoid excessive memory usage and to abort the S3 download if the user has aborted the request anyway. ### How? In a Node environment, AWS S3 always returns a Readable. The streamToBuffer method always required this, but the `any` type hid the fact that this was actually needed. Now there is an explicit type check, but it should never trigger in a Node server environment. Wire up an abort controller to the request so that we tell the S3 object to stop streaming further if the user aborts. Fixes #10286. This may also help with other issues involving S3 and resource usage.
1 parent c67ceca commit 36fd6e9

File tree

1 file changed

+50
-35
lines changed

1 file changed

+50
-35
lines changed

packages/storage-s3/src/staticHandler.ts

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,34 +27,34 @@ interface Args {
2727
signedDownloads?: SignedDownloadsConfig
2828
}
2929

30-
// Type guard for NodeJS.Readable streams
31-
const isNodeReadableStream = (body: unknown): body is Readable => {
30+
const isNodeReadableStream = (body: AWS.GetObjectOutput['Body']): body is Readable => {
3231
return (
3332
typeof body === 'object' &&
3433
body !== null &&
3534
'pipe' in body &&
36-
typeof (body as any).pipe === 'function' &&
35+
typeof body.pipe === 'function' &&
3736
'destroy' in body &&
38-
typeof (body as any).destroy === 'function'
37+
typeof body.destroy === 'function'
3938
)
4039
}
4140

42-
const destroyStream = (object: AWS.GetObjectOutput | undefined) => {
41+
const abortRequestAndDestroyStream = ({
42+
abortController,
43+
object,
44+
}: {
45+
abortController: AbortController
46+
object?: AWS.GetObjectOutput
47+
}) => {
48+
try {
49+
abortController.abort()
50+
} catch {
51+
/* noop */
52+
}
4353
if (object?.Body && isNodeReadableStream(object.Body)) {
4454
object.Body.destroy()
4555
}
4656
}
4757

48-
// Convert a stream into a promise that resolves with a Buffer
49-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
50-
const streamToBuffer = async (readableStream: any) => {
51-
const chunks = []
52-
for await (const chunk of readableStream) {
53-
chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk)
54-
}
55-
return Buffer.concat(chunks)
56-
}
57-
5858
export const getHandler = ({
5959
bucket,
6060
collection,
@@ -63,6 +63,15 @@ export const getHandler = ({
6363
}: Args): StaticHandler => {
6464
return async (req, { headers: incomingHeaders, params: { clientUploadContext, filename } }) => {
6565
let object: AWS.GetObjectOutput | undefined = undefined
66+
let streamed = false
67+
68+
const abortController = new AbortController()
69+
if (req.signal) {
70+
req.signal.addEventListener('abort', () => {
71+
abortRequestAndDestroyStream({ abortController, object })
72+
})
73+
}
74+
6675
try {
6776
const prefix = await getFilePrefix({ clientUploadContext, collection, filename, req })
6877

@@ -89,10 +98,13 @@ export const getHandler = ({
8998
}
9099
}
91100

92-
object = await getStorageClient().getObject({
93-
Bucket: bucket,
94-
Key: key,
95-
})
101+
object = await getStorageClient().getObject(
102+
{
103+
Bucket: bucket,
104+
Key: key,
105+
},
106+
{ abortSignal: abortController.signal },
107+
)
96108

97109
if (!object.Body) {
98110
return new Response(null, { status: 404, statusText: 'Not Found' })
@@ -130,33 +142,36 @@ export const getHandler = ({
130142
})
131143
}
132144

133-
// On error, manually destroy stream to close socket
134-
if (object.Body && isNodeReadableStream(object.Body)) {
135-
const stream = object.Body
136-
stream.on('error', (err) => {
137-
req.payload.logger.error({
138-
err,
139-
key,
140-
msg: 'Error streaming S3 object, destroying stream',
141-
})
142-
stream.destroy()
145+
if (!isNodeReadableStream(object.Body)) {
146+
req.payload.logger.error({
147+
key,
148+
msg: 'S3 object body is not a readable stream',
143149
})
150+
return new Response('Internal Server Error', { status: 500 })
144151
}
145152

146-
const bodyBuffer = await streamToBuffer(object.Body)
147-
148-
return new Response(bodyBuffer, {
149-
headers,
150-
status: 200,
153+
const stream = object.Body
154+
stream.on('error', (err) => {
155+
req.payload.logger.error({
156+
err,
157+
key,
158+
msg: 'Error while streaming S3 object (aborting)',
159+
})
160+
abortRequestAndDestroyStream({ abortController, object })
151161
})
162+
163+
streamed = true
164+
return new Response(stream, { headers, status: 200 })
152165
} catch (err) {
153166
if (err instanceof AWS.NoSuchKey) {
154167
return new Response(null, { status: 404, statusText: 'Not Found' })
155168
}
156169
req.payload.logger.error(err)
157170
return new Response('Internal Server Error', { status: 500 })
158171
} finally {
159-
destroyStream(object)
172+
if (!streamed) {
173+
abortRequestAndDestroyStream({ abortController, object })
174+
}
160175
}
161176
}
162177
}

0 commit comments

Comments
 (0)