Skip to content

Commit 7dc119c

Browse files
authored
Add support for HTTP/1 connection pre-warming (#856)
Motivation This patch adds support for HTTP/1 connection pre-warming. This allows the user to request that the HTTP/1 connection pool create and maintain extra connections, above-and-beyond those strictly needed to run the pool. This pool can be used to absorb small spikes in request traffic without increasing latency to account for connection creation. Modifications - Added new configuration properties for pre-warmed connections. - Amended the HTTP/1 state machine to create new connections where necessary. - Added state machine tests. Results Pre-warmed connections are available.
1 parent 254d340 commit 7dc119c

9 files changed

+993
-102
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ final class HTTPConnectionPool:
7979
.concurrentHTTP1ConnectionsPerHostSoftLimit,
8080
retryConnectionEstablishment: clientConfiguration.connectionPool.retryConnectionEstablishment,
8181
preferHTTP1: clientConfiguration.httpVersion == .http1Only,
82-
maximumConnectionUses: clientConfiguration.maximumUsesPerConnection
82+
maximumConnectionUses: clientConfiguration.maximumUsesPerConnection,
83+
preWarmedHTTP1ConnectionCount: clientConfiguration.connectionPool.preWarmedHTTP1ConnectionCount
8384
)
8485
}
8586

@@ -104,6 +105,11 @@ final class HTTPConnectionPool:
104105
enum Unlocked {
105106
case createConnection(Connection.ID, on: EventLoop)
106107
case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown)
108+
case closeConnectionAndCreateConnection(
109+
close: Connection,
110+
newConnectionID: Connection.ID,
111+
on: EventLoop
112+
)
107113
case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown)
108114
case migration(
109115
createConnections: [(Connection.ID, EventLoop)],
@@ -185,12 +191,27 @@ final class HTTPConnectionPool:
185191
self.locked.connection = .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)
186192
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
187193
self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop)
194+
case .scheduleTimeoutTimerAndCreateConnection(let timeoutID, let newConnectionID, let eventLoop):
195+
self.locked.connection = .scheduleTimeoutTimer(timeoutID, on: eventLoop)
196+
self.unlocked.connection = .createConnection(newConnectionID, on: eventLoop)
188197
case .cancelTimeoutTimer(let connectionID):
189198
self.locked.connection = .cancelTimeoutTimer(connectionID)
199+
case .createConnectionAndCancelTimeoutTimer(let createdID, on: let eventLoop, cancelTimerID: let cancelID):
200+
self.unlocked.connection = .createConnection(createdID, on: eventLoop)
201+
self.locked.connection = .cancelTimeoutTimer(cancelID)
190202
case .closeConnection(let connection, let isShutdown):
191203
self.unlocked.connection = .closeConnection(connection, isShutdown: isShutdown)
204+
case .closeConnectionAndCreateConnection(
205+
let closeConnection,
206+
let newConnectionID,
207+
let eventLoop
208+
):
209+
self.unlocked.connection = .closeConnectionAndCreateConnection(
210+
close: closeConnection,
211+
newConnectionID: newConnectionID,
212+
on: eventLoop
213+
)
192214
case .cleanupConnections(var cleanupContext, let isShutdown):
193-
//
194215
self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff)
195216
cleanupContext.connectBackoff = []
196217
self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown)
@@ -287,6 +308,23 @@ final class HTTPConnectionPool:
287308
self.delegate.connectionPoolDidShutdown(self, unclean: unclean)
288309
}
289310

