Skip to content

Commit df68350

Browse files
committed
Add batch size hint parameter to create batch context
And add Javadoc to BatchContext.
1 parent f842223 commit df68350

File tree

7 files changed

+54
-17
lines changed

7 files changed

+54
-17
lines changed

src/main/java/com/rabbitmq/client/amqp/Consumer.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,44 @@ interface Context {
135135
*/
136136
void requeue(Map<String, Object> annotations);
137137

138-
BatchContext batch();
138+
/**
139+
* Create a batch context to accumulate message contexts and settle them at once.
140+
*
141+
* <p>The message context the batch context is created from is <b>not</b> added to the batch
142+
* context.
143+
*
144+
* @return the created batch context
145+
*/
146+
BatchContext batch(int batchSizeHint);
139147
}
140148

149+
/**
150+
* Context to accumulate message contexts and settle them at once.
151+
*
152+
* <p>A {@link BatchContext} is also a {@link Context}: the same methods are available to settle
153+
* the messages.
154+
*
155+
* <p>Only "simple" (not batch) message contexts can be added to a batch context. Calling {@link
156+
* Context#batch()} on a batch context returns the instance itself.
157+
*
158+
* @see <a
159+
* href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-disposition">AMQP
160+
* 1.0 Disposition performative</a>
161+
*/
141162
interface BatchContext extends Context {
142163

164+
/**
165+
* Add a message context to the batch context.
166+
*
167+
* @param context the message context to add
168+
*/
143169
void add(Context context);
144170

171+
/**
172+
* Get the current number of message contexts in the batch context.
173+
*
174+
* @return current number of message contexts in the batch
175+
*/
145176
int size();
146177
}
147178
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,9 @@ public void requeue(Map<String, Object> annotations) {
447447
}
448448

