-
Notifications
You must be signed in to change notification settings - Fork 2.9k
perf(storage-s3): stream files and abort s3 request from static handler #13430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9cc1b84
d5b5cad
21a38bb
15d0f9c
2dadf5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,34 +27,34 @@ interface Args { | |
signedDownloads?: SignedDownloadsConfig | ||
} | ||
|
||
// Type guard for NodeJS.Readable streams | ||
const isNodeReadableStream = (body: unknown): body is Readable => { | ||
const isNodeReadableStream = (body: AWS.GetObjectOutput['Body']): body is Readable => { | ||
return ( | ||
typeof body === 'object' && | ||
body !== null && | ||
'pipe' in body && | ||
typeof (body as any).pipe === 'function' && | ||
typeof body.pipe === 'function' && | ||
'destroy' in body && | ||
typeof (body as any).destroy === 'function' | ||
typeof body.destroy === 'function' | ||
) | ||
} | ||
|
||
const destroyStream = (object: AWS.GetObjectOutput | undefined) => { | ||
const abortRequestAndDestroyStream = ({ | ||
abortController, | ||
object, | ||
}: { | ||
abortController: AbortController | ||
object?: AWS.GetObjectOutput | ||
}) => { | ||
try { | ||
abortController.abort() | ||
} catch { | ||
/* noop */ | ||
} | ||
if (object?.Body && isNodeReadableStream(object.Body)) { | ||
object.Body.destroy() | ||
} | ||
} | ||
|
||
// Convert a stream into a promise that resolves with a Buffer | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
const streamToBuffer = async (readableStream: any) => { | ||
const chunks = [] | ||
for await (const chunk of readableStream) { | ||
chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk) | ||
} | ||
return Buffer.concat(chunks) | ||
} | ||
|
||
export const getHandler = ({ | ||
bucket, | ||
collection, | ||
|
@@ -63,6 +63,15 @@ export const getHandler = ({ | |
}: Args): StaticHandler => { | ||
return async (req, { headers: incomingHeaders, params: { clientUploadContext, filename } }) => { | ||
let object: AWS.GetObjectOutput | undefined = undefined | ||
let streamed = false | ||
|
||
const abortController = new AbortController() | ||
if (req.signal) { | ||
req.signal.addEventListener('abort', () => { | ||
abortRequestAndDestroyStream({ abortController, object }) | ||
}) | ||
} | ||
|
||
try { | ||
const prefix = await getFilePrefix({ clientUploadContext, collection, filename, req }) | ||
|
||
|
@@ -89,10 +98,13 @@ export const getHandler = ({ | |
} | ||
} | ||
|
||
object = await getStorageClient().getObject({ | ||
Bucket: bucket, | ||
Key: key, | ||
}) | ||
object = await getStorageClient().getObject( | ||
{ | ||
Bucket: bucket, | ||
Key: key, | ||
}, | ||
{ abortSignal: abortController.signal }, | ||
) | ||
|
||
if (!object.Body) { | ||
return new Response(null, { status: 404, statusText: 'Not Found' }) | ||
|
@@ -130,33 +142,36 @@ export const getHandler = ({ | |
}) | ||
} | ||
|
||
// On error, manually destroy stream to close socket | ||
if (object.Body && isNodeReadableStream(object.Body)) { | ||
const stream = object.Body | ||
stream.on('error', (err) => { | ||
req.payload.logger.error({ | ||
err, | ||
key, | ||
msg: 'Error streaming S3 object, destroying stream', | ||
}) | ||
stream.destroy() | ||
if (!isNodeReadableStream(object.Body)) { | ||
req.payload.logger.error({ | ||
key, | ||
msg: 'S3 object body is not a readable stream', | ||
}) | ||
return new Response('Internal Server Error', { status: 500 }) | ||
} | ||
|
||
const bodyBuffer = await streamToBuffer(object.Body) | ||
|
||
return new Response(bodyBuffer, { | ||
headers, | ||
status: 200, | ||
const stream = object.Body | ||
stream.on('error', (err) => { | ||
req.payload.logger.error({ | ||
err, | ||
key, | ||
msg: 'Error while streaming S3 object (aborting)', | ||
}) | ||
abortRequestAndDestroyStream({ abortController, object }) | ||
}) | ||
|
||
streamed = true | ||
return new Response(stream, { headers, status: 200 }) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning a Node.js Readable stream directly to Response constructor may not work in all environments. Consider checking if the runtime supports streaming responses or provide a fallback mechanism. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be safe with newer node versions, but not sure about other edge runtimes. |
||
} catch (err) { | ||
if (err instanceof AWS.NoSuchKey) { | ||
return new Response(null, { status: 404, statusText: 'Not Found' }) | ||
} | ||
req.payload.logger.error(err) | ||
return new Response('Internal Server Error', { status: 500 }) | ||
} finally { | ||
destroyStream(object) | ||
if (!streamed) { | ||
abortRequestAndDestroyStream({ abortController, object }) | ||
} | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.