1515
1616package software .amazon .awssdk .services .sqs .internal .batchmanager ;
1717
18+ import java .time .Duration ;
1819import java .util .concurrent .CompletableFuture ;
1920import java .util .concurrent .ScheduledExecutorService ;
2021import software .amazon .awssdk .annotations .SdkInternalApi ;
3940public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
4041 // TODO : update the validation here while implementing this class in next PR
4142 private final SqsAsyncClient client ;
42- private final ScheduledExecutorService scheduledExecutor ;
43- private final BatchOverrideConfiguration overrideConfiguration ;
4443
4544 private final BatchManager <SendMessageRequest , SendMessageResponse , SendMessageBatchResponse > sendMessageBatchManager ;
4645
@@ -53,34 +52,68 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
5352
5453 private DefaultSqsAsyncBatchManager (DefaultBuilder builder ) {
5554 this .client = Validate .notNull (builder .client , "client cannot be null" );
56- this .scheduledExecutor = Validate .notNull (builder .scheduledExecutor , "scheduledExecutor cannot be null" );
57- // TODO : create overrideConfiguration with Default values if null
58- this .overrideConfiguration = builder .overrideConfiguration ;
5955
60- sendMessageBatchManager = null ;
61- deleteMessageBatchManager = null ;
62- changeMessageVisibilityBatchManager = null ;
56+ ScheduledExecutorService scheduledExecutor = builder .scheduledExecutor ;
57+
58+ this .sendMessageBatchManager = BatchManager
59+ .requestBatchManagerBuilder (SendMessageRequest .class , SendMessageResponse .class , SendMessageBatchResponse .class )
60+ .batchFunction (SqsBatchFunctions .sendMessageBatchAsyncFunction (client ))
61+ .responseMapper (SqsBatchFunctions .sendMessageResponseMapper ())
62+ .batchKeyMapper (SqsBatchFunctions .sendMessageBatchKeyMapper ())
63+ .overrideConfiguration (sendMessageConfig (builder .overrideConfiguration )).scheduledExecutor (scheduledExecutor )
64+ .build ();
65+ this .deleteMessageBatchManager = BatchManager
66+ .requestBatchManagerBuilder (DeleteMessageRequest .class , DeleteMessageResponse .class , DeleteMessageBatchResponse .class )
67+ .batchFunction (SqsBatchFunctions .deleteMessageBatchAsyncFunction (client ))
68+ .responseMapper (SqsBatchFunctions .deleteMessageResponseMapper ())
69+ .batchKeyMapper (SqsBatchFunctions .deleteMessageBatchKeyMapper ())
70+ .overrideConfiguration (deleteMessageConfig (builder .overrideConfiguration )).scheduledExecutor (scheduledExecutor )
71+ .build ();
72+ this .changeMessageVisibilityBatchManager = BatchManager
73+ .requestBatchManagerBuilder (ChangeMessageVisibilityRequest .class , ChangeMessageVisibilityResponse .class ,
74+ ChangeMessageVisibilityBatchResponse .class )
75+ .batchFunction (SqsBatchFunctions .changeMessageVisibilityBatchAsyncFunction (client ))
76+ .responseMapper (SqsBatchFunctions .changeMessageVisibilityResponseMapper ())
77+ .batchKeyMapper (SqsBatchFunctions .changeMessageVisibilityBatchKeyMapper ())
78+ .overrideConfiguration (changeMessageVisibilityConfig (builder .overrideConfiguration ))
79+ .scheduledExecutor (scheduledExecutor ).build ();
6380 receiveMessageBatchManager = null ;
6481 }
6582
83+
84+ @ SdkInternalApi
85+ public DefaultSqsAsyncBatchManager (
86+ SqsAsyncClient client ,
87+ BatchManager <SendMessageRequest , SendMessageResponse , SendMessageBatchResponse > sendMessageBatchManager ,
88+ BatchManager <DeleteMessageRequest , DeleteMessageResponse , DeleteMessageBatchResponse > deleteMessageBatchManager ,
89+ BatchManager <ChangeMessageVisibilityRequest , ChangeMessageVisibilityResponse ,
90+ ChangeMessageVisibilityBatchResponse > changeMessageVisibilityBatchManager ) {
91+ this .sendMessageBatchManager = sendMessageBatchManager ;
92+ this .deleteMessageBatchManager = deleteMessageBatchManager ;
93+ this .changeMessageVisibilityBatchManager = changeMessageVisibilityBatchManager ;
94+ receiveMessageBatchManager = null ;
95+
96+ this .client = client ;
97+ }
98+
6699 @ Override
67100 public CompletableFuture <SendMessageResponse > sendMessage (SendMessageRequest request ) {
68- return SqsAsyncBatchManager . super . sendMessage (request );
101+ return sendMessageBatchManager . batchRequest (request );
69102 }
70103
71104 @ Override
72105 public CompletableFuture <DeleteMessageResponse > deleteMessage (DeleteMessageRequest request ) {
73- return SqsAsyncBatchManager . super . deleteMessage (request );
106+ return deleteMessageBatchManager . batchRequest (request );
74107 }
75108
76109 @ Override
77110 public CompletableFuture <ChangeMessageVisibilityResponse > changeMessageVisibility (ChangeMessageVisibilityRequest request ) {
78- return SqsAsyncBatchManager . super . changeMessageVisibility (request );
111+ return changeMessageVisibilityBatchManager . batchRequest (request );
79112 }
80113
81114 @ Override
82115 public CompletableFuture <ReceiveMessageResponse > receiveMessage (ReceiveMessageRequest request ) {
83- return SqsAsyncBatchManager . super . receiveMessage (request );
116+ return receiveMessageBatchManager . batchRequest (request );
84117 }
85118
86119 public static SqsAsyncBatchManager .Builder builder () {
@@ -89,6 +122,45 @@ public static SqsAsyncBatchManager.Builder builder() {
89122
90123 @ Override
91124 public void close () {
125+ sendMessageBatchManager .close ();
126+ deleteMessageBatchManager .close ();
127+ changeMessageVisibilityBatchManager .close ();
128+ }
129+
130+ private BatchOverrideConfiguration sendMessageConfig (BatchOverrideConfiguration overrideConfiguration ) {
131+ BatchOverrideConfiguration .Builder config = BatchOverrideConfiguration .builder ();
132+ if (overrideConfiguration == null ) {
133+ config .maxBatchItems (10 );
134+ config .maxBatchOpenInMs (Duration .ofMillis (200 ));
135+ } else {
136+ config .maxBatchItems (overrideConfiguration .maxBatchItems ().orElse (10 ));
137+ config .maxBatchOpenInMs (overrideConfiguration .maxBatchOpenInMs ().orElse (Duration .ofMillis (200 )));
138+ }
139+ return config .build ();
140+ }
141+
142+ private BatchOverrideConfiguration deleteMessageConfig (BatchOverrideConfiguration overrideConfiguration ) {
143+ BatchOverrideConfiguration .Builder config = BatchOverrideConfiguration .builder ();
144+ if (overrideConfiguration == null ) {
145+ config .maxBatchItems (10 );
146+ config .maxBatchOpenInMs (Duration .ofMillis (200 ));
147+ } else {
148+ config .maxBatchItems (overrideConfiguration .maxBatchItems ().orElse (10 ));
149+ config .maxBatchOpenInMs (overrideConfiguration .maxBatchOpenInMs ().orElse (Duration .ofMillis (200 )));
150+ }
151+ return config .build ();
152+ }
153+
154+ private BatchOverrideConfiguration changeMessageVisibilityConfig (BatchOverrideConfiguration overrideConfiguration ) {
155+ BatchOverrideConfiguration .Builder config = BatchOverrideConfiguration .builder ();
156+ if (overrideConfiguration == null ) {
157+ config .maxBatchItems (10 );
158+ config .maxBatchOpenInMs (Duration .ofMillis (200 ));
159+ } else {
160+ config .maxBatchItems (overrideConfiguration .maxBatchItems ().orElse (10 ));
161+ config .maxBatchOpenInMs (overrideConfiguration .maxBatchOpenInMs ().orElse (Duration .ofMillis (200 )));
162+ }
163+ return config .build ();
92164 }
93165
94166 public static final class DefaultBuilder implements SqsAsyncBatchManager .Builder {
0 commit comments