Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions parallel-consumer-mutiny/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright (C) 2020-2025 Confluent, Inc.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>parallel-consumer-parent</artifactId>
<groupId>io.confluent.parallelconsumer</groupId>
<version>0.5.3.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<name>Confluent Parallel Consumer SmallRye Mutiny</name>
<artifactId>parallel-consumer-mutiny</artifactId>

<dependencies>
<dependency>
<groupId>io.confluent.parallelconsumer</groupId>
<artifactId>parallel-consumer-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.confluent.parallelconsumer</groupId>
<artifactId>parallel-consumer-core</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny</artifactId>
<version>2.9.4</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>me.tongfei</groupId>
<artifactId>progressbar</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit-pioneer</groupId>
<artifactId>junit-pioneer</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package io.confluent.parallelconsumer.mutiny;

/*-
* Copyright (C) 2020-2025 Confluent, Inc.
*/

import io.confluent.parallelconsumer.PCRetriableException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.ExternalEngine;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.Cancellable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import pl.tlinkowski.unij.api.UniLists;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun;

/**
* Adapter for using Mutiny as the asynchronous execution engine.
*/
@Slf4j
public class MutinyProcessor<K, V> extends ExternalEngine<K, V> {

/**
* @see WorkContainer#getWorkType()
*/
private static final String MUTINY_TYPE = "mutiny.x-type";

private final Supplier<Executor> executorSupplier;
private final Supplier<Executor> defaultExecutorSupplier = Infrastructure::getDefaultWorkerPool;

public MutinyProcessor(ParallelConsumerOptions<K, V> options, Supplier<Executor> newExecutorSupplier) {
super(options);
this.executorSupplier = (newExecutorSupplier == null) ? defaultExecutorSupplier : newExecutorSupplier;
}

public MutinyProcessor(ParallelConsumerOptions<K, V> options) {
this(options, null);
}

@Override
protected boolean isAsyncFutureWork(List<?> resultsFromUserFunction) {
for (Object object : resultsFromUserFunction) {
return (object instanceof io.smallrye.mutiny.subscription.Cancellable);
}
return false;
}

@Override
public void close(Duration timeout, DrainingMode drainMode) {
super.close(timeout, drainMode);
}

/**
* Register a function to be applied to polled messages.
* <p>
* Make sure that you do any work immediately - do not block this thread.
* <p>
*
* @param mutinyFunction user function that takes a PollContext and returns a Uni<T>
* @see #onRecord(Function)
* @see ParallelConsumerOptions
* @see ParallelConsumerOptions#batchSize
* @see io.confluent.parallelconsumer.ParallelStreamProcessor#poll
*/

/**
* Register a function to be applied to polled messages.
* This must return a Uni<Void> to signal async completion.
*
* @param mutinyFunction user function that takes a PollContext and returns a Uni<Void>
*/
public <T> void onRecord(Function<PollContext<K, V>, Uni<T>> mutinyFunction) {

Function<PollContextInternal<K, V>, List<Object>> wrappedUserFunc = pollContext -> {

if (log.isTraceEnabled()) {
log.trace("Record list ({}), executing void function...",
pollContext.streamConsumerRecords()
.map(ConsumerRecord::offset)
.collect(Collectors.toList())
);
}

pollContext.streamWorkContainers()
.forEach(x -> x.setWorkType(MUTINY_TYPE));

Cancellable uni = Uni.createFrom().deferred(() ->
carefullyRun(mutinyFunction, pollContext.getPollContext())
)
.onItem()
.transformToMulti(result -> {
if (result instanceof Multi<?> multi) {
return multi; // unwrap Multi
} else {
return Multi.createFrom().item(result); // wrap single item as Multi
}
})
.onItem()
.invoke(signal -> log.trace("onItem {}", signal))
.runSubscriptionOn(getExecutor())
.subscribe().with(
ignored -> onComplete(pollContext),
throwable -> onError(pollContext, throwable)
);

log.trace("asyncPoll - user function finished ok.");
return UniLists.of(uni);
};

//
Consumer<Object> voidCallBack = ignored -> log.trace("Void callback applied.");
supervisorLoop(wrappedUserFunc, voidCallBack);
}

private void onComplete(PollContextInternal<K, V> pollContext) {
log.debug("Mutiny success");
pollContext.streamWorkContainers().forEach(wc -> {
wc.onUserFunctionSuccess();
addToMailbox(pollContext, wc);
});
}

private void onError(PollContextInternal<K, V> pollContext, Throwable throwable) {
if (throwable instanceof PCRetriableException) {
log.debug("Mutiny fail signal", throwable);
} else {
log.error("Mutiny fail signal", throwable);
}
pollContext.streamWorkContainers().forEach(wc -> {
wc.onUserFunctionFailure(throwable);
addToMailbox(pollContext, wc);
});
}

private Executor getExecutor() {
return this.executorSupplier.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.confluent.parallelconsumer.mutiny;

/*-
* Copyright (C) 2020-2025 Confluent, Inc.
*/

import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.parallelconsumer.BatchTestBase;
import io.confluent.parallelconsumer.BatchTestMethods;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.RateLimiter;
import io.smallrye.mutiny.Uni;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static io.confluent.csid.utils.StringUtils.msg;

@Slf4j
public class MutinyBatchTest extends MutinyUnitTestBase implements BatchTestBase {

BatchTestMethods<Uni<String>> batchTestMethods;

@BeforeEach
void setup() {
batchTestMethods = new BatchTestMethods<>(this) {

@Override
protected KafkaTestUtils getKtu() {
return ktu;
}

@SneakyThrows
@Override
protected Uni<String> averageBatchSizeTestPollStep(PollContext<String, String> recordList) {
return Uni.createFrom()
.item(msg("Saw batch or records: {}", recordList.getOffsetsFlattened()))
.onItem().delayIt().by(Duration.ofMillis(30));
}

@Override
protected void averageBatchSizeTestPoll(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger) {
mutinyPC.onRecord(recordList ->
averageBatchSizeTestPollInner(numBatches, numRecords, statusLogger, recordList)
);
}

@Override
protected AbstractParallelEoSStreamProcessor getPC() {
return mutinyPC;
}

@Override
public void simpleBatchTestPoll(List<PollContext<String, String>> batchesReceived) {
mutinyPC.onRecord(recordList -> {
String msg = msg("Saw batch or records: {}", recordList.getOffsetsFlattened());
log.debug(msg);
batchesReceived.add(recordList);
return Uni.createFrom().item(msg);
});
}

@Override
protected void batchFailPoll(List<PollContext<String, String>> batchesReceived) {
mutinyPC.onRecord(recordList -> {
batchFailPollInner(recordList);
batchesReceived.add(recordList);
return Uni.createFrom().item(msg("Saw batch or records: {}", recordList.getOffsetsFlattened()));
});
}
};
}

@Test
public void averageBatchSizeTest() {
batchTestMethods.averageBatchSizeTest(10000);
}

@ParameterizedTest
@EnumSource
@Override
public void simpleBatchTest(ParallelConsumerOptions.ProcessingOrder order) {
batchTestMethods.simpleBatchTest(order);
}

@ParameterizedTest
@EnumSource
@Override
public void batchFailureTest(ParallelConsumerOptions.ProcessingOrder order) {
batchTestMethods.batchFailureTest(order);
}

}

Loading