1010import com .datastax .oss .driver .api .core .CqlSessionBuilder ;
1111import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
1212import com .datastax .oss .driver .api .core .config .DriverConfigLoader ;
13- import com .datastax .oss .driver .api .core .session . Session ;
13+ import com .datastax .oss .driver .api .core .metadata . Node ;
1414import com .datastax .oss .driver .api .testinfra .ScyllaOnly ;
1515import com .datastax .oss .driver .api .testinfra .ccm .CustomCcmRule ;
1616import com .datastax .oss .driver .api .testinfra .session .SessionUtils ;
1717import com .datastax .oss .driver .internal .core .pool .ChannelPool ;
18+ import com .datastax .oss .driver .internal .core .session .DefaultSession ;
1819import com .datastax .oss .driver .internal .core .util .concurrent .CompletableFutures ;
1920import com .datastax .oss .driver .internal .core .util .concurrent .Reconnection ;
21+ import com .google .common .collect .ImmutableList ;
2022import com .google .common .collect .ImmutableMap ;
2123import com .google .common .collect .ImmutableSet ;
22- import com .google .common .util .concurrent .Uninterruptibles ;
2324import com .tngtech .java .junit .dataprovider .DataProvider ;
2425import com .tngtech .java .junit .dataprovider .DataProviderRunner ;
2526import com .tngtech .java .junit .dataprovider .UseDataProvider ;
2627import java .net .InetSocketAddress ;
2728import java .time .Duration ;
29+ import java .util .Arrays ;
30+ import java .util .Collections ;
2831import java .util .List ;
2932import java .util .Map ;
3033import java .util .Set ;
3134import java .util .concurrent .CompletionStage ;
3235import java .util .concurrent .TimeUnit ;
3336import java .util .regex .Pattern ;
37+ import org .awaitility .Awaitility ;
3438import org .junit .After ;
3539import org .junit .Before ;
3640import org .junit .ClassRule ;
@@ -55,9 +59,8 @@ public class AdvancedShardAwarenessIT {
5559 Level originalLevelReconnection ;
5660 private final Pattern shardMismatchPattern =
5761 Pattern .compile (".*r configuration of shard aware port.*" );
58- private final Pattern reconnectionPattern =
62+ private final Pattern generalReconnectionPattern =
5963 Pattern .compile (".*Scheduling next reconnection in.*" );
60- Set <Pattern > forbiddenOccurences = ImmutableSet .of (shardMismatchPattern , reconnectionPattern );
6164
6265 @ DataProvider
6366 public static Object [][] reuseAddressOption () {
@@ -92,67 +95,105 @@ public void stopCapturingLogs() {
9295 @ Test
9396 @ UseDataProvider ("reuseAddressOption" )
9497 public void should_initialize_all_channels (boolean reuseAddress ) {
98+ int poolLocalSizeSetting = 4 ; // Will round up to 6 due to not being divisible by 3 shards
99+ int expectedChannelsPerNode = 6 ;
100+ String node1 = CCM_RULE .getCcmBridge ().getNodeIpAddress (1 );
101+ String node2 = CCM_RULE .getCcmBridge ().getNodeIpAddress (2 );
102+ Pattern reconnectionPattern1 =
103+ Pattern .compile (".*" + Pattern .quote (node1 ) + ".*Scheduling next reconnection in.*" );
104+ Pattern reconnectionPattern2 =
105+ Pattern .compile (".*" + Pattern .quote (node2 ) + ".*Scheduling next reconnection in.*" );
106+ Set <Pattern > forbiddenOccurences =
107+ ImmutableSet .of (shardMismatchPattern , reconnectionPattern1 , reconnectionPattern2 );
95108 Map <Pattern , Integer > expectedOccurences =
96109 ImmutableMap .of (
97- Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 6/6 channels.*" ), 1 ,
98- Pattern .compile (".*\\ .1:19042.*Reconnection attempt complete, 6/6 channels.*" ), 1 ,
99- Pattern .compile (".*Reconnection attempt complete.*" ), 2 ,
100- Pattern .compile (".*\\ .1:19042.*New channel added \\ [.*" ), 5 ,
101- Pattern .compile (".*\\ .2:19042.*New channel added \\ [.*" ), 5 ,
102- Pattern .compile (".*\\ .1:19042\\ ] Trying to create 5 missing channels.*" ), 1 ,
103- Pattern .compile (".*\\ .2:19042\\ ] Trying to create 5 missing channels.*" ), 1 );
110+ Pattern .compile (
111+ ".*"
112+ + Pattern .quote (node1 )
113+ + ":19042.*Reconnection attempt complete, 6/6 channels.*" ),
114+ 1 ,
115+ Pattern .compile (
116+ ".*"
117+ + Pattern .quote (node2 )
118+ + ":19042.*Reconnection attempt complete, 6/6 channels.*" ),
119+ 1 ,
120+ // Temporarily commented out because hanging sessions from other tests pollute logs
121+ // Pattern.compile(".*Reconnection attempt complete.*"), 2,
122+ Pattern .compile (".*" + Pattern .quote (node1 ) + ":19042.*New channel added \\ [.*" ), 5 ,
123+ Pattern .compile (".*" + Pattern .quote (node2 ) + ":19042.*New channel added \\ [.*" ), 5 ,
124+ Pattern .compile (
125+ ".*"
126+ + Pattern .quote (node1 )
127+ + ":19042\\ ] Trying to create 5 missing channels.*" ),
128+ 1 ,
129+ Pattern .compile (
130+ ".*"
131+ + Pattern .quote (node2 )
132+ + ":19042\\ ] Trying to create 5 missing channels.*" ),
133+ 1 );
104134 DriverConfigLoader loader =
105135 SessionUtils .configLoaderBuilder ()
106136 .withBoolean (DefaultDriverOption .SOCKET_REUSE_ADDRESS , reuseAddress )
107137 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
108138 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_LOW , 10000 )
109139 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
110- // Due to rounding up the connections per shard this will result in 6 connections per
111- // node
112- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 4 )
140+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , poolLocalSizeSetting )
113141 .build ();
114- try (Session session =
142+ try (CqlSession session =
115143 CqlSession .builder ()
116144 .addContactPoint (
117145 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 19042 ))
118146 .withConfigLoader (loader )
119147 .build ()) {
120- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
148+ List <CqlSession > allSessions = Collections .singletonList (session );
149+ Awaitility .await ()
150+ .atMost (20 , TimeUnit .SECONDS )
151+ .pollInterval (500 , TimeUnit .MILLISECONDS )
152+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
153+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
121154 expectedOccurences .forEach (
122- (pattern , times ) -> assertMatchesExactly (pattern , times , appender . list ));
123- forbiddenOccurences .forEach (pattern -> assertNoLogMatches (pattern , appender . list ));
155+ (pattern , times ) -> assertMatchesExactly (pattern , times , logsCopy ));
156+ forbiddenOccurences .forEach (pattern -> assertNoLogMatches (pattern , logsCopy ));
124157 }
125158 }
126159
127160 @ Test
128161 public void should_see_mismatched_shard () {
162+ int expectedChannelsPerNode = 33 ;
129163 DriverConfigLoader loader =
130164 SessionUtils .configLoaderBuilder ()
131165 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
132166 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_LOW , 10000 )
133167 .withInt (DefaultDriverOption .ADVANCED_SHARD_AWARENESS_PORT_HIGH , 60000 )
134- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
168+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
135169 .build ();
136- try (Session session =
170+ try (CqlSession session =
137171 CqlSession .builder ()
138172 .addContactPoint (
139173 new InetSocketAddress (CCM_RULE .getCcmBridge ().getNodeIpAddress (1 ), 9042 ))
140174 .withConfigLoader (loader )
141175 .build ()) {
142- Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
143- assertMatchesAtLeast (shardMismatchPattern , 5 , appender .list );
176+ List <CqlSession > allSessions = Collections .singletonList (session );
177+ Awaitility .await ()
178+ .atMost (20 , TimeUnit .SECONDS )
179+ .pollInterval (500 , TimeUnit .MILLISECONDS )
180+ // Waits until 2/3rds are initialized instead of all. It does not matter here.
181+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode * 2 / 3 ));
182+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
183+ assertMatchesAtLeast (shardMismatchPattern , 5 , logsCopy );
144184 }
145185 }
146186
147187 // There is no need to run this as a test, but it serves as a comparison
148188 @ SuppressWarnings ("unused" )
149189 public void should_struggle_to_fill_pools () {
190+ int expectedChannelsPerNode = 33 ; // Divisible by number of shards
150191 DriverConfigLoader loader =
151192 SessionUtils .configLoaderBuilder ()
152193 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , false )
153- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 64 )
154- .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis ( 200 ))
155- .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis ( 4000 ))
194+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
195+ .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofSeconds ( 1 ))
196+ .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofSeconds ( 20 ))
156197 .build ();
157198 CqlSessionBuilder builder =
158199 CqlSession .builder ()
@@ -167,20 +208,26 @@ public void should_struggle_to_fill_pools() {
167208 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
168209 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
169210 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
170- Uninterruptibles .sleepUninterruptibly (20 , TimeUnit .SECONDS );
171- assertNoLogMatches (shardMismatchPattern , appender .list );
172- assertMatchesAtLeast (reconnectionPattern , 8 , appender .list );
211+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
212+ Awaitility .await ()
213+ .atMost (20 , TimeUnit .SECONDS )
214+ .pollInterval (500 , TimeUnit .MILLISECONDS )
215+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
216+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
217+ assertNoLogMatches (shardMismatchPattern , logsCopy );
218+ assertMatchesAtLeast (generalReconnectionPattern , 8 , logsCopy );
173219 }
174220 }
175221
176222 @ Test
177223 public void should_not_struggle_to_fill_pools () {
224+ int expectedChannelsPerNode = 33 ; // Divisible by number of shards
178225 DriverConfigLoader loader =
179226 SessionUtils .configLoaderBuilder ()
180227 .withBoolean (DefaultDriverOption .CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED , true )
181- .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , 66 )
182- .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofMillis ( 10 ))
183- .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofMillis (20 ))
228+ .withInt (DefaultDriverOption .CONNECTION_POOL_LOCAL_SIZE , expectedChannelsPerNode )
229+ .withDuration (DefaultDriverOption .RECONNECTION_BASE_DELAY , Duration .ofSeconds ( 1 ))
230+ .withDuration (DefaultDriverOption .RECONNECTION_MAX_DELAY , Duration .ofSeconds (20 ))
184231 .build ();
185232 CqlSessionBuilder builder =
186233 CqlSession .builder ()
@@ -196,25 +243,75 @@ public void should_not_struggle_to_fill_pools() {
196243 CqlSession session2 = CompletableFutures .getUninterruptibly (stage2 );
197244 CqlSession session3 = CompletableFutures .getUninterruptibly (stage3 );
198245 CqlSession session4 = CompletableFutures .getUninterruptibly (stage4 ); ) {
199- Uninterruptibles .sleepUninterruptibly (8 , TimeUnit .SECONDS );
200- int tolerance = 2 ; // Sometimes socket ends up already in use
246+
247+ List <CqlSession > allSessions = Arrays .asList (session1 , session2 , session3 , session4 );
248+ Awaitility .await ()
249+ .atMost (20 , TimeUnit .SECONDS )
250+ .pollInterval (500 , TimeUnit .MILLISECONDS )
251+ .until (() -> areAllPoolsFullyInitialized (allSessions , expectedChannelsPerNode ));
252+ String node1 = CCM_RULE .getCcmBridge ().getNodeIpAddress (1 );
253+ String node2 = CCM_RULE .getCcmBridge ().getNodeIpAddress (2 );
254+ Pattern reconnectionPattern1 =
255+ Pattern .compile (".*" + Pattern .quote (node1 ) + ".*Scheduling next reconnection in.*" );
256+ Pattern reconnectionPattern2 =
257+ Pattern .compile (".*" + Pattern .quote (node2 ) + ".*Scheduling next reconnection in.*" );
258+ int tolerance = 4 ; // Sometimes socket ends up already in use
201259 Map <Pattern , Integer > expectedOccurences =
202260 ImmutableMap .of (
203- Pattern .compile (".*\\ .2:19042.*Reconnection attempt complete, 66/66 channels.*" ),
261+ Pattern .compile (
262+ ".*"
263+ + Pattern .quote (node1 )
264+ + ":19042.*Reconnection attempt complete, 33/33 channels.*" ),
265+ 1 * sessions ,
266+ Pattern .compile (
267+ ".*"
268+ + Pattern .quote (node2 )
269+ + ":19042.*Reconnection attempt complete, 33/33 channels.*" ),
204270 1 * sessions ,
205- Pattern .compile (".*\\ .1:19042.*Reconnection attempt complete, 66/66 channels.*" ),
271+ // Temporarily commented out because hanging sessions from other tests pollute logs
272+ // Pattern.compile(".*Reconnection attempt complete.*"), 2 * sessions,
273+ Pattern .compile (".*" + Pattern .quote (node1 ) + ":19042.*New channel added \\ [.*" ),
274+ 32 * sessions - tolerance ,
275+ Pattern .compile (".*" + Pattern .quote (node2 ) + ":19042.*New channel added \\ [.*" ),
276+ 32 * sessions - tolerance ,
277+ Pattern .compile (
278+ ".*"
279+ + Pattern .quote (node1 )
280+ + ":19042\\ ] Trying to create 32 missing channels.*" ),
206281 1 * sessions ,
207- Pattern .compile (".*Reconnection attempt complete.*" ), 2 * sessions ,
208- Pattern .compile (".*.1:19042.*New channel added \\ [.*" ), 65 * sessions - tolerance ,
209- Pattern .compile (".*.2:19042.*New channel added \\ [.*" ), 65 * sessions - tolerance ,
210- Pattern .compile (".*.1:19042\\ ] Trying to create 65 missing channels.*" ), 1 * sessions ,
211- Pattern .compile (".*.2:19042\\ ] Trying to create 65 missing channels.*" ),
282+ Pattern .compile (
283+ ".*"
284+ + Pattern .quote (node2 )
285+ + ":19042\\ ] Trying to create 32 missing channels.*" ),
212286 1 * sessions );
287+ List <ILoggingEvent > logsCopy = ImmutableList .copyOf (appender .list );
213288 expectedOccurences .forEach (
214- (pattern , times ) -> assertMatchesAtLeast (pattern , times , appender .list ));
215- assertNoLogMatches (shardMismatchPattern , appender .list );
216- assertMatchesAtMost (reconnectionPattern , tolerance , appender .list );
289+ (pattern , times ) -> assertMatchesAtLeast (pattern , times , logsCopy ));
290+ assertNoLogMatches (shardMismatchPattern , logsCopy );
291+ assertMatchesAtMost (reconnectionPattern1 , tolerance , logsCopy );
292+ assertMatchesAtMost (reconnectionPattern2 , tolerance , logsCopy );
293+ }
294+ }
295+
296+ private boolean areAllPoolsFullyInitialized (
297+ List <CqlSession > sessions , int expectedChannelsPerNode ) {
298+ for (CqlSession session : sessions ) {
299+ DefaultSession defaultSession = (DefaultSession ) session ;
300+ Map <Node , ChannelPool > pools = defaultSession .getPools ();
301+ if (pools == null || pools .isEmpty ()) {
302+ return false ;
303+ }
304+
305+ for (ChannelPool pool : pools .values ()) {
306+ if (pool == null ) {
307+ return false ;
308+ }
309+ if (pool .size () < expectedChannelsPerNode ) {
310+ return false ;
311+ }
312+ }
217313 }
314+ return true ;
218315 }
219316
220317 private void assertNoLogMatches (Pattern pattern , List <ILoggingEvent > logs ) {
0 commit comments