1010import com .datastax .oss .driver .api .core .CqlSessionBuilder ;
1111import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
1212import com .datastax .oss .driver .api .core .config .DriverConfigLoader ;
13- import com .datastax .oss .driver .api .core .session . Session ;
13+ import com .datastax .oss .driver .api .core .metadata . Node ;
1414import com .datastax .oss .driver .api .testinfra .ScyllaOnly ;
1515import com .datastax .oss .driver .api .testinfra .ccm .CustomCcmRule ;
1616import com .datastax .oss .driver .api .testinfra .session .SessionUtils ;
1717import com .datastax .oss .driver .internal .core .pool .ChannelPool ;
18+ import com .datastax .oss .driver .internal .core .session .DefaultSession ;
1819import com .datastax .oss .driver .internal .core .util .concurrent .CompletableFutures ;
1920import com .datastax .oss .driver .internal .core .util .concurrent .Reconnection ;
21+ import com .google .common .collect .ImmutableList ;
2022import com .google .common .collect .ImmutableMap ;
2123import com .google .common .collect .ImmutableSet ;
22- import com .google .common .util .concurrent .Uninterruptibles ;
2324import com .tngtech .java .junit .dataprovider .DataProvider ;
2425import com .tngtech .java .junit .dataprovider .DataProviderRunner ;
2526import com .tngtech .java .junit .dataprovider .UseDataProvider ;
2627import java .net .InetSocketAddress ;
2728import java .time .Duration ;
29+ import java .util .Arrays ;
30+ import java .util .Collections ;
2831import java .util .List ;
2932import java .util .Map ;
3033import java .util .Set ;
3134import java .util .concurrent .CompletionStage ;
3235import java .util .concurrent .TimeUnit ;
3336import java .util .regex .Pattern ;
37+ import org .awaitility .Awaitility ;
3438import org .junit .After ;
3539import org .junit .Before ;
3640import org .junit .ClassRule ;
@@ -92,6 +96,8 @@ public void stopCapturingLogs() {
9296 @ Test
9397 @ UseDataProvider ("reuseAddressOption" )
9498 public void should_initialize_all_channels (boolean reuseAddress ) {
99+ int poolLocalSizeSetting = 4 ; // Will round up to 6 due to not being divisible by 3 shards
100+ int expectedChannelsPerNode = 6 ;
95101 Map <Pattern , Integer > expectedOccurences =
96102 ImmutableMap .of (
97103 Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 6/6 channels.*" ), 1 ,
@@ -107,52 +113,63 @@ public void should_initialize_all_channels(boolean reuseAddress) {
107113 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
108114 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_LOW , 10000 )
109115 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
110- // Due to rounding up the connections per shard this will result in 6 connections per
111- // node
112- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 4 )
116+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , poolLocalSizeSetting )
113117 .build ();
114- try (Session session =
118+ try (CqlSession session =
115119 CqlSession .builder ()
116120 .addContactPoint (
117121 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 19042 ))
118122 .withConfigLoader (loader )
119123 .build ()) {
120- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
124+ List <CqlSession > allSessions = Collections .singletonList (session );
125+ Awaitility .await ()
126+ .atMost (20 , TimeUnit .SECONDS )
127+ .pollInterval (500 , TimeUnit .MILLISECONDS )
128+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
129+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
121130 expectedOccurences .forEach (
122- (pattern , times ) -> assertMatchesExactly (pattern , times , appender . list ));
123- forbiddenOccurences .forEach (pattern -> assertNoLogMatches (pattern , appender . list ));
131+ (pattern , times ) -> assertMatchesExactly (pattern , times , logsCopy ));
132+ forbiddenOccurences .forEach (pattern -> assertNoLogMatches (pattern , logsCopy ));
124133 }
125134 }
126135
127136 @ Test
128137 public void should_see_mismatched_shard () {
138+ int expectedChannelsPerNode = 33 ;
129139 DriverConfigLoader loader =
130140 SessionUtils .configLoaderBuilder ()
131141 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
132142 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_LOW , 10000 )
133143 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
134- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
144+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
135145 .build ();
136- try (Session session =
146+ try (CqlSession session =
137147 CqlSession .builder ()
138148 .addContactPoint (
139149 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 9042 ))
140150 .withConfigLoader (loader )
141151 .build ()) {
142- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
143- assertMatchesAtLeast (shardMismatchPattern , 5 , appender .list );
152+ List <CqlSession > allSessions = Collections .singletonList (session );
153+ Awaitility .await ()
154+ .atMost (20 , TimeUnit .SECONDS )
155+ .pollInterval (500 , TimeUnit .MILLISECONDS )
156+ // Waits until 2/3rds are initialized instead of all. It does not matter here.
157+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode * 2 / 3 ));
158+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
159+ assertMatchesAtLeast (shardMismatchPattern , 5 , logsCopy );
144160 }
145161 }
146162
147163 // There is no need to run this as a test, but it serves as a comparison
148164 @ SuppressWarnings ("unused" )
149165 public void should_struggle_to_fill_pools () {
166+ int expectedChannelsPerNode = 33 ; // Divisible by number of shards
150167 DriverConfigLoader loader =
151168 SessionUtils .configLoaderBuilder ()
152169 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , false )
153- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
154- .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis ( 200 ))
155- .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis ( 4000 ))
170+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
171+ .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofSeconds ( 1 ))
172+ .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofSeconds ( 20 ))
156173 .build ();
157174 CqlSessionBuilder builder =
158175 CqlSession .builder ()
@@ -167,20 +184,26 @@ public void should_struggle_to_fill_pools() {
167184 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
168185 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
169186 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
170- Uninterruptibles .sleepUninterruptibly (20 , TimeUnit .SECONDS );
171- assertNoLogMatches (shardMismatchPattern , appender .list );
172- assertMatchesAtLeast (reconnectionPattern , 8 , appender .list );
187+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
188+ Awaitility .await ()
189+ .atMost (20 , TimeUnit .SECONDS )
190+ .pollInterval (500 , TimeUnit .MILLISECONDS )
191+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
192+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
193+ assertNoLogMatches (shardMismatchPattern , logsCopy );
194+ assertMatchesAtLeast (reconnectionPattern , 8 , logsCopy );
173195 }
174196 }
175197
176198 @ Test
177199 public void should_not_struggle_to_fill_pools () {
200+ int expectedChannelsPerNode = 33 ; // Divisible by number of shards
178201 DriverConfigLoader loader =
179202 SessionUtils .configLoaderBuilder ()
180203 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
181- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 66 )
182- .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis ( 10 ))
183- .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis (20 ))
204+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
205+ .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofSeconds ( 1 ))
206+ .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofSeconds (20 ))
184207 .build ();
185208 CqlSessionBuilder builder =
186209 CqlSession .builder ()
@@ -196,25 +219,47 @@ public void should_not_struggle_to_fill_pools() {
196219 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
197220 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
198221 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
199- Uninterruptibles .sleepUninterruptibly (8 , TimeUnit .SECONDS );
200- int tolerance = 2 ; // Sometimes socket ends up already in use
222+
223+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
224+ Awaitility .await ()
225+ .atMost (20 , TimeUnit .SECONDS )
226+ .pollInterval (500 , TimeUnit .MILLISECONDS )
227+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
228+
229+ int tolerance = 4 ; // Sometimes socket ends up already in use
201230 Map <Pattern , Integer > expectedOccurences =
202231 ImmutableMap .of (
203- Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 66/66 channels.*" ),
232+ Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 33/33 channels.*" ),
204233 1 * sessions ,
205- Pattern .compile (".*\\ .1:19042.*Reconnection attempt complete, 66/66 channels.*" ),
234+ Pattern .compile (".*\\ .1:19042.*Reconnection attempt complete, 33/33 channels.*" ),
206235 1 * sessions ,
207236 Pattern .compile (".*Reconnection attempt complete.*" ), 2 * sessions ,
208- Pattern .compile (".*.1:19042.*New channel added \\ [.*" ), 65 * sessions - tolerance ,
209- Pattern .compile (".*.2:19042.*New channel added \\ [.*" ), 65 * sessions - tolerance ,
210- Pattern .compile (".*.1:19042\\ ] Trying to create 65 missing channels.*" ), 1 * sessions ,
211- Pattern .compile (".*.2:19042\\ ] Trying to create 65 missing channels.*" ),
237+ Pattern .compile (".*.1:19042.*New channel added \\ [.*" ), 32 * sessions - tolerance ,
238+ Pattern .compile (".*.2:19042.*New channel added \\ [.*" ), 32 * sessions - tolerance ,
239+ Pattern .compile (".*.1:19042\\ ] Trying to create 32 missing channels.*" ), 1 * sessions ,
240+ Pattern .compile (".*.2:19042\\ ] Trying to create 32 missing channels.*" ),
212241 1 * sessions );
242+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
213243 expectedOccurences .forEach (
214- (pattern , times ) -> assertMatchesAtLeast (pattern , times , appender .list ));
215- assertNoLogMatches (shardMismatchPattern , appender .list );
216- assertMatchesAtMost (reconnectionPattern , tolerance , appender .list );
244+ (pattern , times ) -> assertMatchesAtLeast (pattern , times , logsCopy ));
245+ assertNoLogMatches (shardMismatchPattern , logsCopy );
246+ assertMatchesAtMost (reconnectionPattern , tolerance , logsCopy );
247+ }
248+ }
249+
250+ private boolean areAllPoolsFullyInitialized (
251+ List <CqlSession > sessions , int expectedChannelsPerNode ) {
252+ for (CqlSession session : sessions ) {
253+ DefaultSession defaultSession = (DefaultSession ) session ;
254+ Map <Node , ChannelPool > pools = defaultSession .getPools ();
255+
256+ for (ChannelPool pool : pools .values ()) {
257+ if (pool .size () < expectedChannelsPerNode ) {
258+ return false ;
259+ }
260+ }
217261 }
262+ return true ;
218263 }
219264
220265 private void assertNoLogMatches (Pattern pattern , List <ILoggingEvent > logs ) {
0 commit comments