1010import com .datastax .oss .driver .api .core .CqlSessionBuilder ;
1111import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
1212import com .datastax .oss .driver .api .core .config .DriverConfigLoader ;
13- import com .datastax .oss .driver .api .core .session . Session ;
13+ import com .datastax .oss .driver .api .core .metadata . Node ;
1414import com .datastax .oss .driver .api .testinfra .ScyllaOnly ;
1515import com .datastax .oss .driver .api .testinfra .ccm .CustomCcmRule ;
1616import com .datastax .oss .driver .api .testinfra .session .SessionUtils ;
1717import com .datastax .oss .driver .internal .core .pool .ChannelPool ;
18+ import com .datastax .oss .driver .internal .core .session .DefaultSession ;
1819import com .datastax .oss .driver .internal .core .util .concurrent .CompletableFutures ;
1920import com .datastax .oss .driver .internal .core .util .concurrent .Reconnection ;
21+ import com .google .common .collect .ImmutableList ;
2022import com .google .common .collect .ImmutableMap ;
2123import com .google .common .collect .ImmutableSet ;
22- import com .google .common .util .concurrent .Uninterruptibles ;
2324import com .tngtech .java .junit .dataprovider .DataProvider ;
2425import com .tngtech .java .junit .dataprovider .DataProviderRunner ;
2526import com .tngtech .java .junit .dataprovider .UseDataProvider ;
2627import java .net .InetSocketAddress ;
2728import java .time .Duration ;
29+ import java .util .Arrays ;
30+ import java .util .Collections ;
2831import java .util .List ;
2932import java .util .Map ;
3033import java .util .Set ;
3134import java .util .concurrent .CompletionStage ;
3235import java .util .concurrent .TimeUnit ;
3336import java .util .regex .Pattern ;
37+ import org .awaitility .Awaitility ;
3438import org .junit .After ;
3539import org .junit .Before ;
3640import org .junit .ClassRule ;
@@ -92,11 +96,14 @@ public void stopCapturingLogs() {
9296 @ Test
9397 @ UseDataProvider ("reuseAddressOption" )
9498 public void should_initialize_all_channels (boolean reuseAddress ) {
99+ int poolLocalSizeSetting = 4 ; // Will round up to 6 due to not being divisible by 3 shards
100+ int expectedChannelsPerNode = 6 ;
95101 Map <Pattern , Integer > expectedOccurences =
96102 ImmutableMap .of (
97103 Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 6/6 channels.*" ), 1 ,
98104 Pattern .compile (".*\\ .1:19042.*Reconnection attempt complete, 6/6 channels.*" ), 1 ,
99- Pattern .compile (".*Reconnection attempt complete.*" ), 2 ,
105+ // Temporarily commented out because hanging sessions from other tests pollute logs
106+ // Pattern.compile(".*Reconnection attempt complete.*"), 2,
100107 Pattern .compile (".*\\ .1:19042.*New channel added \\ [.*" ), 5 ,
101108 Pattern .compile (".*\\ .2:19042.*New channel added \\ [.*" ), 5 ,
102109 Pattern .compile (".*\\ .1:19042\\ ] Trying to create 5 missing channels.*" ), 1 ,
@@ -107,52 +114,63 @@ public void should_initialize_all_channels(boolean reuseAddress) {
107114 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
108115 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_LOW , 10000 )
109116 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
110- // Due to rounding up the connections per shard this will result in 6 connections per
111- // node
112- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 4 )
117+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , poolLocalSizeSetting )
113118 .build ();
114- try (Session session =
119+ try (CqlSession session =
115120 CqlSession .builder ()
116121 .addContactPoint (
117122 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 19042 ))
118123 .withConfigLoader (loader )
119124 .build ()) {
120- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
125+ List <CqlSession > allSessions = Collections .singletonList (session );
126+ Awaitility .await ()
127+ .atMost (20 , TimeUnit .SECONDS )
128+ .pollInterval (500 , TimeUnit .MILLISECONDS )
129+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
130+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
121131 expectedOccurences .forEach (
122- (pattern , times ) -> assertMatchesExactly (pattern , times , appender . list ));
123- forbiddenOccurences .forEach (pattern -> assertNoLogMatches (pattern , appender . list ));
132+ (pattern , times ) -> assertMatchesExactly (pattern , times , logsCopy ));
133+ forbiddenOccurences .forEach (pattern -> assertNoLogMatches (pattern , logsCopy ));
124134 }
125135 }
126136
127137 @ Test
128138 public void should_see_mismatched_shard () {
139+ int expectedChannelsPerNode = 33 ;
129140 DriverConfigLoader loader =
130141 SessionUtils .configLoaderBuilder ()
131142 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
132143 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_LOW , 10000 )
133144 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
134- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
145+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
135146 .build ();
136- try (Session session =
147+ try (CqlSession session =
137148 CqlSession .builder ()
138149 .addContactPoint (
139150 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 9042 ))
140151 .withConfigLoader (loader )
141152 .build ()) {
142- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
143- assertMatchesAtLeast (shardMismatchPattern , 5 , appender .list );
153+ List <CqlSession > allSessions = Collections .singletonList (session );
154+ Awaitility .await ()
155+ .atMost (20 , TimeUnit .SECONDS )
156+ .pollInterval (500 , TimeUnit .MILLISECONDS )
157+ // Waits until 2/3rds are initialized instead of all. It does not matter here.
158+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode * 2 / 3 ));
159+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
160+ assertMatchesAtLeast (shardMismatchPattern , 5 , logsCopy );
144161 }
145162 }
146163
147164 // There is no need to run this as a test, but it serves as a comparison
148165 @ SuppressWarnings ("unused" )
149166 public void should_struggle_to_fill_pools () {
167+ int expectedChannelsPerNode = 33 ; // Divisible by number of shards
150168 DriverConfigLoader loader =
151169 SessionUtils .configLoaderBuilder ()
152170 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , false )
153- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
154- .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis ( 200 ))
155- .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis ( 4000 ))
171+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
172+ .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofSeconds ( 1 ))
173+ .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofSeconds ( 20 ))
156174 .build ();
157175 CqlSessionBuilder builder =
158176 CqlSession .builder ()
@@ -167,20 +185,26 @@ public void should_struggle_to_fill_pools() {
167185 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
168186 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
169187 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
170- Uninterruptibles .sleepUninterruptibly (20 , TimeUnit .SECONDS );
171- assertNoLogMatches (shardMismatchPattern , appender .list );
172- assertMatchesAtLeast (reconnectionPattern , 8 , appender .list );
188+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
189+ Awaitility .await ()
190+ .atMost (20 , TimeUnit .SECONDS )
191+ .pollInterval (500 , TimeUnit .MILLISECONDS )
192+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
193+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
194+ assertNoLogMatches (shardMismatchPattern , logsCopy );
195+ assertMatchesAtLeast (reconnectionPattern , 8 , logsCopy );
173196 }
174197 }
175198
176199 @ Test
177200 public void should_not_struggle_to_fill_pools () {
201+ int expectedChannelsPerNode = 33 ; // Divisible by number of shards
178202 DriverConfigLoader loader =
179203 SessionUtils .configLoaderBuilder ()
180204 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
181- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 66 )
182- .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis ( 10 ))
183- .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis (20 ))
205+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
206+ .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofSeconds ( 1 ))
207+ .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofSeconds (20 ))
184208 .build ();
185209 CqlSessionBuilder builder =
186210 CqlSession .builder ()
@@ -196,25 +220,54 @@ public void should_not_struggle_to_fill_pools() {
196220 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
197221 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
198222 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
199- Uninterruptibles .sleepUninterruptibly (8 , TimeUnit .SECONDS );
200- int tolerance = 2 ; // Sometimes socket ends up already in use
223+
224+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
225+ Awaitility .await ()
226+ .atMost (20 , TimeUnit .SECONDS )
227+ .pollInterval (500 , TimeUnit .MILLISECONDS )
228+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
229+
230+ int tolerance = 4 ; // Sometimes socket ends up already in use
201231 Map <Pattern , Integer > expectedOccurences =
202232 ImmutableMap .of (
203- Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 66/66 channels.*" ),
233+ Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 33/33 channels.*" ),
204234 1 * sessions ,
205- Pattern .compile (".*\\ .1:19042.*Reconnection attempt complete, 66/66 channels.*" ),
235+ Pattern .compile (".*\\ .1:19042.*Reconnection attempt complete, 33/33 channels.*" ),
206236 1 * sessions ,
207- Pattern .compile (".*Reconnection attempt complete.*" ), 2 * sessions ,
208- Pattern .compile (".*.1:19042.*New channel added \\ [.*" ), 65 * sessions - tolerance ,
209- Pattern .compile (".*.2:19042.*New channel added \\ [.*" ), 65 * sessions - tolerance ,
210- Pattern .compile (".*.1:19042\\ ] Trying to create 65 missing channels.*" ), 1 * sessions ,
211- Pattern .compile (".*.2:19042\\ ] Trying to create 65 missing channels.*" ),
237+ // Temporarily commented out because hanging sessions from other tests pollute logs
238+ // Pattern.compile(".*Reconnection attempt complete.*"), 2 * sessions,
239+ Pattern .compile (".*.1:19042.*New channel added \\ [.*" ), 32 * sessions - tolerance ,
240+ Pattern .compile (".*.2:19042.*New channel added \\ [.*" ), 32 * sessions - tolerance ,
241+ Pattern .compile (".*.1:19042\\ ] Trying to create 32 missing channels.*" ), 1 * sessions ,
242+ Pattern .compile (".*.2:19042\\ ] Trying to create 32 missing channels.*" ),
212243 1 * sessions );
244+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
213245 expectedOccurences .forEach (
214- (pattern , times ) -> assertMatchesAtLeast (pattern , times , appender .list ));
215- assertNoLogMatches (shardMismatchPattern , appender .list );
216- assertMatchesAtMost (reconnectionPattern , tolerance , appender .list );
246+ (pattern , times ) -> assertMatchesAtLeast (pattern , times , logsCopy ));
247+ assertNoLogMatches (shardMismatchPattern , logsCopy );
248+ assertMatchesAtMost (reconnectionPattern , tolerance , logsCopy );
249+ }
250+ }
251+
252+ private boolean areAllPoolsFullyInitialized (
253+ List <CqlSession > sessions , int expectedChannelsPerNode ) {
254+ for (CqlSession session : sessions ) {
255+ DefaultSession defaultSession = (DefaultSession ) session ;
256+ Map <Node , ChannelPool > pools = defaultSession .getPools ();
257+ if (pools == null || pools .isEmpty ()) {
258+ return false ;
259+ }
260+
261+ for (ChannelPool pool : pools .values ()) {
262+ if (pool == null ) {
263+ return false ;
264+ }
265+ if (pool .size () < expectedChannelsPerNode ) {
266+ return false ;
267+ }
268+ }
217269 }
270+ return true ;
218271 }
219272
220273 private void assertNoLogMatches (Pattern pattern , List <ILoggingEvent > logs ) {
0 commit comments