449449
@Override
450-
public BatchContext batch() {
450+
public BatchContext batch(int batchSizeHint) {
451451
return new BatchDeliveryContext(
452+
batchSizeHint,
452453
protonExecutor,
453454
protonReceiver,
454455
metricsCollector,
@@ -481,7 +482,7 @@ private static final class BatchDeliveryContext implements BatchContext {
481482

482483
private static final org.apache.qpid.protonj2.types.transport.DeliveryState REJECTED =
483484
new Rejected();
484-
private final List<DeliveryContext> contexts = new ArrayList<>();
485+
private final List<DeliveryContext> contexts;
485486
private final AtomicBoolean settled = new AtomicBoolean(false);
486487
private final Scheduler protonExecutor;
487488
private final ProtonReceiver protonReceiver;
@@ -491,12 +492,14 @@ private static final class BatchDeliveryContext implements BatchContext {
491492
private final AmqpConsumer consumer;
492493

493494
private BatchDeliveryContext(
495+
int batchSizeHint,
494496
Scheduler protonExecutor,
495497
ProtonReceiver protonReceiver,
496498
MetricsCollector metricsCollector,
497499
AtomicLong unsettledMessageCount,
498500
Runnable replenishCreditOperation,
499501
AmqpConsumer consumer) {
502+
this.contexts = new ArrayList<>(batchSizeHint);
500503
this.protonExecutor = protonExecutor;
501504
this.protonReceiver = protonReceiver;
502505
this.metricsCollector = metricsCollector;
@@ -563,7 +566,7 @@ public void requeue(Map<String, Object> annotations) {
563566
}
564567

565568
@Override
566-
public BatchContext batch() {
569+
public BatchContext batch(int batchSizeHint) {
567570
return this;
568571
}
569572

src/main/java/com/rabbitmq/client/amqp/impl/SerialNumberUtils.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ final class SerialNumberUtils {
2727
// SERIAL_BITS = 32
2828
// 2 ^ SERIAL_BITS
2929
static final long SERIAL_SPACE = 0x100000000L;
30-
// 2 ^ (SERIAL_BITS - 1) - 1
31-
private static final long SERIAL_MAX_ADDEND = 0x7fffffffL;
3230
// 2 ^ (SERIAL_BITS - 1)
3331
private static final long COMPARE = 2_147_483_648L;
3432

src/test/java/com/rabbitmq/client/amqp/docs/Api.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,18 @@ void settlingMessagesInBatch() {
9090
Connection connection = null;
9191

9292
// tag::settling-message-in-batch[]
93+
int batchSize = 10;
9394
Consumer.MessageHandler handler = new Consumer.MessageHandler() {
9495
volatile Consumer.BatchContext batch = null; // <1>
9596
@Override
9697
public void handle(Consumer.Context context, Message message) {
9798
if (batch == null) {
98-
batch = context.batch(); // <2>
99+
batch = context.batch(batchSize); // <2>
99100
}
100101
boolean success = process(message);
101102
if (success) {
102103
batch.add(context); // <3>
103-
if (batch.size() == 10) {
104+
if (batch.size() == batchSize) {
104105
batch.accept(); // <4>
105106
batch = null; // <5>
106107
}

src/test/java/com/rabbitmq/client/amqp/impl/BatchDispositionTest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void test() {
5959
@Override
6060
public void handle(Consumer.Context context, Message message) {
6161
if (batch == null) {
62-
batch = context.batch();
62+
batch = context.batch(batchSize);
6363
}
6464

6565
boolean success = processMessage(message);
@@ -154,16 +154,20 @@ private void dispose() {
154154
}
155155

156156
@Override
157-
public Consumer.BatchContext batch() {
158-
return new TestBatchContext();
157+
public Consumer.BatchContext batch(int batchSizeHint) {
158+
return new TestBatchContext(batchSizeHint);
159159
}
160160
}
161161

162162
static class TestBatchContext implements Consumer.BatchContext {
163163

164-
private final List<TestContext> contexts = new ArrayList<>(batchSize);
164+
private final List<TestContext> contexts;
165165
private final AtomicBoolean disposed = new AtomicBoolean(false);
166166

167+
public TestBatchContext(int batchSizeHint) {
168+
this.contexts = new ArrayList<>(batchSizeHint);
169+
}
170+
167171
@Override
168172
public void add(Consumer.Context context) {
169173
this.contexts.add((TestContext) context);
@@ -218,7 +222,7 @@ private void dispose() {
218222
}
219223

220224
@Override
221-
public Consumer.BatchContext batch() {
225+
public Consumer.BatchContext batch(int batchSizeHint) {
222226
return this;
223227
}
224228
}

src/test/java/com/rabbitmq/client/amqp/impl/ConsumerOutcomeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery(boolean b
128128
messages.offer(message);
129129
if (deliveryCount.get() == 1) {
130130
if (batch) {
131-
Consumer.BatchContext bc = ctx.batch();
131+
Consumer.BatchContext bc = ctx.batch(1);
132132
bc.add(ctx);
133133
ctx = bc;
134134
}
@@ -195,7 +195,7 @@ void discardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotation
195195
.messageHandler(
196196
(ctx, msg) -> {
197197
if (batch) {
198-
Consumer.BatchContext bc = ctx.batch();
198+
Consumer.BatchContext bc = ctx.batch(1);
199199
bc.add(ctx);
200200
ctx = bc;
201201
}
@@ -260,7 +260,7 @@ void batchAcceptShouldSettleMessages() {
260260
.messageHandler(
261261
(ctx, msg) -> {
262262
if (batchContext.get() == null) {
263-
batchContext.set(ctx.batch());
263+
batchContext.set(ctx.batch(batchSize));
264264
}
265265
if (random.nextInt(10) == 0) {
266266
ctx.discard();

src/test/java/com/rabbitmq/client/amqp/perf/AmqpPerfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public static void main(String[] args) throws IOException {
116116
(context, message) -> {
117117
recordMessage.accept(message);
118118
if (batch.get() == null) {
119-
batch.set(context.batch());
119+
batch.set(context.batch(disposeEvery));
120120
}
121121
batch.get().add(context);
122122
if (batch.get().size() == disposeEvery) {

0 commit comments

Comments
 (0)