Skip to content

Commit 3ab7556

Browse files
authored
Add OutputStream BodyPublisher (#652)
* Add outputstream bodypublisher Adds a body method that allows for writing to an outputstream. This is for cases such as certain libraries that can only write to an outputstream * Update DHttpClientRequest.java * Update module-info.java
1 parent 352b389 commit 3ab7556

File tree

7 files changed

+322
-1
lines changed

7 files changed

+322
-1
lines changed

http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,13 @@ public HttpClientRequest body(HttpRequest.BodyPublisher body) {
363363
return this;
364364
}
365365

366+
@Override
367+
public HttpClientRequest body(OutputStreamBodyWriter writer) {
368+
this.body =
369+
new OutputStreamBodyPublisher(writer, context.httpClient().executor().orElseThrow());
370+
return this;
371+
}
372+
366373
@Override
367374
public HttpClientRequest errorMapper(Function<HttpException, RuntimeException> errorMapper) {
368375
this.errorMapper = errorMapper;

http-client/src/main/java/io/avaje/http/client/HttpClientRequest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,34 @@ default HttpClientRequest queryParam(String name, Collection<String> values) {
403403
*/
404404
HttpClientRequest body(Path file);
405405

406+
407+
/**
408+
* Set the body content using a callback that writes to an {@link java.io.OutputStream}.
409+
* <p>
410+
* This allows streaming large or dynamically generated content directly to the HTTP request body,
411+
* without buffering the entire payload in memory. The provided {@code OutputStreamWriter} is called
412+
* with an {@link java.io.OutputStream} that writes to the request body. Data written to the stream
413+
* is sent as the request body.
414+
* <p>
415+
* Example usage:
416+
* <pre>{@code
417+
* client.request()
418+
* .url("http://example.com/upload")
419+
* .body(outputStream -> {
420+
* // Write data in chunks
421+
* for (byte[] chunk : getChunks()) {
422+
* outputStream.write(chunk);
423+
* }
424+
* })
425+
* .POST()
426+
* .asPlainString();
427+
* }</pre>
428+
*
429+
* @param writer Callback to write data to the request body output stream
430+
* @return The request being built
431+
*/
432+
HttpClientRequest body(OutputStreamBodyWriter writer);
433+
406434
/**
407435
* Set the body content using http BodyPublisher.
408436
*
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package io.avaje.http.client;
2+
3+
import java.io.IOException;
4+
import java.io.PipedInputStream;
5+
import java.io.PipedOutputStream;
6+
import java.io.UncheckedIOException;
7+
import java.net.http.HttpRequest;
8+
import java.nio.ByteBuffer;
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.Executor;
11+
import java.util.concurrent.Flow;
12+
import java.util.concurrent.atomic.AtomicBoolean;
13+
14+
/**
15+
* A BodyPublisher that allows writing to an OutputStream. Data written to the OutputStream is
16+
* published to the HTTP request body.
17+
*/
18+
final class OutputStreamBodyPublisher implements HttpRequest.BodyPublisher {
19+
20+
private final PipedOutputStream outputStream;
21+
private final PipedInputStream inputStream;
22+
private final int bufferSize;
23+
private final AtomicBoolean streamClosed = new AtomicBoolean(false);
24+
private final Executor executor;
25+
private final OutputStreamBodyWriter writer;
26+
27+
OutputStreamBodyPublisher(OutputStreamBodyWriter writer, Executor executor) {
28+
this.bufferSize = 8192;
29+
this.writer = writer;
30+
this.outputStream = new PipedOutputStream();
31+
this.inputStream = new PipedInputStream(bufferSize);
32+
this.executor = executor;
33+
}
34+
35+
@Override
36+
public long contentLength() {
37+
return -1;
38+
}
39+
40+
@Override
41+
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
42+
try {
43+
outputStream.connect(inputStream);
44+
} catch (IOException e) {
45+
throw new UncheckedIOException(e);
46+
}
47+
subscriber.onSubscribe(new OutputStreamSubscription(subscriber));
48+
}
49+
50+
private class OutputStreamSubscription implements Flow.Subscription {
51+
private final Flow.Subscriber<? super ByteBuffer> subscriber;
52+
private final AtomicBoolean cancelled = new AtomicBoolean(false);
53+
private volatile boolean completed = false;
54+
private CompletableFuture<Void> writeTask;
55+
56+
OutputStreamSubscription(Flow.Subscriber<? super ByteBuffer> subscriber) {
57+
this.subscriber = subscriber;
58+
59+
// Start a background thread to write to the output stream
60+
writeTask =
61+
CompletableFuture.runAsync(
62+
() -> {
63+
try {
64+
writer.write(outputStream);
65+
} catch (Throwable t) {
66+
subscriber.onError(t);
67+
} finally {
68+
try {
69+
outputStream.close();
70+
} catch (IOException e) {
71+
subscriber.onError(e);
72+
}
73+
}
74+
},
75+
executor);
76+
}
77+
78+
@Override
79+
public void request(long n) {
80+
if (cancelled.get() || completed) {
81+
return;
82+
}
83+
try {
84+
byte[] buffer = new byte[bufferSize];
85+
for (long i = 0; i < n && !cancelled.get(); i++) {
86+
int bytesRead = inputStream.read(buffer);
87+
if (bytesRead == -1) {
88+
// End of stream
89+
completed = true;
90+
subscriber.onComplete();
91+
closeStreams();
92+
return;
93+
}
94+
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
95+
subscriber.onNext(byteBuffer);
96+
}
97+
} catch (IOException e) {
98+
completed = true;
99+
subscriber.onError(e);
100+
closeStreams();
101+
}
102+
}
103+
104+
@Override
105+
public void cancel() {
106+
cancelled.set(true);
107+
writeTask.cancel(true);
108+
closeStreams();
109+
}
110+
111+
private void closeStreams() {
112+
if (streamClosed.compareAndSet(false, true)) {
113+
try (outputStream;
114+
inputStream; ) {
115+
} catch (IOException e) {
116+
// Ignore
117+
}
118+
}
119+
}
120+
}
121+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.avaje.http.client;
2+
3+
import java.io.IOException;
4+
import java.io.OutputStream;
5+
6+
/**
7+
* Use to set the body content using a callback that writes to an {@link java.io.OutputStream}.
8+
* <p>
9+
* This allows streaming large or dynamically generated content directly to the HTTP request body,
10+
* without buffering the entire payload in memory. The provided {@code OutputStreamWriter} is called
11+
* with an {@link java.io.OutputStream} that writes to the request body. Data written to the stream
12+
* is sent as the request body.
13+
* <p>
14+
* Example usage:
15+
* <pre>{@code
16+
* client.request()
17+
* .url("http://example.com/upload")
18+
* .body(outputStream -> {
19+
* // Write data in chunks
20+
* for (byte[] chunk : getChunks()) {
21+
* outputStream.write(chunk);
22+
* }
23+
* })
24+
* .POST()
25+
* .asPlainString();
26+
* }</pre>
27+
*
28+
* @see HttpClientRequest#body(OutputStreamBodyWriter)
29+
*/
30+
public interface OutputStreamBodyWriter {
31+
32+
/**
33+
* Write body content to the outputStream.
34+
*/
35+
void write(OutputStream outputStream) throws IOException;
36+
}

http-client/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
requires static com.fasterxml.jackson.core;
3131
requires static io.avaje.jsonb;
3232
requires static io.avaje.inject;
33+
requires static jdk.httpserver;
3334

3435
exports io.avaje.http.client;
3536
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package io.avaje.http.client;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import java.io.ByteArrayOutputStream;
6+
import java.io.IOException;
7+
import java.net.InetSocketAddress;
8+
import java.net.http.HttpResponse;
9+
import java.time.Duration;
10+
import java.util.concurrent.Executors;
11+
import java.util.concurrent.atomic.AtomicReference;
12+
13+
import org.junit.jupiter.api.AfterAll;
14+
import org.junit.jupiter.api.BeforeAll;
15+
import org.junit.jupiter.api.Test;
16+
17+
import com.sun.net.httpserver.HttpServer;
18+
19+
public class OutPutStreamTest {
20+
21+
private static HttpServer server;
22+
private static int port;
23+
private static final AtomicReference<String> receivedBody = new AtomicReference<>();
24+
25+
@BeforeAll
26+
static void startServer() throws IOException {
27+
server = HttpServer.create(new InetSocketAddress(0), 0);
28+
port = server.getAddress().getPort();
29+
server.createContext(
30+
"/test",
31+
exchange -> {
32+
try (var is = exchange.getRequestBody();
33+
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
34+
35+
is.transferTo(baos);
36+
37+
receivedBody.set(baos.toString());
38+
}
39+
exchange.sendResponseHeaders(204, -1);
40+
});
41+
server.setExecutor(Executors.newSingleThreadExecutor());
42+
server.start();
43+
}
44+
45+
@AfterAll
46+
static void stopServer() {
47+
if (server != null) {
48+
server.stop(0);
49+
}
50+
}
51+
52+
@Test
53+
void testOutputStreamBodyPublisher() throws Exception {
54+
String data = "Hello OutputStreamBodyPublisher!";
55+
56+
HttpClient client = HttpClient.builder().requestTimeout(Duration.ofDays(1)).build();
57+
58+
HttpResponse<String> response =
59+
client
60+
.request()
61+
.url("http://localhost:" + port + "/test")
62+
.header("Content-Type", "text/plain")
63+
.body(
64+
outputStream -> {
65+
outputStream.write(data.getBytes());
66+
})
67+
.POST()
68+
.asPlainString();
69+
70+
assertEquals(204, response.statusCode());
71+
assertEquals(data, receivedBody.get());
72+
}
73+
74+
@Test
75+
void testOutputStreamBodyPublisherLargeData() throws Exception {
76+
int repeat = 100_000; // much larger than buffer (8192)
77+
String chunk = "abcdefghij"; // 10 bytes
78+
StringBuilder sb = new StringBuilder(repeat * chunk.length());
79+
for (int i = 0; i < repeat; i++) {
80+
sb.append(chunk);
81+
}
82+
String data = sb.toString();
83+
84+
receivedBody.set(null); // reset
85+
86+
HttpClient client = HttpClient.builder().requestTimeout(Duration.ofMinutes(2)).build();
87+
88+
HttpResponse<String> response =
89+
client
90+
.request()
91+
.url("http://localhost:" + port + "/test")
92+
.header("Content-Type", "text/plain")
93+
.body(
94+
os -> {
95+
for (int i = 0; i < repeat; i++) {
96+
os.write(chunk.getBytes());
97+
}
98+
})
99+
.POST()
100+
.asPlainString();
101+
102+
assertEquals(204, response.statusCode());
103+
assertEquals(data, receivedBody.get());
104+
}
105+
106+
@Test
107+
void testError() throws Exception {
108+
109+
HttpClient client = HttpClient.builder().requestTimeout(Duration.ofDays(1)).build();
110+
try {
111+
112+
client
113+
.request()
114+
.url("http://localhost:" + port + "/test")
115+
.header("Content-Type", "text/plain")
116+
.body(
117+
outputStream -> {
118+
outputStream.write(" Output".getBytes());
119+
throw new IOException("test error");
120+
})
121+
.POST()
122+
.asPlainString();
123+
} catch (HttpException e) {
124+
assertEquals("test error", e.getCause().getMessage());
125+
}
126+
}
127+
}

http-generator-client/src/main/java/io/avaje/http/generator/client/ClientMethodWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ private static boolean directBodyType(String type) {
425425
|| "java.util.function.Supplier<?extendsjava.io.InputStream>".equals(type)
426426
|| "java.util.function.Supplier<java.io.InputStream>".equals(type)
427427
|| "java.nio.file.Path".equals(type)
428-
|| "io.avaje.http.client.BodyContent".equals(type);
428+
|| "io.avaje.http.client.BodyContent".equals(type)
429+
|| "io.avaje.http.client.OutputStreamBodyWriter".equals(type);
429430
}
430431

431432
private void writePaths(Set<PathSegments.Segment> segments) {

0 commit comments

Comments
 (0)