311+
case .closeConnectionAndCreateConnection(
312+
let connectionToClose,
313+
let newConnectionID,
314+
let eventLoop
315+
):
316+
self.logger.trace(
317+
"closing and creating connection",
318+
metadata: [
319+
"ahc-connection-id": "\(connectionToClose.id)"
320+
]
321+
)
322+
323+
self.createConnection(newConnectionID, on: eventLoop)
324+
325+
// we are not interested in the close promise...
326+
connectionToClose.close(promise: nil)
327+
290328
case .cleanupConnections(let cleanupContext, let isShutdown):
291329
for connection in cleanupContext.close {
292330
connection.close(promise: nil)
@@ -400,7 +438,7 @@ final class HTTPConnectionPool:
400438
self.modifyStateAndRunActions { stateMachine in
401439
if self._idleTimer.removeValue(forKey: connectionID) != nil {
402440
// The timer still exists. State Machines assumes it is alive
403-
return stateMachine.connectionIdleTimeout(connectionID)
441+
return stateMachine.connectionIdleTimeout(connectionID, on: eventLoop)
404442
}
405443
return .none
406444
}

Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -262,32 +262,30 @@ extension HTTPConnectionPool {
262262
private var overflowIndex: Array<HTTP1ConnectionState>.Index
263263
/// The number of times each connection can be used before it is closed and replaced.
264264
private let maximumConnectionUses: Int?
265-
266-
init(maximumConcurrentConnections: Int, generator: Connection.ID.Generator, maximumConnectionUses: Int?) {
265+
/// How many pre-warmed connections we should create.
266+
private let preWarmedConnectionCount: Int
267+
268+
init(
269+
maximumConcurrentConnections: Int,
270+
generator: Connection.ID.Generator,
271+
maximumConnectionUses: Int?,
272+
preWarmedHTTP1ConnectionCount: Int
273+
) {
267274
self.connections = []
268275
self.connections.reserveCapacity(min(maximumConcurrentConnections, 1024))
269276
self.overflowIndex = self.connections.endIndex
270277
self.maximumConcurrentConnections = maximumConcurrentConnections
271278
self.generator = generator
272279
self.maximumConnectionUses = maximumConnectionUses
280+
self.preWarmedConnectionCount = preWarmedHTTP1ConnectionCount
273281
}
274282

275283
var stats: Stats {
276-
var stats = Stats()
277-
// all additions here can be unchecked, since we will have at max self.connections.count
278-
// which itself is an Int. For this reason we will never overflow.
279-
for connectionState in self.connections {
280-
if connectionState.isConnecting {
281-
stats.connecting &+= 1
282-
} else if connectionState.isBackingOff {
283-
stats.backingOff &+= 1
284-
} else if connectionState.isLeased {
285-
stats.leased &+= 1
286-
} else if connectionState.isIdle {
287-
stats.idle &+= 1
288-
}
289-
}
290-
return stats
284+
self.connectionStats(in: self.connections.startIndex..<self.connections.endIndex)
285+
}
286+
287+
var generalPurposeStats: Stats {
288+
self.connectionStats(in: self.connections.startIndex..<self.overflowIndex)
291289
}
292290

293291
var isEmpty: Bool {
@@ -328,6 +326,24 @@ extension HTTPConnectionPool {
328326
}
329327
}
330328

329+
private func connectionStats(in range: Range<Int>) -> Stats {
330+
var stats = Stats()
331+
// all additions here can be unchecked, since we will have at max self.connections.count
332+
// which itself is an Int. For this reason we will never overflow.
333+
for connectionState in self.connections[range] {
334+
if connectionState.isConnecting {
335+
stats.connecting &+= 1
336+
} else if connectionState.isBackingOff {
337+
stats.backingOff &+= 1
338+
} else if connectionState.isLeased {
339+
stats.leased &+= 1
340+
} else if connectionState.isIdle {
341+
stats.idle &+= 1
342+
}
343+
}
344+
return stats
345+
}
346+
331347
// MARK: - Mutations -
332348

333349
/// A connection's use. Did it serve in the pool or was it specialized for an `EventLoop`?
@@ -836,6 +852,10 @@ extension HTTPConnectionPool {
836852
var leased: Int = 0
837853
var connecting: Int = 0
838854
var backingOff: Int = 0
855+
856+
var nonLeased: Int {
857+
self.idle + self.connecting + self.backingOff
858+
}
839859
}
840860
}
841861
}

Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift

Lines changed: 119 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,23 @@ extension HTTPConnectionPool {
3333
/// The property was introduced to fail fast during testing.
3434
/// Otherwise this should always be true and not turned off.
3535
private let retryConnectionEstablishment: Bool
36+
private let preWarmedConnectionCount: Int
3637

3738
init(
3839
idGenerator: Connection.ID.Generator,
3940
maximumConcurrentConnections: Int,
4041
retryConnectionEstablishment: Bool,
4142
maximumConnectionUses: Int?,
43+
preWarmedHTTP1ConnectionCount: Int,
4244
lifecycleState: StateMachine.LifecycleState
4345
) {
4446
self.connections = HTTP1Connections(
4547
maximumConcurrentConnections: maximumConcurrentConnections,
4648
generator: idGenerator,
47-
maximumConnectionUses: maximumConnectionUses
49+
maximumConnectionUses: maximumConnectionUses,
50+
preWarmedHTTP1ConnectionCount: preWarmedHTTP1ConnectionCount
4851
)
52+
self.preWarmedConnectionCount = preWarmedHTTP1ConnectionCount
4953
self.retryConnectionEstablishment = retryConnectionEstablishment
5054

5155
self.requests = RequestQueue()
@@ -145,9 +149,26 @@ extension HTTPConnectionPool {
145149

146150
private mutating func executeRequestOnPreferredEventLoop(_ request: Request, eventLoop: EventLoop) -> Action {
147151
if let connection = self.connections.leaseConnection(onPreferred: eventLoop) {
152+
// Cool, a connection is available. If using this would put us below our needed extra set, we
153+
// should create another.
154+
let stats = self.connections.generalPurposeStats
155+
let needExtraConnection =
156+
stats.nonLeased < (self.requests.count + self.preWarmedConnectionCount) && self.connections.canGrow
157+
let action: StateMachine.ConnectionAction
158+
159+
if needExtraConnection {
160+
action = .createConnectionAndCancelTimeoutTimer(
161+
createdID: self.connections.createNewConnection(on: eventLoop),
162+
on: eventLoop,
163+
cancelTimerID: connection.id
164+
)
165+
} else {
166+
action = .cancelTimeoutTimer(connection.id)
167+
}
168+
148169
return .init(
149170
request: .executeRequest(request, connection, cancelTimeout: false),
150-
connection: .cancelTimeoutTimer(connection.id)
171+
connection: action
151172
)
152173
}
153174

@@ -294,7 +315,20 @@ extension HTTPConnectionPool {
294315
}
295316
}
296317

297-
mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action {
318+
mutating func connectionIdleTimeout(_ connectionID: Connection.ID, on eventLoop: any EventLoop) -> Action {
319+
// Don't close idle connections if we need pre-warmed connections. Instead, re-arm the idle timer.
320+
// We still want the idle timers to make sure we eventually fall below the pre-warmed limit.
321+
if self.preWarmedConnectionCount > 0 {
322+
let stats = self.connections.generalPurposeStats
323+
if stats.idle <= self.preWarmedConnectionCount {
324+
return .init(
325+
request: .none,
326+
connection: .scheduleTimeoutTimer(connectionID, on: eventLoop)
327+
)
328+
}
329+
}
330+
331+
// Ok, we do actually want the connection count to go down.
298332
guard let connection = self.connections.closeConnectionIfIdle(connectionID) else {
299333
// because of a race this connection (connection close runs against trigger of timeout)
300334
// was already removed from the state machine.
@@ -410,11 +444,7 @@ extension HTTPConnectionPool {
410444
case .running:
411445
// Close the connection if it's expired.
412446
if context.shouldBeClosed {
413-
let connection = self.connections.closeConnection(at: index)
414-
return .init(
415-
request: .none,
416-
connection: .closeConnection(connection, isShutdown: .no)
417-
)
447+
return self.nextActionForToBeClosedIdleConnection(at: index, context: context)
418448
} else {
419449
switch context.use {
420450
case .generalPurpose:
@@ -446,28 +476,63 @@ extension HTTPConnectionPool {
446476
at index: Int,
447477
context: HTTP1Connections.IdleConnectionContext
448478
) -> EstablishedAction {
479+
var requestAction = HTTPConnectionPool.StateMachine.RequestAction.none
480+
var parkedConnectionDetails: (HTTPConnectionPool.Connection.ID, any EventLoop)? = nil
481+
449482
// 1. Check if there are waiting requests in the general purpose queue
450483
if let request = self.requests.popFirst(for: nil) {
451-
return .init(
452-
request: .executeRequest(request, self.connections.leaseConnection(at: index), cancelTimeout: true),
453-
connection: .none
484+
requestAction = .executeRequest(
485+
request,
486+
self.connections.leaseConnection(at: index),
487+
cancelTimeout: true
454488
)
455489
}
456490

457491
// 2. Check if there are waiting requests in the matching eventLoop queue
458-
if let request = self.requests.popFirst(for: context.eventLoop) {
459-
return .init(
460-
request: .executeRequest(request, self.connections.leaseConnection(at: index), cancelTimeout: true),
461-
connection: .none
492+
if case .none = requestAction, let request = self.requests.popFirst(for: context.eventLoop) {
493+
requestAction = .executeRequest(
494+
request,
495+
self.connections.leaseConnection(at: index),
496+
cancelTimeout: true
462497
)
463498
}
464499

465500
// 3. Create a timeout timer to ensure the connection is closed if it is idle for too
466-
// long.
467-
let (connectionID, eventLoop) = self.connections.parkConnection(at: index)
501+
// long, assuming we don't already have a use for it.
502+
if case .none = requestAction {
503+
parkedConnectionDetails = self.connections.parkConnection(at: index)
504+
}
505+
506+
// 4. We may need to create another connection to make sure we have enough pre-warmed ones.
507+
// We need to do that if we have fewer non-leased connections than we need pre-warmed ones _and_ the pool can grow.
508+
// Note that in this case we don't need to account for the number of pending requests, as that is 0: step 1
509+
// confirmed that.
510+
let connectionAction: EstablishedConnectionAction
511+
512+
if self.connections.generalPurposeStats.nonLeased < self.preWarmedConnectionCount
513+
&& self.connections.canGrow
514+
{
515+
// Re-use the event loop of the connection that just got created.
516+
if let parkedConnectionDetails {
517+
let newConnectionID = self.connections.createNewConnection(on: parkedConnectionDetails.1)
518+
connectionAction = .scheduleTimeoutTimerAndCreateConnection(
519+
timeoutID: parkedConnectionDetails.0,
520+
newConnectionID: newConnectionID,
521+
on: parkedConnectionDetails.1
522+
)
523+
} else {
524+
let newConnectionID = self.connections.createNewConnection(on: context.eventLoop)
525+
connectionAction = .createConnection(connectionID: newConnectionID, on: context.eventLoop)
526+
}
527+
} else if let parkedConnectionDetails {
528+
connectionAction = .scheduleTimeoutTimer(parkedConnectionDetails.0, on: parkedConnectionDetails.1)
529+
} else {
530+
connectionAction = .none
531+
}
532+
468533
return .init(
469-
request: .none,
470-
connection: .scheduleTimeoutTimer(connectionID, on: eventLoop)
534+
request: requestAction,
535+
connection: connectionAction
471536
)
472537
}
473538

@@ -495,6 +560,37 @@ extension HTTPConnectionPool {
495560
)
496561
}
497562

563+
private mutating func nextActionForToBeClosedIdleConnection(
564+
at index: Int,
565+
context: HTTP1Connections.IdleConnectionContext
566+
) -> EstablishedAction {
567+
// Step 1: Tell the connection pool to drop what it knows about this object.
568+
let connectionToClose = self.connections.closeConnection(at: index)
569+
570+
// Step 2: Check whether we need a connection to replace this one. We do if we have fewer non-leased connections
571+
// than we requests + minimumPrewarming count _and_ the pool can grow. Note that in many cases the above closure
572+
// will have made some space, which is just fine.
573+
let nonLeased = self.connections.generalPurposeStats.nonLeased
574+
let neededNonLeased = self.requests.generalPurposeCount + self.preWarmedConnectionCount
575+
576+
let connectionAction: EstablishedConnectionAction
577+
if nonLeased < neededNonLeased && self.connections.canGrow {
578+
// We re-use the EL of the connection we just closed.
579+
let newConnectionID = self.connections.createNewConnection(on: connectionToClose.eventLoop)
580+
connectionAction = .closeConnectionAndCreateConnection(
581+
closeConnection: connectionToClose,
582+
newConnectionID: newConnectionID,
583+
on: connectionToClose.eventLoop
584+
)
585+
} else {
586+
connectionAction = .closeConnection(connectionToClose, isShutdown: .no)
587+
}
588+
return .init(
589+
request: .none,
590+
connection: connectionAction
591+
)
592+
}
593+
498594
// MARK: Failed/Closed connection management
499595

500596
private mutating func nextActionForFailedConnection(
@@ -530,7 +626,10 @@ extension HTTPConnectionPool {
530626
at index: Int,
531627
context: HTTP1Connections.FailedConnectionContext
532628
) -> Action {
533-
if context.connectionsStartingForUseCase < self.requests.generalPurposeCount {
629+
let needConnectionForRequest =
630+
context.connectionsStartingForUseCase
631+
< (self.requests.generalPurposeCount + self.preWarmedConnectionCount)
632+
if needConnectionForRequest {
534633
// if we have more requests queued up, than we have starting connections, we should
535634
// create a new connection
536635
let (newConnectionID, newEventLoop) = self.connections.replaceConnection(at: index)

0 commit comments

Comments
 (0)