1515
1616package  software .amazon .awssdk .services .sqs .internal .batchmanager ;
1717
18+ import  java .time .Duration ;
1819import  java .util .concurrent .CompletableFuture ;
1920import  java .util .concurrent .ScheduledExecutorService ;
2021import  software .amazon .awssdk .annotations .SdkInternalApi ;
22+ import  software .amazon .awssdk .annotations .SdkTestInternalApi ;
2123import  software .amazon .awssdk .services .sqs .SqsAsyncClient ;
2224import  software .amazon .awssdk .services .sqs .batchmanager .BatchOverrideConfiguration ;
2325import  software .amazon .awssdk .services .sqs .batchmanager .SqsAsyncBatchManager ;
3941public  final  class  DefaultSqsAsyncBatchManager  implements  SqsAsyncBatchManager  {
4042    // TODO : update the validation here while implementing this class in next PR 
4143    private  final  SqsAsyncClient  client ;
42-     private  final   ScheduledExecutorService  scheduledExecutor ;
43-     private  final   BatchOverrideConfiguration  overrideConfiguration ;
4444
4545    private  final  BatchManager <SendMessageRequest , SendMessageResponse , SendMessageBatchResponse > sendMessageBatchManager ;
4646
@@ -53,34 +53,69 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
5353
5454    private  DefaultSqsAsyncBatchManager (DefaultBuilder  builder ) {
5555        this .client  = Validate .notNull (builder .client , "client cannot be null" );
56-         this .scheduledExecutor  = Validate .notNull (builder .scheduledExecutor , "scheduledExecutor cannot be null" );
57-         // TODO : create overrideConfiguration with Default values if null 
58-         this .overrideConfiguration  = builder .overrideConfiguration ;
5956
60-         sendMessageBatchManager  = null ;
61-         deleteMessageBatchManager  = null ;
62-         changeMessageVisibilityBatchManager  = null ;
57+         ScheduledExecutorService  scheduledExecutor  = builder .scheduledExecutor ;
58+ 
59+         this .sendMessageBatchManager  = BatchManager 
60+             .requestBatchManagerBuilder (SendMessageRequest .class , SendMessageResponse .class , SendMessageBatchResponse .class )
61+             .batchFunction (SqsBatchFunctions .sendMessageBatchAsyncFunction (client ))
62+             .responseMapper (SqsBatchFunctions .sendMessageResponseMapper ())
63+             .batchKeyMapper (SqsBatchFunctions .sendMessageBatchKeyMapper ())
64+             .overrideConfiguration (sendMessageConfig (builder .overrideConfiguration )).scheduledExecutor (scheduledExecutor )
65+             .build ();
66+         this .deleteMessageBatchManager  = BatchManager 
67+             .requestBatchManagerBuilder (DeleteMessageRequest .class , DeleteMessageResponse .class , DeleteMessageBatchResponse .class )
68+             .batchFunction (SqsBatchFunctions .deleteMessageBatchAsyncFunction (client ))
69+             .responseMapper (SqsBatchFunctions .deleteMessageResponseMapper ())
70+             .batchKeyMapper (SqsBatchFunctions .deleteMessageBatchKeyMapper ())
71+             .overrideConfiguration (deleteMessageConfig (builder .overrideConfiguration )).scheduledExecutor (scheduledExecutor )
72+             .build ();
73+         this .changeMessageVisibilityBatchManager  = BatchManager 
74+             .requestBatchManagerBuilder (ChangeMessageVisibilityRequest .class , ChangeMessageVisibilityResponse .class ,
75+                      ChangeMessageVisibilityBatchResponse .class )
76+             .batchFunction (SqsBatchFunctions .changeMessageVisibilityBatchAsyncFunction (client ))
77+             .responseMapper (SqsBatchFunctions .changeMessageVisibilityResponseMapper ())
78+             .batchKeyMapper (SqsBatchFunctions .changeMessageVisibilityBatchKeyMapper ())
79+             .overrideConfiguration (changeMessageVisibilityConfig (builder .overrideConfiguration ))
80+             .scheduledExecutor (scheduledExecutor ).build ();
81+ 
82+         //TODO : this will be updated while implementing the Receive Message Batch Manager 
6383        receiveMessageBatchManager  = null ;
6484    }
6585
86+ 
87+     @ SdkTestInternalApi 
88+     public  DefaultSqsAsyncBatchManager (
89+         SqsAsyncClient  client ,
90+         BatchManager <SendMessageRequest , SendMessageResponse , SendMessageBatchResponse > sendMessageBatchManager ,
91+         BatchManager <DeleteMessageRequest , DeleteMessageResponse , DeleteMessageBatchResponse > deleteMessageBatchManager ,
92+         BatchManager <ChangeMessageVisibilityRequest , ChangeMessageVisibilityResponse ,
93+             ChangeMessageVisibilityBatchResponse > changeMessageVisibilityBatchManager ) {
94+         this .sendMessageBatchManager  = sendMessageBatchManager ;
95+         this .deleteMessageBatchManager  = deleteMessageBatchManager ;
96+         this .changeMessageVisibilityBatchManager  = changeMessageVisibilityBatchManager ;
97+         receiveMessageBatchManager  = null ;
98+         this .client  = client ;
99+     }
100+ 
66101    @ Override 
67102    public  CompletableFuture <SendMessageResponse > sendMessage (SendMessageRequest  request ) {
68-         return  SqsAsyncBatchManager . super . sendMessage (request );
103+         return  sendMessageBatchManager . batchRequest (request );
69104    }
70105
71106    @ Override 
72107    public  CompletableFuture <DeleteMessageResponse > deleteMessage (DeleteMessageRequest  request ) {
73-         return  SqsAsyncBatchManager . super . deleteMessage (request );
108+         return  deleteMessageBatchManager . batchRequest (request );
74109    }
75110
76111    @ Override 
77112    public  CompletableFuture <ChangeMessageVisibilityResponse > changeMessageVisibility (ChangeMessageVisibilityRequest  request ) {
78-         return  SqsAsyncBatchManager . super . changeMessageVisibility (request );
113+         return  changeMessageVisibilityBatchManager . batchRequest (request );
79114    }
80115
81116    @ Override 
82117    public  CompletableFuture <ReceiveMessageResponse > receiveMessage (ReceiveMessageRequest  request ) {
83-         return  SqsAsyncBatchManager . super . receiveMessage (request );
118+         return  receiveMessageBatchManager . batchRequest (request );
84119    }
85120
86121    public  static  SqsAsyncBatchManager .Builder  builder () {
@@ -89,6 +124,33 @@ public static SqsAsyncBatchManager.Builder builder() {
89124
90125    @ Override 
91126    public  void  close () {
127+         sendMessageBatchManager .close ();
128+         deleteMessageBatchManager .close ();
129+         changeMessageVisibilityBatchManager .close ();
130+     }
131+ 
132+     private  BatchOverrideConfiguration  createConfig (BatchOverrideConfiguration  overrideConfiguration ) {
133+         BatchOverrideConfiguration .Builder  config  = BatchOverrideConfiguration .builder ();
134+         if  (overrideConfiguration  == null ) {
135+             config .maxBatchItems (10 );
136+             config .maxBatchOpenInMs (Duration .ofMillis (200 ));
137+         } else  {
138+             config .maxBatchItems (overrideConfiguration .maxBatchItems ().orElse (10 ));
139+             config .maxBatchOpenInMs (overrideConfiguration .maxBatchOpenInMs ().orElse (Duration .ofMillis (200 )));
140+         }
141+         return  config .build ();
142+     }
143+ 
144+     private  BatchOverrideConfiguration  sendMessageConfig (BatchOverrideConfiguration  overrideConfiguration ) {
145+         return  createConfig (overrideConfiguration );
146+     }
147+ 
148+     private  BatchOverrideConfiguration  deleteMessageConfig (BatchOverrideConfiguration  overrideConfiguration ) {
149+         return  createConfig (overrideConfiguration );
150+     }
151+ 
152+     private  BatchOverrideConfiguration  changeMessageVisibilityConfig (BatchOverrideConfiguration  overrideConfiguration ) {
153+         return  createConfig (overrideConfiguration );
92154    }
93155
94156    public  static  final  class  DefaultBuilder  implements  SqsAsyncBatchManager .Builder  {
0 commit comments