34
34
import org .junit .jupiter .api .BeforeEach ;
35
35
import org .junit .jupiter .api .Test ;
36
36
import org .junit .jupiter .api .TestInfo ;
37
+ import org .junit .jupiter .params .ParameterizedTest ;
38
+ import org .junit .jupiter .params .provider .ValueSource ;
37
39
38
40
@ AmqpTestInfrastructure
39
41
public class ConsumerOutcomeTest {
@@ -108,8 +110,9 @@ void requeuedMessageShouldBeRequeued() {
108
110
waitAtMost (() -> management .queueInfo (q ).messageCount () == 0 );
109
111
}
110
112
111
- @ Test
112
- void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery () {
113
+ @ ParameterizedTest
114
+ @ ValueSource (booleans = {true , false })
115
+ void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery (boolean batch ) {
113
116
this .management .queue ().name (q ).type (QUORUM ).declare ();
114
117
115
118
Publisher publisher = this .connection .publisherBuilder ().queue (q ).build ();
@@ -120,13 +123,18 @@ void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery() {
120
123
.consumerBuilder ()
121
124
.queue (q )
122
125
.messageHandler (
123
- (context , message ) -> {
126
+ (ctx , message ) -> {
124
127
deliveryCount .incrementAndGet ();
125
128
messages .offer (message );
126
129
if (deliveryCount .get () == 1 ) {
127
- context .requeue (ANNOTATIONS );
130
+ if (batch ) {
131
+ Consumer .BatchContext bc = ctx .batch ();
132
+ bc .add (ctx );
133
+ ctx = bc ;
134
+ }
135
+ ctx .requeue (ANNOTATIONS );
128
136
} else {
129
- context .accept ();
137
+ ctx .accept ();
130
138
redeliveredSync .down ();
131
139
}
132
140
})
@@ -175,15 +183,24 @@ void discardedMessageShouldBeDeadLeadLetteredWhenConfigured() {
175
183
waitAtMost (() -> management .queueInfo (dlq ).messageCount () == 0 );
176
184
}
177
185
178
- @ Test
179
- void
180
- discardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotationsWhenConfigured () {
186
+ @ ParameterizedTest
187
+ @ ValueSource (booleans = {true , false })
188
+ void discardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotationsWhenConfigured (
189
+ boolean batch ) {
181
190
declareDeadLetterTopology ();
182
191
Publisher publisher = this .connection .publisherBuilder ().queue (q ).build ();
183
192
this .connection
184
193
.consumerBuilder ()
185
194
.queue (q )
186
- .messageHandler ((ctx , msg ) -> ctx .discard (ANNOTATIONS ))
195
+ .messageHandler (
196
+ (ctx , msg ) -> {
197
+ if (batch ) {
198
+ Consumer .BatchContext bc = ctx .batch ();
199
+ bc .add (ctx );
200
+ ctx = bc ;
201
+ }
202
+ ctx .discard (ANNOTATIONS );
203
+ })
187
204
.build ();
188
205
189
206
TestUtils .Sync deadLetteredSync = TestUtils .sync ();
0 commit comments