From 223f44a3961ca52c9ab954a7b3345ac30cea7363 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Mon, 28 Apr 2025 17:25:07 +0100 Subject: [PATCH 1/4] Make body stream writer sendable Motivation: The body stream writer can be sent across isolation domains so should be sendable. Modifications: - Make it explicitly sendable - Add appropriate preconcurrency annotations - Wrap an iterator from swift-algorithms as it hasn't yet annotated its types with Sendable Result: Body stream writer is sendable --- Sources/AsyncHTTPClient/HTTPHandler.swift | 47 +++++++++++++++++------ 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 95bdaeaed..e3130f2fb 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -32,14 +32,15 @@ extension HTTPClient { /// A streaming uploader. /// /// ``StreamWriter`` abstracts - public struct StreamWriter { - let closure: (IOData) -> EventLoopFuture + public struct StreamWriter: Sendable { + let closure: @Sendable (IOData) -> EventLoopFuture /// Create new ``HTTPClient/Body/StreamWriter`` /// /// - parameters: /// - closure: function that will be called to write actual bytes to the channel. - public init(closure: @escaping (IOData) -> EventLoopFuture) { + @preconcurrency + public init(closure: @escaping @Sendable (IOData) -> EventLoopFuture) { self.closure = closure } @@ -55,15 +56,15 @@ extension HTTPClient { func writeChunks( of bytes: Bytes, maxChunkSize: Int - ) -> EventLoopFuture where Bytes.Element == UInt8 { - // `StreamWriter` is has design issues, for example + ) -> EventLoopFuture where Bytes.Element == UInt8, Bytes: Sendable, Bytes.SubSequence: Sendable { + // `StreamWriter` has design issues, for example // - https://github.com/swift-server/async-http-client/issues/194 // - https://github.com/swift-server/async-http-client/issues/264 // - We're not told the EventLoop the task runs on and the user is free to return whatever EL they // want. // One important consideration then is that we must lock around the iterator because we could be hopping // between threads. - typealias Iterator = EnumeratedSequence>.Iterator + typealias Iterator = BodyStreamIterator typealias Chunk = (offset: Int, element: ChunksOfCountCollection.Element) func makeIteratorAndFirstChunk( @@ -77,7 +78,7 @@ extension HTTPClient { return nil } - return (NIOLockedValueBox(iterator), chunk) + return (NIOLockedValueBox(BodyStreamIterator(iterator)), chunk) } guard let (iterator, chunk) = makeIteratorAndFirstChunk(bytes: bytes) else { @@ -86,9 +87,8 @@ extension HTTPClient { @Sendable // can't use closure here as we recursively call ourselves which closures can't do func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise) { - if let nextElement = iterator.withLockedValue({ $0.next() }) { + if let (index, element) = iterator.withLockedValue({ $0.next() }) { self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).map { - let index = nextElement.offset if (index + 1) % 4 == 0 { // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2 // mode. @@ -96,10 +96,10 @@ extension HTTPClient { // from another thread. If we fail to do that promptly, we may balloon our body chunks // into memory. allDone.futureResult.eventLoop.execute { - writeNextChunk(nextElement, allDone: allDone) + writeNextChunk((offset: index, element: element), allDone: allDone) } } else { - writeNextChunk(nextElement, allDone: allDone) + writeNextChunk((offset: index, element: element), allDone: allDone) } }.cascadeFailure(to: allDone) } else { @@ -188,7 +188,7 @@ extension HTTPClient { @preconcurrency @inlinable public static func bytes(_ bytes: Bytes) -> Body - where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 { + where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.SubSequence: Sendable, Bytes.Element == UInt8 { Body(contentLength: Int64(bytes.count)) { writer in if bytes.count <= bagOfBytesToByteBufferConversionChunkSize { return writer.write(.byteBuffer(ByteBuffer(bytes: bytes))) @@ -1080,3 +1080,26 @@ extension RequestBodyLength { self = .known(length) } } + +@usableFromInline +struct BodyStreamIterator< + Bytes: Collection +>: IteratorProtocol, @unchecked Sendable where Bytes.Element == UInt8, Bytes: Sendable { + // @unchecked: swift-algorithms hasn't adopted Sendable yet. By inspection, the iterator + // is safe to annotate as sendable. + @usableFromInline + typealias Element = (offset: Int, element: Bytes.SubSequence) + + @usableFromInline + var _backing: EnumeratedSequence>.Iterator + + @inlinable + init(_ backing: EnumeratedSequence>.Iterator) { + self._backing = backing + } + + @inlinable + mutating func next() -> Element? { + self._backing.next() + } +} From a86f58787c8bf64f9033366fbd72e772a0aa73e1 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 29 Apr 2025 10:43:44 +0100 Subject: [PATCH 2/4] add preconcurrency --- Sources/AsyncHTTPClient/HTTPHandler.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index e3130f2fb..da7deffae 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -53,6 +53,7 @@ extension HTTPClient { } @inlinable + @preconcurrency func writeChunks( of bytes: Bytes, maxChunkSize: Int From 41e53ad2c4d12b42c2b37d67ec503c40dee66092 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 29 Apr 2025 11:30:27 +0100 Subject: [PATCH 3/4] simplify --- Sources/AsyncHTTPClient/HTTPHandler.swift | 103 +++++++++------------- 1 file changed, 41 insertions(+), 62 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index da7deffae..cad11df2c 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -53,11 +53,10 @@ extension HTTPClient { } @inlinable - @preconcurrency func writeChunks( of bytes: Bytes, maxChunkSize: Int - ) -> EventLoopFuture where Bytes.Element == UInt8, Bytes: Sendable, Bytes.SubSequence: Sendable { + ) -> EventLoopFuture where Bytes.Element == UInt8, Bytes: Sendable { // `StreamWriter` has design issues, for example // - https://github.com/swift-server/async-http-client/issues/194 // - https://github.com/swift-server/async-http-client/issues/264 @@ -65,51 +64,54 @@ extension HTTPClient { // want. // One important consideration then is that we must lock around the iterator because we could be hopping // between threads. - typealias Iterator = BodyStreamIterator + typealias Iterator = EnumeratedSequence>.Iterator typealias Chunk = (offset: Int, element: ChunksOfCountCollection.Element) - func makeIteratorAndFirstChunk( - bytes: Bytes - ) -> ( - iterator: NIOLockedValueBox, - chunk: Chunk - )? { - var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator() - guard let chunk = iterator.next() else { - return nil + // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us... + return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in + func makeIteratorAndFirstChunk( + bytes: Bytes + ) -> (iterator: Iterator, chunk: Chunk)? { + var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator() + guard let chunk = iterator.next() else { + return nil + } + + return (iterator, chunk) } - return (NIOLockedValueBox(BodyStreamIterator(iterator)), chunk) - } - - guard let (iterator, chunk) = makeIteratorAndFirstChunk(bytes: bytes) else { - return self.write(IOData.byteBuffer(.init())) - } + guard let iteratorAndChunk = makeIteratorAndFirstChunk(bytes: bytes) else { + return loop.makeSucceededVoidFuture() + } - @Sendable // can't use closure here as we recursively call ourselves which closures can't do - func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise) { - if let (index, element) = iterator.withLockedValue({ $0.next() }) { - self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).map { - if (index + 1) % 4 == 0 { - // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2 - // mode. - // Also, we must frequently return to the EventLoop because we may get the pause signal - // from another thread. If we fail to do that promptly, we may balloon our body chunks - // into memory. - allDone.futureResult.eventLoop.execute { + var iterator = iteratorAndChunk.0 + let chunk = iteratorAndChunk.1 + + // can't use closure here as we recursively call ourselves which closures can't do + func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise) { + let loop = allDone.futureResult.eventLoop + loop.assertInEventLoop() + + if let (index, element) = iterator.next() { + self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).hop(to: loop).assumeIsolated().map { + if (index + 1) % 4 == 0 { + // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2 + // mode. + // Also, we must frequently return to the EventLoop because we may get the pause signal + // from another thread. If we fail to do that promptly, we may balloon our body chunks + // into memory. + allDone.futureResult.eventLoop.assumeIsolated().execute { + writeNextChunk((offset: index, element: element), allDone: allDone) + } + } else { writeNextChunk((offset: index, element: element), allDone: allDone) } - } else { - writeNextChunk((offset: index, element: element), allDone: allDone) - } - }.cascadeFailure(to: allDone) - } else { - self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone) + }.nonisolated().cascadeFailure(to: allDone) + } else { + self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone) + } } - } - // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us... - return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in let allDone = loop.makePromise(of: Void.self) writeNextChunk(chunk, allDone: allDone) return allDone.futureResult @@ -189,7 +191,7 @@ extension HTTPClient { @preconcurrency @inlinable public static func bytes(_ bytes: Bytes) -> Body - where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.SubSequence: Sendable, Bytes.Element == UInt8 { + where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 { Body(contentLength: Int64(bytes.count)) { writer in if bytes.count <= bagOfBytesToByteBufferConversionChunkSize { return writer.write(.byteBuffer(ByteBuffer(bytes: bytes))) @@ -1081,26 +1083,3 @@ extension RequestBodyLength { self = .known(length) } } - -@usableFromInline -struct BodyStreamIterator< - Bytes: Collection ->: IteratorProtocol, @unchecked Sendable where Bytes.Element == UInt8, Bytes: Sendable { - // @unchecked: swift-algorithms hasn't adopted Sendable yet. By inspection, the iterator - // is safe to annotate as sendable. - @usableFromInline - typealias Element = (offset: Int, element: Bytes.SubSequence) - - @usableFromInline - var _backing: EnumeratedSequence>.Iterator - - @inlinable - init(_ backing: EnumeratedSequence>.Iterator) { - self._backing = backing - } - - @inlinable - mutating func next() -> Element? { - self._backing.next() - } -} From 7612e8ff3605c85c14ffc021e7fc55951dd028b6 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 29 Apr 2025 12:39:29 +0100 Subject: [PATCH 4/4] format --- Sources/AsyncHTTPClient/HTTPHandler.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index cad11df2c..432f0ff11 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -93,7 +93,8 @@ extension HTTPClient { loop.assertInEventLoop() if let (index, element) = iterator.next() { - self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).hop(to: loop).assumeIsolated().map { + self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).hop(to: loop).assumeIsolated().map + { if (index + 1) % 4 == 0 { // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2 // mode.