1010import com .datastax .oss .driver .api .core .CqlSessionBuilder ;
1111import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
1212import com .datastax .oss .driver .api .core .config .DriverConfigLoader ;
13- import com .datastax .oss .driver .api .core .session . Session ;
13+ import com .datastax .oss .driver .api .core .metadata . Node ;
1414import com .datastax .oss .driver .api .testinfra .ScyllaOnly ;
1515import com .datastax .oss .driver .api .testinfra .ccm .CustomCcmRule ;
1616import com .datastax .oss .driver .api .testinfra .session .SessionUtils ;
1717import com .datastax .oss .driver .internal .core .pool .ChannelPool ;
18+ import com .datastax .oss .driver .internal .core .session .DefaultSession ;
1819import com .datastax .oss .driver .internal .core .util .concurrent .CompletableFutures ;
1920import com .datastax .oss .driver .internal .core .util .concurrent .Reconnection ;
21+ import com .google .common .collect .ImmutableList ;
2022import com .google .common .collect .ImmutableMap ;
2123import com .google .common .collect .ImmutableSet ;
22- import com .google .common .util .concurrent .Uninterruptibles ;
2324import com .tngtech .java .junit .dataprovider .DataProvider ;
2425import com .tngtech .java .junit .dataprovider .DataProviderRunner ;
2526import com .tngtech .java .junit .dataprovider .UseDataProvider ;
2627import java .net .InetSocketAddress ;
2728import java .time .Duration ;
29+ import java .util .Arrays ;
30+ import java .util .Collections ;
2831import java .util .List ;
2932import java .util .Map ;
3033import java .util .Set ;
3134import java .util .concurrent .CompletionStage ;
3235import java .util .concurrent .TimeUnit ;
3336import java .util .regex .Pattern ;
37+ import org .awaitility .Awaitility ;
3438import org .junit .After ;
3539import org .junit .Before ;
3640import org .junit .ClassRule ;
@@ -92,67 +96,99 @@ public void stopCapturingLogs() {
9296 @ Test
9397 @ UseDataProvider ("reuseAddressOption" )
9498 public void should_initialize_all_channels (boolean reuseAddress ) {
99+ int poolLocalSizeSetting = 4 ; // Will round up to 6 due to not being divisible by 3 shards
100+ int expectedChannelsPerNode = 6 ;
101+ String node1 = CCM_RULE .getCcmBridge ().getNodeIpAddress (1 );
102+ String node2 = CCM_RULE .getCcmBridge ().getNodeIpAddress (2 );
95103 Map <Pattern , Integer > expectedOccurences =
96104 ImmutableMap .of (
97- Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 6/6 channels.*" ), 1 ,
98- Pattern .compile (".*\\ .1:19042.*Reconnection attempt complete, 6/6 channels.*" ), 1 ,
99- Pattern .compile (".*Reconnection attempt complete.*" ), 2 ,
100- Pattern .compile (".*\\ .1:19042.*New channel added \\ [.*" ), 5 ,
101- Pattern .compile (".*\\ .2:19042.*New channel added \\ [.*" ), 5 ,
102- Pattern .compile (".*\\ .1:19042\\ ] Trying to create 5 missing channels.*" ), 1 ,
103- Pattern .compile (".*\\ .2:19042\\ ] Trying to create 5 missing channels.*" ), 1 );
105+ Pattern .compile (
106+ ".*"
107+ + Pattern .quote (node1 )
108+ + ":19042.*Reconnection attempt complete, 6/6 channels.*" ),
109+ 1 ,
110+ Pattern .compile (
111+ ".*"
112+ + Pattern .quote (node2 )
113+ + ":19042.*Reconnection attempt complete, 6/6 channels.*" ),
114+ 1 ,
115+ // Temporarily commented out because hanging sessions from other tests pollute logs
116+ // Pattern.compile(".*Reconnection attempt complete.*"), 2,
117+ Pattern .compile (".*" + Pattern .quote (node1 ) + ":19042.*New channel added \\ [.*" ), 5 ,
118+ Pattern .compile (".*" + Pattern .quote (node2 ) + ":19042.*New channel added \\ [.*" ), 5 ,
119+ Pattern .compile (
120+ ".*"
121+ + Pattern .quote (node1 )
122+ + ":19042\\ ] Trying to create 5 missing channels.*" ),
123+ 1 ,
124+ Pattern .compile (
125+ ".*"
126+ + Pattern .quote (node2 )
127+ + ":19042\\ ] Trying to create 5 missing channels.*" ),
128+ 1 );
104129 DriverConfigLoader loader =
105130 SessionUtils .configLoaderBuilder ()
106131 .withBoolean (DefaultDriverOption .SOCKET_REUSE_ADDRESS , reuseAddress )
107132 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
108133 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_LOW , 10000 )
109134 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
110- // Due to rounding up the connections per shard this will result in 6 connections per
111- // node
112- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 4 )
135+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , poolLocalSizeSetting )
113136 .build ();
114- try (Session session =
137+ try (CqlSession session =
115138 CqlSession .builder ()
116139 .addContactPoint (
117140 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 19042 ))
118141 .withConfigLoader (loader )
119142 .build ()) {
120- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
143+ List <CqlSession > allSessions = Collections .singletonList (session );
144+ Awaitility .await ()
145+ .atMost (20 , TimeUnit .SECONDS )
146+ .pollInterval (500 , TimeUnit .MILLISECONDS )
147+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
148+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
121149 expectedOccurences .forEach (
122- (pattern , times ) -> assertMatchesExactly (pattern , times , appender . list ));
123- forbiddenOccurences .forEach (pattern -> assertNoLogMatches (pattern , appender . list ));
150+ (pattern , times ) -> assertMatchesExactly (pattern , times , logsCopy ));
151+ forbiddenOccurences .forEach (pattern -> assertNoLogMatches (pattern , logsCopy ));
124152 }
125153 }
126154
127155 @ Test
128156 public void should_see_mismatched_shard () {
157+ int expectedChannelsPerNode = 33 ;
129158 DriverConfigLoader loader =
130159 SessionUtils .configLoaderBuilder ()
131160 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
132161 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_LOW , 10000 )
133162 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
134- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
163+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
135164 .build ();
136- try (Session session =
165+ try (CqlSession session =
137166 CqlSession .builder ()
138167 .addContactPoint (
139168 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 9042 ))
140169 .withConfigLoader (loader )
141170 .build ()) {
142- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
143- assertMatchesAtLeast (shardMismatchPattern , 5 , appender .list );
171+ List <CqlSession > allSessions = Collections .singletonList (session );
172+ Awaitility .await ()
173+ .atMost (20 , TimeUnit .SECONDS )
174+ .pollInterval (500 , TimeUnit .MILLISECONDS )
175+ // Waits until 2/3rds are initialized instead of all. It does not matter here.
176+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode * 2 / 3 ));
177+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
178+ assertMatchesAtLeast (shardMismatchPattern , 5 , logsCopy );
144179 }
145180 }
146181
147182 // There is no need to run this as a test, but it serves as a comparison
148183 @ SuppressWarnings ("unused" )
149184 public void should_struggle_to_fill_pools () {
185+ int expectedChannelsPerNode = 33 ; // Divisible by number of shards
150186 DriverConfigLoader loader =
151187 SessionUtils .configLoaderBuilder ()
152188 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , false )
153- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
154- .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis ( 200 ))
155- .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis ( 4000 ))
189+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
190+ .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofSeconds ( 1 ))
191+ .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofSeconds ( 20 ))
156192 .build ();
157193 CqlSessionBuilder builder =
158194 CqlSession .builder ()
@@ -167,20 +203,26 @@ public void should_struggle_to_fill_pools() {
167203 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
168204 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
169205 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
170- Uninterruptibles .sleepUninterruptibly (20 , TimeUnit .SECONDS );
171- assertNoLogMatches (shardMismatchPattern , appender .list );
172- assertMatchesAtLeast (reconnectionPattern , 8 , appender .list );
206+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
207+ Awaitility .await ()
208+ .atMost (20 , TimeUnit .SECONDS )
209+ .pollInterval (500 , TimeUnit .MILLISECONDS )
210+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
211+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
212+ assertNoLogMatches (shardMismatchPattern , logsCopy );
213+ assertMatchesAtLeast (reconnectionPattern , 8 , logsCopy );
173214 }
174215 }
175216
176217 @ Test
177218 public void should_not_struggle_to_fill_pools () {
219+ int expectedChannelsPerNode = 33 ; // Divisible by number of shards
178220 DriverConfigLoader loader =
179221 SessionUtils .configLoaderBuilder ()
180222 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
181- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 66 )
182- .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis ( 10 ))
183- .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis (20 ))
223+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
224+ .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofSeconds ( 1 ))
225+ .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofSeconds (20 ))
184226 .build ();
185227 CqlSessionBuilder builder =
186228 CqlSession .builder ()
@@ -196,25 +238,70 @@ public void should_not_struggle_to_fill_pools() {
196238 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
197239 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
198240 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
199- Uninterruptibles .sleepUninterruptibly (8 , TimeUnit .SECONDS );
200- int tolerance = 2 ; // Sometimes socket ends up already in use
241+
242+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
243+ Awaitility .await ()
244+ .atMost (20 , TimeUnit .SECONDS )
245+ .pollInterval (500 , TimeUnit .MILLISECONDS )
246+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
247+ String node1 = CCM_RULE .getCcmBridge ().getNodeIpAddress (1 );
248+ String node2 = CCM_RULE .getCcmBridge ().getNodeIpAddress (2 );
249+ int tolerance = 4 ; // Sometimes socket ends up already in use
201250 Map <Pattern , Integer > expectedOccurences =
202251 ImmutableMap .of (
203- Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 66/66 channels.*" ),
252+ Pattern .compile (
253+ ".*"
254+ + Pattern .quote (node1 )
255+ + ":19042.*Reconnection attempt complete, 33/33 channels.*" ),
256+ 1 * sessions ,
257+ Pattern .compile (
258+ ".*"
259+ + Pattern .quote (node2 )
260+ + ":19042.*Reconnection attempt complete, 33/33 channels.*" ),
204261 1 * sessions ,
205- Pattern .compile (".*\\ .1:19042.*Reconnection attempt complete, 66/66 channels.*" ),
262+ // Temporarily commented out because hanging sessions from other tests pollute logs
263+ // Pattern.compile(".*Reconnection attempt complete.*"), 2 * sessions,
264+ Pattern .compile (".*" + Pattern .quote (node1 ) + ":19042.*New channel added \\ [.*" ),
265+ 32 * sessions - tolerance ,
266+ Pattern .compile (".*" + Pattern .quote (node2 ) + ":19042.*New channel added \\ [.*" ),
267+ 32 * sessions - tolerance ,
268+ Pattern .compile (
269+ ".*"
270+ + Pattern .quote (node1 )
271+ + ":19042\\ ] Trying to create 32 missing channels.*" ),
206272 1 * sessions ,
207- Pattern .compile (".*Reconnection attempt complete.*" ), 2 * sessions ,
208- Pattern .compile (".*.1:19042.*New channel added \\ [.*" ), 65 * sessions - tolerance ,
209- Pattern .compile (".*.2:19042.*New channel added \\ [.*" ), 65 * sessions - tolerance ,
210- Pattern .compile (".*.1:19042\\ ] Trying to create 65 missing channels.*" ), 1 * sessions ,
211- Pattern .compile (".*.2:19042\\ ] Trying to create 65 missing channels.*" ),
273+ Pattern .compile (
274+ ".*"
275+ + Pattern .quote (node2 )
276+ + ":19042\\ ] Trying to create 32 missing channels.*" ),
212277 1 * sessions );
278+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
213279 expectedOccurences .forEach (
214- (pattern , times ) -> assertMatchesAtLeast (pattern , times , appender .list ));
215- assertNoLogMatches (shardMismatchPattern , appender .list );
216- assertMatchesAtMost (reconnectionPattern , tolerance , appender .list );
280+ (pattern , times ) -> assertMatchesAtLeast (pattern , times , logsCopy ));
281+ assertNoLogMatches (shardMismatchPattern , logsCopy );
282+ assertMatchesAtMost (reconnectionPattern , tolerance , logsCopy );
283+ }
284+ }
285+
286+ private boolean areAllPoolsFullyInitialized (
287+ List <CqlSession > sessions , int expectedChannelsPerNode ) {
288+ for (CqlSession session : sessions ) {
289+ DefaultSession defaultSession = (DefaultSession ) session ;
290+ Map <Node , ChannelPool > pools = defaultSession .getPools ();
291+ if (pools == null || pools .isEmpty ()) {
292+ return false ;
293+ }
294+
295+ for (ChannelPool pool : pools .values ()) {
296+ if (pool == null ) {
297+ return false ;
298+ }
299+ if (pool .size () < expectedChannelsPerNode ) {
300+ return false ;
301+ }
302+ }
217303 }
304+ return true ;
218305 }
219306
220307 private void assertNoLogMatches (Pattern pattern , List <ILoggingEvent > logs ) {
0 commit comments