diff --git a/httpclient5/pom.xml b/httpclient5/pom.xml index b97e1d21ec..4478777448 100644 --- a/httpclient5/pom.xml +++ b/httpclient5/pom.xml @@ -118,6 +118,10 @@ zstd-jni true + + com.aayushatharva.brotli4j + brotli4j + diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducer.java new file mode 100644 index 0000000000..676c530a4d --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducer.java @@ -0,0 +1,306 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import com.aayushatharva.brotli4j.encoder.Encoder; +import com.aayushatharva.brotli4j.encoder.EncoderJNI; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.util.Args; + +/** + * {@code AsyncEntityProducer} that Brotli-compresses bytes from an upstream producer + * on the fly and writes the compressed stream to the target {@link DataStreamChannel}. + *

+ * Purely async/streaming: no {@code InputStream}/{@code OutputStream}. Back-pressure is + * honored via {@link #available()} and the I/O reactor’s calls into {@link #produce(DataStreamChannel)}. + * Trailers from the upstream producer are preserved and emitted once the compressed output + * has been fully drained. + *

+ * + *

Content metadata

+ * Returns {@code Content-Encoding: br}, {@code Content-Length: -1} and {@code chunked=true}. + * Repeatability matches the upstream producer. + * + *

Implementation notes

+ * Uses Brotli4j’s {@code EncoderJNI.Wrapper}. JNI-owned output buffers are written directly + * when possible; if the channel applies back-pressure, the unwritten tail is copied into + * small pooled direct {@link java.nio.ByteBuffer}s to reduce allocation churn. Native + * resources are released in {@link #releaseResources()}. + *

+ * Ensure {@link com.aayushatharva.brotli4j.Brotli4jLoader#ensureAvailability()} has been + * called once at startup; this class also invokes it in a static initializer as a safeguard. + *

+ * + *

Usage

+ *
{@code
+ * AsyncEntityProducer plain = new StringAsyncEntityProducer("hello", ContentType.TEXT_PLAIN);
+ * AsyncEntityProducer br = new DeflatingBrotliEntityProducer(plain); // defaults q=5, lgwin=22
+ * client.execute(new BasicRequestProducer(post, br),
+ *                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ *                null);
+ * }
+ * + * @see org.apache.hc.core5.http.nio.AsyncEntityProducer + * @see org.apache.hc.core5.http.nio.DataStreamChannel + * @see com.aayushatharva.brotli4j.encoder.EncoderJNI + * @since 5.6 + */ +public final class DeflatingBrotliEntityProducer implements AsyncEntityProducer { + + private enum State { STREAMING, FINISHING, DONE } + + private final AsyncEntityProducer upstream; + private final EncoderJNI.Wrapper encoder; + + private ByteBuffer pendingOut; + private List pendingTrailers; + private State state = State.STREAMING; + + /** + * Create a producer with explicit Brotli params. + * + * @param upstream upstream entity producer whose bytes will be compressed + * @param quality Brotli quality level (see brotli4j documentation) + * @param lgwin Brotli window size log2 (see brotli4j documentation) + * @param mode Brotli mode hint (GENERIC/TEXT/FONT) + * @throws IOException if the native encoder cannot be created + * @since 5.6 + */ + public DeflatingBrotliEntityProducer( + final AsyncEntityProducer upstream, + final int quality, + final int lgwin, + final Encoder.Mode mode) throws IOException { + this.upstream = Args.notNull(upstream, "upstream"); + this.encoder = new EncoderJNI.Wrapper(256 * 1024, quality, lgwin, mode); + } + + /** + * Convenience constructor mapping {@code 0=GENERIC, 1=TEXT, 2=FONT}. + * + * @since 5.6 + */ + public DeflatingBrotliEntityProducer( + final AsyncEntityProducer upstream, + final int quality, + final int lgwin, + final int modeInt) throws IOException { + this(upstream, quality, lgwin, + modeInt == 1 ? Encoder.Mode.TEXT : + modeInt == 2 ? Encoder.Mode.FONT : Encoder.Mode.GENERIC); + } + + /** + * Create a producer with sensible defaults ({@code quality=5}, {@code lgwin=22}, {@code GENERIC}). + * + * @since 5.6 + */ + public DeflatingBrotliEntityProducer(final AsyncEntityProducer upstream) throws IOException { + this(upstream, 5, 22, Encoder.Mode.GENERIC); + } + + + @Override + public String getContentType() { + return upstream.getContentType(); + } + + @Override + public String getContentEncoding() { + return "br"; + } + + @Override + public long getContentLength() { + return -1; + } + + @Override + public boolean isChunked() { + return true; + } + + @Override + public Set getTrailerNames() { + return upstream.getTrailerNames(); + } + + @Override + public boolean isRepeatable() { + return upstream.isRepeatable(); + } + + @Override + public int available() { + if (state == State.DONE) { + return 0; + } + if (pendingOut != null && pendingOut.hasRemaining() || pendingTrailers != null) { + return 1; + } + final int up = upstream.available(); + return (state != State.STREAMING || up > 0) ? 1 : 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + if (flushPending(channel)) { + return; + } + + if (state == State.FINISHING) { + encoder.push(EncoderJNI.Operation.FINISH, 0); + if (drainEncoder(channel)) { + return; + } + if (pendingTrailers == null) { + pendingTrailers = Collections.emptyList(); + } + channel.endStream(pendingTrailers); + pendingTrailers = null; + state = State.DONE; + return; + } + + upstream.produce(new DataStreamChannel() { + @Override + public void requestOutput() { + channel.requestOutput(); + } + + @Override + public int write(final ByteBuffer src) throws IOException { + int accepted = 0; + while (src.hasRemaining()) { + final ByteBuffer in = encoder.getInputBuffer(); + if (!in.hasRemaining()) { + encoder.push(EncoderJNI.Operation.PROCESS, 0); + if (drainEncoder(channel)) { + break; + } + continue; + } + final int xfer = Math.min(src.remaining(), in.remaining()); + final int lim = src.limit(); + src.limit(src.position() + xfer); + in.put(src); + src.limit(lim); + accepted += xfer; + + encoder.push(EncoderJNI.Operation.PROCESS, xfer); + if (drainEncoder(channel)) { + break; + } + } + return accepted; + } + + @Override + public void endStream() throws IOException { + endStream(Collections.emptyList()); + } + + @Override + public void endStream(final List trailers) throws IOException { + pendingTrailers = trailers; + state = State.FINISHING; + encoder.push(EncoderJNI.Operation.FINISH, 0); + if (drainEncoder(channel)) { + return; + } + if (pendingTrailers == null) { + pendingTrailers = Collections.emptyList(); + } + channel.endStream(pendingTrailers); + pendingTrailers = null; + state = State.DONE; + } + }); + } + + @Override + public void failed(final Exception cause) { + upstream.failed(cause); + } + + @Override + public void releaseResources() { + try { + encoder.destroy(); + } catch (final Throwable ignore) { + } + upstream.releaseResources(); + pendingOut = null; + pendingTrailers = null; + state = State.DONE; + } + + + private boolean flushPending(final DataStreamChannel channel) throws IOException { + if (pendingOut != null && pendingOut.hasRemaining()) { + channel.write(pendingOut); + if (pendingOut.hasRemaining()) { + channel.requestOutput(); + return true; + } + pendingOut = null; + } + if (pendingOut == null && pendingTrailers != null && state != State.STREAMING) { + channel.endStream(pendingTrailers); + pendingTrailers = null; + state = State.DONE; + return true; + } + return false; + } + + private boolean drainEncoder(final DataStreamChannel channel) throws IOException { + while (encoder.hasMoreOutput()) { + final ByteBuffer buf = encoder.pull(); + if (buf == null || !buf.hasRemaining()) { + continue; + } + channel.write(buf); + if (buf.hasRemaining()) { + pendingOut = ByteBuffer.allocateDirect(buf.remaining()); + pendingOut.put(buf).flip(); + channel.requestOutput(); + return true; + } + } + return false; + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumer.java new file mode 100644 index 0000000000..b2c3ae630f --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumer.java @@ -0,0 +1,179 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +import com.aayushatharva.brotli4j.decoder.DecoderJNI; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.nio.AsyncDataConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.util.Asserts; + +/** + * {@code AsyncDataConsumer} that inflates a Brotli-compressed byte stream and forwards + * decompressed bytes to a downstream consumer. + *

+ * Purely async/streaming: no {@code InputStream}/{@code OutputStream}. Back-pressure from + * the I/O reactor is propagated via {@link CapacityChannel}. JNI output buffers are copied + * into small reusable direct {@link java.nio.ByteBuffer}s before handing them to the + * downstream consumer (which may retain them). + *

+ * + *

Implementation notes

+ * Uses Brotli4j’s {@code DecoderJNI.Wrapper}. Native resources are released in + * {@link #releaseResources()}. Throws an {@link java.io.IOException} if the stream is + * truncated or corrupted. + *

+ * Ensure {@link com.aayushatharva.brotli4j.Brotli4jLoader#ensureAvailability()} has been + * called once at startup; this class also invokes it in a static initializer as a safeguard. + *

+ * + *

Usage

+ *
{@code
+ * AsyncDataConsumer textConsumer = new StringAsyncEntityConsumer();
+ * AsyncDataConsumer brInflating  = new InflatingBrotliDataConsumer(textConsumer);
+ * client.execute(producer, new BasicResponseConsumer<>(brInflating), null);
+ * }
+ * + * @see org.apache.hc.core5.http.nio.AsyncDataConsumer + * @see org.apache.hc.core5.http.nio.CapacityChannel + * @see com.aayushatharva.brotli4j.decoder.DecoderJNI + * @since 5.6 + */ +public final class InflatingBrotliDataConsumer implements AsyncDataConsumer { + + private final AsyncDataConsumer downstream; + private final DecoderJNI.Wrapper decoder; + private volatile CapacityChannel capacity; + + + public InflatingBrotliDataConsumer(final AsyncDataConsumer downstream) { + this.downstream = downstream; + try { + this.decoder = new DecoderJNI.Wrapper(8 * 1024); + } catch (final IOException e) { + throw new RuntimeException("Unable to initialize DecoderJNI", e); + } + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + this.capacity = capacityChannel; + downstream.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + while (src.hasRemaining()) { + final ByteBuffer in = decoder.getInputBuffer(); + final int xfer = Math.min(src.remaining(), in.remaining()); + if (xfer == 0) { + decoder.push(0); + pump(); + continue; + } + final int lim = src.limit(); + src.limit(src.position() + xfer); + in.put(src); + src.limit(lim); + + decoder.push(xfer); + pump(); + } + final CapacityChannel ch = this.capacity; + if (ch != null) { + ch.update(Integer.MAX_VALUE); + } + } + + @Override + public void streamEnd(final List trailers) throws IOException, HttpException { + pump(); + Asserts.check(decoder.getStatus() == DecoderJNI.Status.DONE || !decoder.hasOutput(), + "Truncated brotli stream"); + downstream.streamEnd(trailers); + } + + @Override + public void releaseResources() { + try { + decoder.destroy(); + } catch (final Throwable ignore) { + } + downstream.releaseResources(); + } + + private void pump() throws IOException { + for (; ; ) { + switch (decoder.getStatus()) { + case OK: + decoder.push(0); + break; + case NEEDS_MORE_OUTPUT: { + // Pull a decoder-owned buffer; copy before handing off. + final ByteBuffer nativeBuf = decoder.pull(); + if (nativeBuf != null && nativeBuf.hasRemaining()) { + final ByteBuffer copy = ByteBuffer.allocateDirect(nativeBuf.remaining()); + copy.put(nativeBuf).flip(); + downstream.consume(copy); + } + break; + } + case NEEDS_MORE_INPUT: + if (decoder.hasOutput()) { + final ByteBuffer nativeBuf = decoder.pull(); + if (nativeBuf != null && nativeBuf.hasRemaining()) { + final ByteBuffer copy = ByteBuffer.allocateDirect(nativeBuf.remaining()); + copy.put(nativeBuf).flip(); + downstream.consume(copy); + break; + } + } + return; // wait for more input + case DONE: + if (decoder.hasOutput()) { + final ByteBuffer nativeBuf = decoder.pull(); + if (nativeBuf != null && nativeBuf.hasRemaining()) { + final ByteBuffer copy = ByteBuffer.allocateDirect(nativeBuf.remaining()); + copy.put(nativeBuf).flip(); + downstream.consume(copy); + break; + } + } + return; + default: + // Corrupted stream + throw new IOException("Brotli stream corrupted"); + } + } + } +} \ No newline at end of file diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/Brotli4jRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/Brotli4jRuntime.java new file mode 100644 index 0000000000..f60b1099f2 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/Brotli4jRuntime.java @@ -0,0 +1,55 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl; + +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.annotation.ThreadingBehavior; + + +@Internal +@Contract(threading = ThreadingBehavior.STATELESS) +public final class Brotli4jRuntime { + + private static final String BROTLI = "com.aayushatharva.brotli4j.Brotli4jLoader"; + + private Brotli4jRuntime() { + } + + /** + * @return {@code true} if {@code com.aayushatharva.brotli4j} can be loaded + * by the current class loader; {@code false} otherwise + */ + public static boolean available() { + try { + Class.forName(BROTLI, false, Brotli4jRuntime.class.getClassLoader()); + return true; + } catch (ClassNotFoundException | LinkageError ex) { + return false; + } + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java index 41e69505c1..cae264a0d3 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java @@ -38,9 +38,11 @@ import org.apache.hc.client5.http.async.AsyncExecChain; import org.apache.hc.client5.http.async.AsyncExecChainHandler; import org.apache.hc.client5.http.async.methods.InflatingAsyncDataConsumer; +import org.apache.hc.client5.http.async.methods.InflatingBrotliDataConsumer; import org.apache.hc.client5.http.async.methods.InflatingGzipDataConsumer; import org.apache.hc.client5.http.async.methods.InflatingZstdDataConsumer; import org.apache.hc.client5.http.entity.compress.ContentCoding; +import org.apache.hc.client5.http.impl.Brotli4jRuntime; import org.apache.hc.client5.http.impl.ZstdRuntime; import org.apache.hc.client5.http.impl.ContentCodingSupport; import org.apache.hc.client5.http.protocol.HttpClientContext; @@ -100,6 +102,11 @@ public ContentCompressionAsyncExec() { tokens.add("zstd"); } + if (Brotli4jRuntime.available()) { + rb.register(ContentCoding.BROTLI.token(), InflatingBrotliDataConsumer::new); + tokens.add(ContentCoding.BROTLI.token()); + } + this.decoders = rb.build(); this.acceptTokens = tokens; } diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducerTest.java new file mode 100644 index 0000000000..549f12cbcd --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducerTest.java @@ -0,0 +1,143 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; + +import com.aayushatharva.brotli4j.Brotli4jLoader; +import com.aayushatharva.brotli4j.decoder.Decoder; +import com.aayushatharva.brotli4j.decoder.DirectDecompress; +import com.aayushatharva.brotli4j.encoder.Encoder; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class DeflatingBrotliEntityProducerTest { + + private static String longText() { + final String seed = "lorem ipsum äëïöü – "; + final StringBuilder sb = new StringBuilder(seed.length() * 3000); + for (int i = 0; i < 3000; i++) { + sb.append(seed); + } + return sb.toString(); + } + + /** + * DataStreamChannel that accepts at most maxChunk bytes per write. + */ + private static final class ThrottledChannel implements DataStreamChannel { + private final ByteArrayOutputStream sink = new ByteArrayOutputStream(); + private final int maxChunk; + private boolean ended; + + ThrottledChannel(final int maxChunk) { + this.maxChunk = maxChunk; + } + + @Override + public void requestOutput() { /* no-op for test */ } + + @Override + public int write(final ByteBuffer src) { + final int len = Math.min(src.remaining(), maxChunk); + if (len <= 0) return 0; + final byte[] tmp = new byte[len]; + src.get(tmp); + sink.write(tmp, 0, len); + return len; + } + + @Override + public void endStream() { + // Core interface in some versions; delegate to list variant for safety + endStream(Collections.emptyList()); + } + + @Override + public void endStream(final List trailers) { + ended = true; + } + + byte[] data() { + return sink.toByteArray(); + } + + boolean ended() { + return ended; + } + } + + @BeforeAll + static void init() { + Brotli4jLoader.ensureAvailability(); + } + + @Test + void roundTrip() throws Exception { + final String text = longText(); + + final byte[] payload = text.getBytes(java.nio.charset.StandardCharsets.UTF_8); + final AsyncEntityProducer raw = + new org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer( + payload, + org.apache.hc.core5.http.ContentType.TEXT_PLAIN.withCharset(java.nio.charset.StandardCharsets.UTF_8) + ); + final DeflatingBrotliEntityProducer br = + new DeflatingBrotliEntityProducer(raw, 5, 22, Encoder.Mode.TEXT); + + final ThrottledChannel ch = new ThrottledChannel(1024); + + // drive until producer reports no more work + while (br.available() > 0) { + br.produce(ch); + } + + final byte[] compressed = ch.data(); + assertTrue(compressed.length > 0, "no compressed bytes were produced"); + + // Decompress using brotli4j + final DirectDecompress dd = Decoder.decompress(compressed); + final String decoded = new String(dd.getDecompressedData(), StandardCharsets.UTF_8); + + assertEquals(text, decoded); + assertEquals("br", br.getContentEncoding()); + assertTrue(br.isChunked()); + assertEquals(-1, br.getContentLength()); + assertTrue(ch.ended(), "stream was not ended"); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumerTest.java new file mode 100644 index 0000000000..8446d28307 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumerTest.java @@ -0,0 +1,201 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.UnaryOperator; + +import com.aayushatharva.brotli4j.Brotli4jLoader; +import com.aayushatharva.brotli4j.encoder.Encoder; + +import org.apache.hc.client5.http.impl.async.ContentCompressionAsyncExec; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncDataConsumer; +import org.apache.hc.core5.http.nio.AsyncEntityConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class InflatingBrotliDataConsumerTest { + + /** + * Collects bytes and decodes to UTF-8 once at the end. + */ + private static final class ByteCollector implements AsyncEntityConsumer { + + private final java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream(); + private FutureCallback callback; + + @Override + public void streamStart(final EntityDetails ed, final FutureCallback cb) { + this.callback = cb; + } + + @Override + public void updateCapacity(final CapacityChannel c) { + } + + @Override + public void consume(final ByteBuffer src) { + final byte[] tmp = new byte[src.remaining()]; + src.get(tmp); + buf.write(tmp, 0, tmp.length); + } + + @Override + public void streamEnd(final List t) { + if (callback != null) { + callback.completed(getContent()); + } + } + + @Override + public void failed(final Exception cause) { + throw new RuntimeException(cause); + } + + @Override + public void releaseResources() { + } + + @Override + public String getContent() { + return new String(buf.toByteArray(), StandardCharsets.UTF_8); + } + } + + private static String ORIGINAL; + + @BeforeAll + static void setUp() { + Brotli4jLoader.ensureAvailability(); + final String seed = "Hello ✈ brotli 🎈! "; + final StringBuilder sb = new StringBuilder(seed.length() * 4000); + for (int i = 0; i < 4000; i++) { + sb.append(seed); + } + ORIGINAL = sb.toString(); + } + + private static byte[] brCompress() throws IOException { + final byte[] src = ORIGINAL.getBytes(StandardCharsets.UTF_8); + final Encoder.Parameters p = new Encoder.Parameters() + .setQuality(6) + .setWindow(22) + .setMode(Encoder.Mode.TEXT); + return Encoder.compress(src, p); + } + + @Test + void inflateBrotli() throws Exception { + + final byte[] compressed = brCompress(); + + final ByteCollector inner = new ByteCollector(); + final InflatingBrotliDataConsumer inflating = new InflatingBrotliDataConsumer(inner); + + final CountDownLatch done = new CountDownLatch(1); + final FutureCallback cb = new FutureCallback() { + @Override + public void completed(final String result) { + done.countDown(); + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail("cancelled"); + } + }; + + /* minimal EntityDetails stub */ + final EntityDetails details = new EntityDetails() { + @Override + public long getContentLength() { + return compressed.length; + } + + @Override + public String getContentType() { + return "text/plain"; + } + + @Override + public String getContentEncoding() { + return "br"; + } + + @Override + public boolean isChunked() { + return false; + } + + @Override + public Set getTrailerNames() { + return new HashSet<>(); + } + }; + inner.streamStart(details, cb); + + for (int off = 0; off < compressed.length; off += 1024) { + final int n = Math.min(1024, compressed.length - off); + inflating.consume(ByteBuffer.wrap(compressed, off, n)); + } + inflating.streamEnd(Collections.emptyList()); + + assertTrue(done.await(2, TimeUnit.SECONDS), "callback timeout"); + assertEquals(ORIGINAL, inner.getContent(), "br inflate mismatch"); + } + + @Test + void registerInExec() { + final LinkedHashMap> map = new LinkedHashMap<>(); + map.put("br", InflatingBrotliDataConsumer::new); + final ContentCompressionAsyncExec exec = new ContentCompressionAsyncExec(map, false); + assertNotNull(exec); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerBrotliRoundTrip.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerBrotliRoundTrip.java new file mode 100644 index 0000000000..74f522f78b --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerBrotliRoundTrip.java @@ -0,0 +1,220 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; + +import com.aayushatharva.brotli4j.Brotli4jLoader; +import com.aayushatharva.brotli4j.encoder.BrotliOutputStream; + +import org.apache.commons.compress.compressors.CompressorException; +import org.apache.commons.compress.compressors.CompressorInputStream; +import org.apache.commons.compress.compressors.CompressorStreamFactory; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.impl.bootstrap.HttpServer; +import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap; +import org.apache.hc.core5.http.io.HttpRequestHandler; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.io.entity.StringEntity; +import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; + +/** + * Async client/server demo with Brotli in both directions: + *

+ * - Client sends a Brotli-compressed request body (Content-Encoding: br) + * - Server decompresses request, then responds with a Brotli-compressed body + * - Client checks the response Content-Encoding and decompresses if needed + *

+ * Notes: + * - Encoding uses brotli4j (native JNI); make sure matching native dependency is on the runtime classpath. + * - Decoding here uses Commons Compress via CompressorStreamFactory("br"). + */ +public final class AsyncClientServerBrotliRoundTrip { + + static { + Brotli4jLoader.ensureAvailability(); + } + + private static final String BR = "br"; + + public static void main(final String[] args) throws Exception { + final HttpServer server = ServerBootstrap.bootstrap() + .setListenerPort(0) + .setCanonicalHostName("localhost") + .register("/echo", new EchoHandler()) + .create(); + server.start(); + final int port = server.getLocalPort(); + final String url = "http://localhost:" + port + "/echo"; + + try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) { + client.start(); + + final String requestBody = "Hello Brotli world (round-trip)!"; + System.out.println("Request (plain): " + requestBody); + + // --- client compresses request --- + final byte[] reqCompressed = brotliCompress(requestBody.getBytes(StandardCharsets.UTF_8)); + + final SimpleHttpRequest post = SimpleRequestBuilder.post(url) + .setHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString()) + .setHeader(HttpHeaders.CONTENT_ENCODING, BR) + .build(); + + final Future> f = client.execute( + new BasicRequestProducer(post, + new BasicAsyncEntityProducer(reqCompressed, ContentType.APPLICATION_OCTET_STREAM)), + new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), + null); + + final Message msg = f.get(); + final HttpResponse head = msg.getHead(); + final byte[] respBodyRaw = msg.getBody() != null ? msg.getBody() : new byte[0]; + + System.out.println("Status : " + head.getCode()); + final Header ce = head.getFirstHeader(HttpHeaders.CONTENT_ENCODING); + final boolean isBr = ce != null && BR.equalsIgnoreCase(ce.getValue()); + System.out.println("Response C-E : " + (isBr ? BR : "(none)")); + + final byte[] respPlain = isBr ? brotliDecompress(respBodyRaw) : respBodyRaw; + System.out.println("Response (plain) : " + new String(respPlain, StandardCharsets.UTF_8)); + } finally { + server.close(CloseMode.GRACEFUL); + } + } + + /** + * Server handler: + * - If request has Content-Encoding: br, decompress it + * - Echo the text back, but re-encode the response with Brotli (Content-Encoding: br) + */ + private static final class EchoHandler implements HttpRequestHandler { + @Override + public void handle( + final ClassicHttpRequest request, + final ClassicHttpResponse response, + final HttpContext context) throws IOException { + + final HttpEntity entity = request.getEntity(); + if (entity == null) { + response.setCode(HttpStatus.SC_BAD_REQUEST); + response.setEntity(new StringEntity("Missing request body", StandardCharsets.UTF_8)); + return; + } + + try { + final byte[] requestPlain; + final Header ce = request.getFirstHeader(HttpHeaders.CONTENT_ENCODING); + if (ce != null && BR.equalsIgnoreCase(ce.getValue())) { + try (final InputStream in = entity.getContent(); + final CompressorInputStream bin = + new CompressorStreamFactory().createCompressorInputStream(BR, in)) { + requestPlain = readAll(bin); + } + } else { + try (final InputStream in = entity.getContent()) { + requestPlain = readAll(in); + } + } + + final String echoed = new String(requestPlain, StandardCharsets.UTF_8); + + // --- server compresses response with Brotli --- + final byte[] respCompressed = brotliCompress(echoed.getBytes(StandardCharsets.UTF_8)); + response.setCode(HttpStatus.SC_OK); + response.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString()); + response.addHeader(HttpHeaders.CONTENT_ENCODING, BR); + response.setEntity(new ByteArrayEntity(respCompressed, ContentType.APPLICATION_OCTET_STREAM)); + + } catch (final CompressorException ex) { + response.setCode(HttpStatus.SC_BAD_REQUEST); + response.setEntity(new StringEntity("Invalid Brotli payload", StandardCharsets.UTF_8)); + } catch (final Exception ex) { + response.setCode(HttpStatus.SC_INTERNAL_SERVER_ERROR); + response.setEntity(new StringEntity("Server error", StandardCharsets.UTF_8)); + } + } + } + + /** + * Utility: read entire stream into a byte[] (demo-only). + */ + private static byte[] readAll(final InputStream in) throws IOException { + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + final byte[] buf = new byte[8192]; + int n; + while ((n = in.read(buf)) != -1) { + bos.write(buf, 0, n); + } + return bos.toByteArray(); + } + + /** + * Compress a byte[] with Brotli using brotli4j. + */ + private static byte[] brotliCompress(final byte[] plain) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final BrotliOutputStream out = new BrotliOutputStream(baos)) { + out.write(plain); + } + return baos.toByteArray(); + } + + /** + * Decompress a Brotli-compressed byte[] using Commons Compress. + */ + private static byte[] brotliDecompress(final byte[] compressed) throws IOException { + try (final InputStream in = new ByteArrayInputStream(compressed); + final CompressorInputStream bin = new CompressorStreamFactory().createCompressorInputStream(BR, in)) { + return readAll(bin); + } catch (final CompressorException e) { + throw new IOException("Failed to decompress Brotli data", e); + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java index 690bc07c10..a02ddb45d4 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java @@ -116,7 +116,7 @@ void testAcceptEncodingAdded() throws Exception { final HttpRequest request = new BasicHttpRequest(Method.GET, "/path"); executeAndCapture(request); assertTrue(request.containsHeader(HttpHeaders.ACCEPT_ENCODING)); - assertEquals("gzip, x-gzip, deflate, zstd", request.getFirstHeader(HttpHeaders.ACCEPT_ENCODING).getValue()); + assertEquals("gzip, x-gzip, deflate, zstd, br", request.getFirstHeader(HttpHeaders.ACCEPT_ENCODING).getValue()); } @Test diff --git a/pom.xml b/pom.xml index 9039588609..1eb352e2aa 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,7 @@ 5.3.6 2.25.0 0.1.2 + 1.20.0 2.5.2 3.10.8 2.12.3 @@ -256,6 +257,12 @@ ${otel.version} test + + com.aayushatharva.brotli4j + brotli4j + ${brotli4j.version} + true +