From 7e1379b0c3622ad1a3f6d754f2af8b78dbd706ce Mon Sep 17 00:00:00 2001 From: default Date: Thu, 11 Sep 2025 12:08:39 +0200 Subject: [PATCH 1/5] initial mutiny commit --- parallel-consumer-mutiny/pom.xml | 57 +++++++ .../mutiny/MutinyProcessor.java | 152 ++++++++++++++++++ .../parallelconsumer/mutiny/MutinyTest.java | 38 +++++ pom.xml | 1 + 4 files changed, 248 insertions(+) create mode 100644 parallel-consumer-mutiny/pom.xml create mode 100644 parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java create mode 100644 parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyTest.java diff --git a/parallel-consumer-mutiny/pom.xml b/parallel-consumer-mutiny/pom.xml new file mode 100644 index 000000000..08fc730e0 --- /dev/null +++ b/parallel-consumer-mutiny/pom.xml @@ -0,0 +1,57 @@ + + + + + parallel-consumer-parent + io.confluent.parallelconsumer + 0.5.3.4-SNAPSHOT + + 4.0.0 + + Confluent Parallel Consumer SmallRye Mutiny + parallel-consumer-mutiny + + + + io.confluent.parallelconsumer + parallel-consumer-core + ${project.version} + + + io.confluent.parallelconsumer + parallel-consumer-core + ${project.version} + tests + test + + + io.smallrye.reactive + mutiny + 2.9.4 + + + com.google.guava + guava + + + me.tongfei + progressbar + test + + + org.junit.jupiter + junit-jupiter-params + test + + + org.junit-pioneer + junit-pioneer + test + + + + diff --git a/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java new file mode 100644 index 000000000..8f9d0761e --- /dev/null +++ b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java @@ -0,0 +1,152 @@ +package io.confluent.parallelconsumer.mutiny; + +/*- + * Copyright (C) 2020-2025 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.PCRetriableException; +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContext; +import io.confluent.parallelconsumer.PollContextInternal; +import io.confluent.parallelconsumer.internal.ExternalEngine; +import io.confluent.parallelconsumer.state.WorkContainer; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.subscription.Cancellable; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import pl.tlinkowski.unij.api.UniLists; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun; + +/** + * Adapter for using Mutiny as the asynchronous execution engine. + */ +@Slf4j +public class MutinyProcessor extends ExternalEngine { + + /** + * @see WorkContainer#getWorkType() + */ + private static final String MUTINY_TYPE = "mutiny.x-type"; + + private final Supplier executorSupplier; + private final Supplier defaultExecutorSupplier = Infrastructure::getDefaultExecutor; + + public MutinyProcessor(ParallelConsumerOptions options, Supplier newExecutorSupplier) { + super(options); + this.executorSupplier = (newExecutorSupplier == null) ? 
defaultExecutorSupplier : newExecutorSupplier; + } + + public MutinyProcessor(ParallelConsumerOptions options) { + this(options, null); + } + + @Override + protected boolean isAsyncFutureWork(List resultsFromUserFunction) { + for (Object object : resultsFromUserFunction) { + return (object instanceof io.smallrye.mutiny.subscription.Cancellable); + } + return false; + } + + @Override + public void close(Duration timeout, DrainingMode drainMode) { + super.close(timeout, drainMode); + } + + /** + * Register a function to be applied to polled messages. + *
<p>
+ * Make sure that you do any work immediately - do not block this thread. + *
<p>
+ * + * @param mutinyFunction user function that takes a PollContext and returns a Uni + * @see #onRecord(Function) + * @see ParallelConsumerOptions + * @see ParallelConsumerOptions#batchSize + * @see io.confluent.parallelconsumer.ParallelStreamProcessor#poll + */ + + /** + * Register a function to be applied to polled messages. + * This must return a Uni to signal async completion. + * + * @param mutinyFunction user function that takes a PollContext and returns a Uni + */ + public void onRecord(Function, Uni> mutinyFunction) { + + Function, List> wrappedUserFunc = pollContext -> { + + if (log.isTraceEnabled()) { + log.trace("Record list ({}), executing void function...", + pollContext.streamConsumerRecords() + .map(ConsumerRecord::offset) + .collect(Collectors.toList()) + ); + } + + pollContext.streamWorkContainers() + .forEach(x -> x.setWorkType(MUTINY_TYPE)); + + Cancellable uni = Uni.createFrom().deferred(() -> + carefullyRun(mutinyFunction, pollContext.getPollContext()) + ) + .onItem() + .transformToMulti(result -> { + if (result instanceof Multi multi) { + return multi; // unwrap Multi + } else { + return Multi.createFrom().item(result); // wrap single item as Multi + } + }) + .onItem() + .invoke(signal -> log.trace("onItem {}", signal)) + .runSubscriptionOn(getExecutor()) + .subscribe().with( + ignored -> onComplete(pollContext), + throwable -> onError(pollContext, throwable) + ); + + log.trace("asyncPoll - user function finished ok."); + return UniLists.of(uni); + }; + + // + Consumer voidCallBack = ignored -> log.trace("Void callback applied."); + supervisorLoop(wrappedUserFunc, voidCallBack); + } + + private void onComplete(PollContextInternal pollContext) { + log.debug("Mutiny success"); + pollContext.streamWorkContainers().forEach(wc -> { + wc.onUserFunctionSuccess(); + addToMailbox(pollContext, wc); + }); + } + + private void onError(PollContextInternal pollContext, Throwable throwable) { + if (throwable instanceof PCRetriableException) { + log.debug("Mutiny fail signal", throwable); + } else { + log.error("Mutiny fail signal", throwable); + } + pollContext.streamWorkContainers().forEach(wc -> { + wc.onUserFunctionFailure(throwable); + addToMailbox(pollContext, wc); + }); + } + + private Executor getExecutor() { + return this.executorSupplier.get(); + } +} diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyTest.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyTest.java new file mode 100644 index 000000000..142021070 --- /dev/null +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyTest.java @@ -0,0 +1,38 @@ +package io.confluent.parallelconsumer.mutiny; + +/*- + * Copyright (C) 2020-2025 Confluent, Inc. 
+ */ + +import io.smallrye.mutiny.Multi; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +class MutinyTest { + + @Test + void emitOnExample() { + ExecutorService executor = Executors.newFixedThreadPool(4); + + Multi multi = Multi.createFrom().range(1, 3) // 1..2 inclusive + .map(i -> 10 + i) + .emitOn(executor) // similar to publishOn + .map(i -> "value " + i); + + multi.subscribe().with(System.out::println, Throwable::printStackTrace); + } + + @Test + void runSubscriptionOnExample() { + ExecutorService executor = Executors.newFixedThreadPool(4); + + Multi multi = Multi.createFrom().range(1, 3) + .map(i -> 10 + i) + .runSubscriptionOn(executor) // similar to subscribeOn + .map(i -> "value " + i); + + multi.subscribe().with(System.out::println, Throwable::printStackTrace); + } +} diff --git a/pom.xml b/pom.xml index aa9078292..976b57504 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,7 @@ parallel-consumer-core parallel-consumer-vertx parallel-consumer-reactor + parallel-consumer-mutiny parallel-consumer-examples From d10f1f20e86e7bfad62335a1356e40b102c60446 Mon Sep 17 00:00:00 2001 From: default Date: Thu, 11 Sep 2025 12:38:12 +0200 Subject: [PATCH 2/5] initial mutiny commit --- .../mutiny/MutinyBatchTest.java | 99 +++++++++++++ .../parallelconsumer/mutiny/MutinyPCTest.java | 140 ++++++++++++++++++ .../mutiny/MutinyUnitTestBase.java | 26 ++++ 3 files changed, 265 insertions(+) create mode 100644 parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java create mode 100644 parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java create mode 100644 parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java new file mode 100644 index 000000000..116b6b94a --- /dev/null +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java @@ -0,0 +1,99 @@ +package io.confluent.parallelconsumer.mutiny; + +import io.confluent.csid.utils.KafkaTestUtils; +import io.confluent.parallelconsumer.BatchTestBase; +import io.confluent.parallelconsumer.BatchTestMethods; +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContext; +import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; +import io.confluent.parallelconsumer.internal.RateLimiter; +import io.smallrye.mutiny.Uni; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.confluent.csid.utils.StringUtils.msg; + +@Slf4j +public class MutinyBatchTest extends MutinyUnitTestBase implements BatchTestBase { + + BatchTestMethods> batchTestMethods; + + @BeforeEach + void setup() { + batchTestMethods = new BatchTestMethods<>(this) { + + @Override + protected KafkaTestUtils getKtu() { + return ktu; + } + + @SneakyThrows + @Override + protected Uni averageBatchSizeTestPollStep(PollContext recordList) { + return Uni.createFrom() + 
.item(msg("Saw batch or records: {}", recordList.getOffsetsFlattened())) + .onItem().delayIt().by(Duration.ofMillis(30)); + } + + @Override + protected void averageBatchSizeTestPoll(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger) { + mutinyPC.onRecord(recordList -> + averageBatchSizeTestPollInner(numBatches, numRecords, statusLogger, recordList) + ); + } + + @Override + protected AbstractParallelEoSStreamProcessor getPC() { + return mutinyPC; + } + + @Override + public void simpleBatchTestPoll(List> batchesReceived) { + mutinyPC.onRecord(recordList -> { + String msg = msg("Saw batch or records: {}", recordList.getOffsetsFlattened()); + log.debug(msg); + batchesReceived.add(recordList); + return Uni.createFrom().item(msg); + }); + } + + @Override + protected void batchFailPoll(List> batchesReceived) { + mutinyPC.onRecord(recordList -> { + batchFailPollInner(recordList); + batchesReceived.add(recordList); + return Uni.createFrom().item(msg("Saw batch or records: {}", recordList.getOffsetsFlattened())); + }); + } + }; + } + + @Test + public void averageBatchSizeTest() { + batchTestMethods.averageBatchSizeTest(10000); + } + + @ParameterizedTest + @EnumSource + @Override + public void simpleBatchTest(ParallelConsumerOptions.ProcessingOrder order) { + batchTestMethods.simpleBatchTest(order); + } + + @ParameterizedTest + @EnumSource + @Override + public void batchFailureTest(ParallelConsumerOptions.ProcessingOrder order) { + batchTestMethods.batchFailureTest(order); + } + +} + diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java new file mode 100644 index 000000000..6898c4cd2 --- /dev/null +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java @@ -0,0 +1,140 @@ +package io.confluent.parallelconsumer.mutiny; + +import io.confluent.csid.utils.LatchTestUtils; +import io.confluent.csid.utils.ProgressBarUtils; +import io.smallrye.mutiny.Uni; +import lombok.extern.slf4j.Slf4j; +import me.tongfei.progressbar.ProgressBar; +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.truth.Truth.assertWithMessage; +import static io.confluent.parallelconsumer.truth.LongPollingMockConsumerSubject.assertThat; +import static org.awaitility.Awaitility.await; + +@Slf4j +class MutinyPCTest extends MutinyUnitTestBase { + + /** + * The percent of the max concurrency tolerance allowed + */ + public static final Percentage MAX_CONCURRENCY_OVERFLOW_ALLOWANCE = Percentage.withPercentage(1.2); + + @BeforeEach + public void setupData() { + super.primeFirstRecord(); + } + + @Test + void kickTires() { + primeFirstRecord(); + primeFirstRecord(); + primeFirstRecord(); + + ConcurrentLinkedQueue msgs = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue threads = new ConcurrentLinkedQueue<>(); + + mutinyPC.onRecord(ctx -> { + log.info("Mutiny user function: {}", ctx); + msgs.add(ctx); + threads.add(Thread.currentThread().getName()); + + // return a Uni for async processing + return Uni.createFrom().item(String.format("result: %d:%s", ctx.getSingleConsumerRecord().offset(), ctx.getSingleConsumerRecord().value())); + }); + + await() + 
.atMost(defaultTimeout) + .untilAsserted(() -> { + assertWithMessage("Processed records collection so far") + .that(msgs.size()) + .isEqualTo(4); + + assertThat(consumerSpy) + .hasCommittedToPartition(topicPartition) + .atLeastOffset(4); + + assertWithMessage("The user-defined function should be executed by the scheduler") + .that(threads.stream().allMatch(thread -> thread.startsWith("pc-pool"))) + .isTrue(); + }); + } + + @Test + void concurrencyTest() throws InterruptedException { + int quantity = 100_000; + var consumerRecords = ktu.generateRecords(quantity - 1); // -1 because 1 is already primed + ktu.send(consumerSpy, consumerRecords); + log.info("Finished priming records"); + + ProgressBar bar = ProgressBarUtils.getNewMessagesBar(log, quantity); + + ConcurrentLinkedQueue msgs = new ConcurrentLinkedQueue<>(); + AtomicInteger finishedCount = new AtomicInteger(0); + AtomicInteger maxConcurrentRecordsSeen = new AtomicInteger(0); + CountDownLatch completeOrProblem = new CountDownLatch(1); + int maxConcurrency = MAX_CONCURRENCY; + + mutinyPC.onRecord(ctx -> { + var record = ctx.getSingleConsumerRecord(); + return Uni.createFrom().item(String.format("result: %d:%s", record.offset(), record.value())) + .onItem().invoke(ignore -> { + // add that our uni processing has started + log.trace("Mutiny user function executing: {}", ctx); + msgs.add(ctx); + if (msgs.size() > maxConcurrency) { + log.error("More records submitted for processing than max concurrency settings ({} vs {})", msgs.size(), maxConcurrency); + completeOrProblem.countDown(); + } + }) + // simulate async delay + .onItem().delayIt().by(Duration.ofMillis((int) (100 * Math.random()))) + .onItem().invoke(s -> { + log.trace("User function after delay. Records pending: {}, removing from processing: {}", msgs.size(), ctx); + int currentConcurrentRecords = msgs.size(); + int highestSoFar = Math.max(currentConcurrentRecords, maxConcurrentRecordsSeen.get()); + maxConcurrentRecordsSeen.set(highestSoFar); + + boolean removed = msgs.remove(ctx); + assertWithMessage("record was present and removed") + .that(removed).isTrue(); + + int numberOfFinishedRecords = finishedCount.incrementAndGet(); + if (numberOfFinishedRecords > quantity - 1) { + completeOrProblem.countDown(); + } + + bar.step(); + }); + }); + + // block until all messages processed + LatchTestUtils.awaitLatch(completeOrProblem, defaultTimeoutSeconds); + + int maxConcurrencyAllowedThreshold = (int) (maxConcurrency * MAX_CONCURRENCY_OVERFLOW_ALLOWANCE.value); + assertWithMessage("Max concurrency should never be exceeded") + .that(maxConcurrentRecordsSeen.get()).isLessThan(maxConcurrencyAllowedThreshold); + + await() + .atMost(defaultTimeout) + .failFast("Max concurrency exceeded", () -> msgs.size() > maxConcurrencyAllowedThreshold) + .untilAsserted(() -> { + assertWithMessage("Number of completed messages") + .that(finishedCount.get()).isEqualTo(quantity); + + assertThat(consumerSpy) + .hasCommittedToPartition(topicPartition) + .offset(quantity); + }); + + bar.close(); + log.info("Max concurrency was {}", maxConcurrentRecordsSeen.get()); + } +} + diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java new file mode 100644 index 000000000..4e3cd7b26 --- /dev/null +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java @@ -0,0 +1,26 @@ +package 
io.confluent.parallelconsumer.mutiny; + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase; +import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; + +import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC; + +public class MutinyUnitTestBase extends ParallelEoSStreamProcessorTestBase { + + protected MutinyProcessor mutinyPC; + + protected static final int MAX_CONCURRENCY = 1000; + + @Override + protected AbstractParallelEoSStreamProcessor initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) { + var build = parallelConsumerOptions.toBuilder() + .commitMode(PERIODIC_CONSUMER_SYNC) + .maxConcurrency(MAX_CONCURRENCY) + .build(); + + mutinyPC = new MutinyProcessor<>(build); + + return mutinyPC; + } +} \ No newline at end of file From 844e95e91957123e4ddd341f28a0caafc402fde8 Mon Sep 17 00:00:00 2001 From: bvukasovic Date: Thu, 11 Sep 2025 13:15:10 +0200 Subject: [PATCH 3/5] updated tests --- .../mutiny/MutinyProcessor.java | 2 +- .../mutiny/MutinyBatchTest.java | 4 + .../parallelconsumer/mutiny/MutinyPCTest.java | 9 +- .../mutiny/MutinyUnitTestBase.java | 4 + .../src/test/resources/logback-test.xml | 83 +++++++++++++++++++ 5 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 parallel-consumer-mutiny/src/test/resources/logback-test.xml diff --git a/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java index 8f9d0761e..e7e75e460 100644 --- a/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java +++ b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java @@ -40,7 +40,7 @@ public class MutinyProcessor extends ExternalEngine { private static final String MUTINY_TYPE = "mutiny.x-type"; private final Supplier executorSupplier; - private final Supplier defaultExecutorSupplier = Infrastructure::getDefaultExecutor; + private final Supplier defaultExecutorSupplier = Infrastructure::getDefaultWorkerPool; public MutinyProcessor(ParallelConsumerOptions options, Supplier newExecutorSupplier) { super(options); diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java index 116b6b94a..67a6b0e8d 100644 --- a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java @@ -1,5 +1,9 @@ package io.confluent.parallelconsumer.mutiny; +/*- + * Copyright (C) 2020-2025 Confluent, Inc. 
+ */ + import io.confluent.csid.utils.KafkaTestUtils; import io.confluent.parallelconsumer.BatchTestBase; import io.confluent.parallelconsumer.BatchTestMethods; diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java index 6898c4cd2..8c1723022 100644 --- a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java @@ -1,5 +1,9 @@ package io.confluent.parallelconsumer.mutiny; +/*- + * Copyright (C) 2020-2025 Confluent, Inc. + */ + import io.confluent.csid.utils.LatchTestUtils; import io.confluent.csid.utils.ProgressBarUtils; import io.smallrye.mutiny.Uni; @@ -44,7 +48,6 @@ void kickTires() { log.info("Mutiny user function: {}", ctx); msgs.add(ctx); threads.add(Thread.currentThread().getName()); - // return a Uni for async processing return Uni.createFrom().item(String.format("result: %d:%s", ctx.getSingleConsumerRecord().offset(), ctx.getSingleConsumerRecord().value())); }); @@ -61,7 +64,7 @@ void kickTires() { .atLeastOffset(4); assertWithMessage("The user-defined function should be executed by the scheduler") - .that(threads.stream().allMatch(thread -> thread.startsWith("pc-pool"))) + .that(threads.stream().allMatch(thread -> thread.startsWith("pool"))) .isTrue(); }); } @@ -94,7 +97,7 @@ var record = ctx.getSingleConsumerRecord(); } }) // simulate async delay - .onItem().delayIt().by(Duration.ofMillis((int) (100 * Math.random()))) + .onItem().delayIt().by(Duration.ofMillis(Math.max(1, (int) (100 * Math.random())))) .onItem().invoke(s -> { log.trace("User function after delay. Records pending: {}, removing from processing: {}", msgs.size(), ctx); int currentConcurrentRecords = msgs.size(); diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java index 4e3cd7b26..ce92b03f7 100644 --- a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java @@ -1,5 +1,9 @@ package io.confluent.parallelconsumer.mutiny; +/*- + * Copyright (C) 2020-2022 Confluent, Inc. 
+ */ + import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase; import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; diff --git a/parallel-consumer-mutiny/src/test/resources/logback-test.xml b/parallel-consumer-mutiny/src/test/resources/logback-test.xml new file mode 100644 index 000000000..3aec1d52e --- /dev/null +++ b/parallel-consumer-mutiny/src/test/resources/logback-test.xml @@ -0,0 +1,83 @@ + + + + + + + + + + + + + %d{mm:ss.SSS} %yellow(%X{pcId}) %highlight(%-5level) %yellow([%thread]) %X{offset} %cyan(\(%file:%line\)#%M) %msg%n + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From f9173570b6efdfd53f5e77a52e909762959518da Mon Sep 17 00:00:00 2001 From: bvukasovic Date: Mon, 22 Sep 2025 10:14:49 +0200 Subject: [PATCH 4/5] updated multi processing --- .../parallelconsumer/mutiny/MutinyProcessor.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java index e7e75e460..cc99aeab5 100644 --- a/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java +++ b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java @@ -103,7 +103,10 @@ public void onRecord(Function, Uni> mutinyFunction) { ) .onItem() .transformToMulti(result -> { - if (result instanceof Multi multi) { + if(result == null) { + return Multi.createFrom().empty(); + } + else if (result instanceof Multi multi) { return multi; // unwrap Multi } else { return Multi.createFrom().item(result); // wrap single item as Multi @@ -114,7 +117,8 @@ public void onRecord(Function, Uni> mutinyFunction) { .runSubscriptionOn(getExecutor()) .subscribe().with( ignored -> onComplete(pollContext), - throwable -> onError(pollContext, throwable) + throwable -> onError(pollContext, throwable), + () -> onComplete(pollContext) ); log.trace("asyncPoll - user function finished ok."); From 968caa76b95c2e14e231ed23f0fa92a9e550dc8c Mon Sep 17 00:00:00 2001 From: bvukasovic Date: Mon, 22 Sep 2025 10:25:02 +0200 Subject: [PATCH 5/5] handle null result --- .../io/confluent/parallelconsumer/mutiny/MutinyProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java index cc99aeab5..040dfe237 100644 --- a/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java +++ b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java @@ -116,7 +116,7 @@ else if (result instanceof Multi multi) { .invoke(signal -> log.trace("onItem {}", signal)) .runSubscriptionOn(getExecutor()) .subscribe().with( - ignored -> onComplete(pollContext), + ignored -> {}, throwable -> onError(pollContext, throwable), () -> onComplete(pollContext) );
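
For reference, below is a minimal usage sketch of the MutinyProcessor API introduced by this patch series. It is illustrative only and not part of the patch: the broker address, topic name, group id, and consumer configuration are placeholder assumptions, and subscribe() and the ParallelConsumerOptions builder are assumed to behave as they do for the existing parallel-consumer-core and Reactor modules. The onRecord(Function<PollContext<K, V>, Uni<R>>) signature and getSingleConsumerRecord() come straight from the patch.

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.mutiny.MutinyProcessor;
import io.smallrye.mutiny.Uni;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import pl.tlinkowski.unij.api.UniLists;

import java.util.Properties;

public class MutinyProcessorUsageSketch {
    public static void main(String[] args) {
        // Placeholder consumer configuration - adjust for a real cluster.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "mutiny-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        var consumer = new KafkaConsumer<String, String>(props);

        var options = ParallelConsumerOptions.<String, String>builder()
                .consumer(consumer)
                .maxConcurrency(16)
                .build();

        MutinyProcessor<String, String> pc = new MutinyProcessor<>(options);
        pc.subscribe(UniLists.of("my-topic")); // placeholder topic name

        // Each polled record is handed to the user function, which returns a Uni.
        // Completion or failure of the Uni drives acknowledgement via onComplete/onError.
        pc.onRecord(ctx -> Uni.createFrom()
                .item(ctx.getSingleConsumerRecord())
                .onItem().transform(rec -> "processed " + rec.offset() + ":" + rec.value()));
    }
}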