File tree Expand file tree Collapse file tree 2 files changed +49
-0
lines changed
test/java/com/rabbitmq/client/amqp/docs Expand file tree Collapse file tree 2 files changed +49
-0
lines changed Original file line number Diff line number Diff line change @@ -26,6 +26,21 @@ include::{test-examples}/Api.java[tag=connection-settings]
26
26
<1> Use the `guest` user by default
27
27
<2> Use the `admin` user for this connection
28
28
29
+ === Settling Messages in Batch
30
+
31
+ .Settling messages in batch
32
+ [source,java,indent=0]
33
+ --------
34
+ include::{test-examples}/Api.java[tag=settling-message-in-batch]
35
+ --------
36
+ <1> Declare batch context property
37
+ <2> Create a new batch context instance
38
+ <3> Add the current message context to the batch context if processing is successful
39
+ <4> Settle the batch context once it contains 10 messages
40
+ <5> Reset the batch context
41
+ <6> Discard the current message context if processing fails
42
+
43
+
29
44
=== Subscription Listener
30
45
31
46
The client provides a `SubscriptionListener` interface callback to add behavior before a subscription is created.
Original file line number Diff line number Diff line change @@ -86,6 +86,40 @@ void metricsCollectorMicrometerPrometheus() {
86
86
// end::metrics-micrometer-prometheus[]
87
87
}
88
88
89
+ void settlingMessagesInBatch () {
90
+ Connection connection = null ;
91
+
92
+ // tag::settling-message-in-batch[]
93
+ Consumer .MessageHandler handler = new Consumer .MessageHandler () {
94
+ volatile Consumer .BatchContext batch = null ; // <1>
95
+ @ Override
96
+ public void handle (Consumer .Context context , Message message ) {
97
+ if (batch == null ) {
98
+ batch = context .batch (); // <2>
99
+ }
100
+ boolean success = process (message );
101
+ if (success ) {
102
+ batch .add (context ); // <3>
103
+ if (batch .size () == 10 ) {
104
+ batch .accept (); // <4>
105
+ batch = null ; // <5>
106
+ }
107
+ } else {
108
+ context .discard (); // <6>
109
+ }
110
+ }
111
+ };
112
+ Consumer consumer = connection .consumerBuilder ()
113
+ .queue ("some-queue" )
114
+ .messageHandler (handler )
115
+ .build ();
116
+ // end::settling-message-in-batch[]
117
+ }
118
+
119
+ boolean process (Message message ) {
120
+ return true ;
121
+ }
122
+
89
123
void micrometerObservation () {
90
124
ObservationRegistry observationRegistry = ObservationRegistry .NOOP ;
91
125
// tag::micrometer-observation[]
You can’t perform that action at this time.
0 commit comments