Skip to content

Commit d4a0cd8

Browse files
authored
fix: upload stream monitoring (#562)
1 parent 6bf02c7 commit d4a0cd8

File tree

12 files changed

+288
-172
lines changed

12 files changed

+288
-172
lines changed

src/http/plugins/storage.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ export const storage = fastifyPlugin(
2828
request.backend = storageBackend
2929
request.storage = new Storage(storageBackend, database)
3030
})
31+
32+
fastify.addHook('onClose', async () => {
33+
storageBackend.close()
34+
})
3135
},
3236
{ name: 'storage-init' }
3337
)

src/http/routes/tus/index.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import {
2727
import { TenantConnection, PubSub } from '@internal/database'
2828
import { S3Store } from '@tus/s3-store'
2929
import { NodeHttpHandler } from '@smithy/node-http-handler'
30-
import { createAgent } from '@storage/backend'
3130
import { ROUTE_OPERATIONS } from '../operations'
31+
import * as https from 'node:https'
32+
import { createAgent } from '@internal/http'
3233

3334
const {
35+
storageS3MaxSockets,
3436
storageS3Bucket,
3537
storageS3Endpoint,
3638
storageS3ForcePathStyle,
@@ -57,9 +59,8 @@ type MultiPartRequest = http.IncomingMessage & {
5759
}
5860
}
5961

60-
function createTusStore() {
62+
function createTusStore(agent: { httpsAgent: https.Agent; httpAgent: http.Agent }) {
6163
if (storageBackendType === 's3') {
62-
const agent = createAgent('s3_tus')
6364
return new S3Store({
6465
partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
6566
expirationPeriodInMilliseconds: tusUrlExpiryMs,
@@ -84,8 +85,11 @@ function createTusStore() {
8485
})
8586
}
8687

87-
function createTusServer(lockNotifier: LockNotifier) {
88-
const datastore = createTusStore()
88+
function createTusServer(
89+
lockNotifier: LockNotifier,
90+
agent: { httpsAgent: https.Agent; httpAgent: http.Agent }
91+
) {
92+
const datastore = createTusStore(agent)
8993
const serverOptions: ServerOptions & {
9094
datastore: DataStore
9195
} = {
@@ -139,7 +143,16 @@ export default async function routes(fastify: FastifyInstance) {
139143
const lockNotifier = new LockNotifier(PubSub)
140144
await lockNotifier.subscribe()
141145

142-
const tusServer = createTusServer(lockNotifier)
146+
const agent = createAgent('s3_tus', {
147+
maxSockets: storageS3MaxSockets,
148+
})
149+
agent.monitor()
150+
151+
fastify.addHook('onClose', () => {
152+
agent.close()
153+
})
154+
155+
const tusServer = createTusServer(lockNotifier, agent)
143156

144157
// authenticated routes
145158
fastify.register(async (fastify) => {

src/internal/concurrency/stream.ts

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,11 @@
11
import { Transform, TransformCallback } from 'stream'
22

3-
interface ByteCounterStreamOptions {
4-
maxHistory?: number
5-
onMaxHistory?: (history: Date[]) => void
6-
rewriteHistoryOnMax?: boolean
7-
}
8-
9-
export const createByteCounterStream = (options: ByteCounterStreamOptions) => {
10-
const { maxHistory = 100 } = options
11-
3+
export const createByteCounterStream = () => {
124
let bytes = 0
13-
let history: Date[] = []
145

156
const transformStream = new Transform({
167
transform(chunk: Buffer, encoding: string, callback: TransformCallback) {
178
bytes += chunk.length
18-
history.push(new Date())
19-
20-
if (history.length === maxHistory) {
21-
if (options.rewriteHistoryOnMax) {
22-
options.onMaxHistory?.(history)
23-
history = []
24-
}
25-
}
26-
279
callback(null, chunk)
2810
},
2911
})
@@ -33,8 +15,5 @@ export const createByteCounterStream = (options: ByteCounterStreamOptions) => {
3315
get bytes() {
3416
return bytes
3517
},
36-
get history() {
37-
return history
38-
},
3918
}
4019
}

src/internal/http/agent.ts

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import Agent, { HttpsAgent } from 'agentkeepalive'
2+
import {
3+
HttpPoolErrorGauge,
4+
HttpPoolFreeSocketsGauge,
5+
HttpPoolPendingRequestsGauge,
6+
HttpPoolSocketsGauge,
7+
} from '@internal/monitoring/metrics'
8+
import { getConfig } from '../../config'
9+
10+
const { region } = getConfig()
11+
12+
export interface InstrumentedAgent {
13+
httpAgent: Agent
14+
httpsAgent: HttpsAgent
15+
monitor: () => NodeJS.Timeout | undefined
16+
close: () => void
17+
}
18+
19+
export interface AgentStats {
20+
busySocketCount: number
21+
freeSocketCount: number
22+
pendingRequestCount: number
23+
errorSocketCount: number
24+
timeoutSocketCount: number
25+
createSocketErrorCount: number
26+
}
27+
28+
/**
29+
* Creates an instrumented agent
30+
* Adding prometheus metrics to the agent
31+
*/
32+
export function createAgent(name: string, options: { maxSockets: number }): InstrumentedAgent {
33+
const agentOptions = {
34+
maxSockets: options.maxSockets,
35+
keepAlive: true,
36+
keepAliveMsecs: 1000,
37+
freeSocketTimeout: 1000 * 15,
38+
}
39+
40+
const httpAgent = new Agent(agentOptions)
41+
const httpsAgent = new HttpsAgent(agentOptions)
42+
let watcher: NodeJS.Timeout | undefined = undefined
43+
44+
return {
45+
httpAgent,
46+
httpsAgent,
47+
monitor: () => {
48+
const agent = watchAgent(name, 'https', httpsAgent)
49+
watcher = agent
50+
return agent
51+
},
52+
close: () => {
53+
if (watcher) {
54+
clearInterval(watcher)
55+
}
56+
},
57+
}
58+
}
59+
60+
/**
61+
* Metrics
62+
*
63+
* HttpPoolSockets
64+
* HttpPoolFreeSockets
65+
* HttpPoolPendingRequests
66+
* HttpPoolError
67+
*
68+
* @param name
69+
* @param protocol
70+
* @param stats
71+
*/
72+
function updateHttpAgentMetrics(name: string, protocol: string, stats: AgentStats) {
73+
// Update the metrics with calculated values
74+
HttpPoolSocketsGauge.set({ name, region, protocol }, stats.busySocketCount)
75+
HttpPoolFreeSocketsGauge.set({ name, region, protocol }, stats.freeSocketCount)
76+
HttpPoolPendingRequestsGauge.set({ name, region }, stats.pendingRequestCount)
77+
HttpPoolErrorGauge.set({ name, region, type: 'socket_error', protocol }, stats.errorSocketCount)
78+
HttpPoolErrorGauge.set(
79+
{ name, region, type: 'timeout_socket_error', protocol },
80+
stats.timeoutSocketCount
81+
)
82+
HttpPoolErrorGauge.set(
83+
{ name, region, type: 'create_socket_error', protocol },
84+
stats.createSocketErrorCount
85+
)
86+
}
87+
88+
export function watchAgent(name: string, protocol: 'http' | 'https', agent: Agent | HttpsAgent) {
89+
return setInterval(() => {
90+
const httpStatus = agent.getCurrentStatus()
91+
92+
const httpStats = gatherHttpAgentStats(httpStatus)
93+
94+
updateHttpAgentMetrics(name, protocol, httpStats)
95+
}, 5000)
96+
}
97+
98+
// Function to update Prometheus metrics based on the current status of the agent
99+
export function gatherHttpAgentStats(status: Agent.AgentStatus) {
100+
// Calculate the number of busy sockets by iterating over the `sockets` object
101+
let busySocketCount = 0
102+
for (const host in status.sockets) {
103+
if (status.sockets.hasOwnProperty(host)) {
104+
busySocketCount += status.sockets[host]
105+
}
106+
}
107+
108+
// Calculate the number of free sockets by iterating over the `freeSockets` object
109+
let freeSocketCount = 0
110+
for (const host in status.freeSockets) {
111+
if (status.freeSockets.hasOwnProperty(host)) {
112+
freeSocketCount += status.freeSockets[host]
113+
}
114+
}
115+
116+
// Calculate the number of pending requests by iterating over the `requests` object
117+
let pendingRequestCount = 0
118+
for (const host in status.requests) {
119+
if (status.requests.hasOwnProperty(host)) {
120+
pendingRequestCount += status.requests[host]
121+
}
122+
}
123+
124+
return {
125+
busySocketCount,
126+
freeSocketCount,
127+
pendingRequestCount,
128+
errorSocketCount: status.errorSocketCount,
129+
timeoutSocketCount: status.timeoutSocketCount,
130+
createSocketErrorCount: status.createSocketErrorCount,
131+
}
132+
}

src/internal/http/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './agent'

src/internal/queue/event.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ export class Event<T extends Omit<BasePayload, '$version'>> {
7878
return this.queueName + '-slow'
7979
}
8080

81+
static onClose() {
82+
// no-op
83+
}
84+
85+
static onStart() {
86+
// no-op
87+
}
88+
8189
static batchSend<T extends Event<any>[]>(messages: T) {
8290
return Queue.getInstance().insert(
8391
messages.map((message) => {

src/internal/queue/queue.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ export abstract class Queue {
8686
opts.registerWorkers()
8787
}
8888

89+
await Queue.callStart()
8990
await Queue.startWorkers(opts.onMessage)
9091

9192
if (opts.signal) {
@@ -96,7 +97,8 @@ export abstract class Queue {
9697
type: 'queue',
9798
})
9899
return Queue.stop()
99-
.then(() => {
100+
.then(async () => {
101+
await Queue.callClose()
100102
logSchema.info(logger, '[Queue] Exited', {
101103
type: 'queue',
102104
})
@@ -142,7 +144,8 @@ export abstract class Queue {
142144
})
143145

144146
await new Promise((resolve) => {
145-
boss.once('stopped', () => {
147+
boss.once('stopped', async () => {
148+
await this.callClose()
146149
resolve(null)
147150
})
148151
})
@@ -166,6 +169,22 @@ export abstract class Queue {
166169
return Promise.all(workers)
167170
}
168171

172+
protected static callStart() {
173+
const events = Queue.events.map((event) => {
174+
return event.onStart()
175+
})
176+
177+
return Promise.all(events)
178+
}
179+
180+
protected static callClose() {
181+
const events = Queue.events.map((event) => {
182+
return event.onClose()
183+
})
184+
185+
return Promise.all(events)
186+
}
187+
169188
protected static registerTask(
170189
queueName: string,
171190
event: SubclassOfBaseClass,

src/scripts/export-docs.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import { promises as fs } from 'fs'
22
import app from '../app'
33
;(async () => {
4-
const response = await app({
4+
const storageApp = app({
55
exposeDocs: true,
6-
}).inject({
6+
})
7+
8+
const response = await storageApp.inject({
79
method: 'GET',
810
url: '/documentation/json',
911
})
1012

1113
await fs.writeFile('static/api.json', response.body)
12-
process.exit(0)
14+
15+
await storageApp.close()
1316
})()

src/storage/backend/adapter.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ export abstract class StorageBackendAdapter {
216216
): Promise<{ eTag?: string; lastModified?: Date }> {
217217
throw new Error('not implemented')
218218
}
219+
220+
close(): void {
221+
// do nothing
222+
}
219223
}
220224

221225
const { tusUseFileVersionSeparator } = getConfig()

src/storage/backend/file.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,10 @@ export class FileBackend implements StorageBackendAdapter {
464464
])
465465
}
466466

467+
close() {
468+
// no-op
469+
}
470+
467471
protected async getFileMetadata(file: string) {
468472
const platform = process.platform == 'darwin' ? 'darwin' : 'linux'
469473
const [cacheControl, contentType] = await Promise.all([

0 commit comments

Comments
 (0)