2727import org .junit .jupiter .api .Test ;
2828import org .junit .jupiter .api .extension .RegisterExtension ;
2929import org .testcontainers .containers .PostgreSQLContainer ;
30+ import reactor .core .publisher .Flux ;
3031import reactor .core .publisher .Mono ;
32+ import reactor .netty .DisposableChannel ;
33+ import reactor .netty .DisposableServer ;
34+ import reactor .netty .tcp .TcpServer ;
3135import reactor .test .StepVerifier ;
3236
3337import static org .assertj .core .api .Assertions .assertThat ;
@@ -119,6 +123,53 @@ void testTargetPreferSecondaryConnectedToStandby() {
119123 .verifyComplete ();
120124 }
121125
126+ @ Test
127+ void testTargetPreferSecondaryConnectedToMasterOnStandbyFailure () {
128+ DisposableServer failingServer = newServer ();
129+ try {
130+ isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), failingServer )
131+ .as (StepVerifier ::create )
132+ .expectNext (true )
133+ .verifyComplete ();
134+ } finally {
135+ failingServer .dispose ();
136+ }
137+ }
138+
139+ @ Test
140+ void testMultipleCallsWithTargetPreferSecondaryConnectedToStandby () {
141+ PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactory (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), SERVERS .getStandby ());
142+
143+ Mono <Boolean > allocator = Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
144+ Flux <Boolean > connectionPool = Flux .merge (allocator , allocator );
145+
146+ connectionPool
147+ .as (StepVerifier ::create )
148+ .expectNext (false )
149+ .expectNext (false )
150+ .verifyComplete ();
151+ }
152+
153+ @ Test
154+ void testMultipleCallsWithTargetPreferSecondaryConnectedToMasterOnStandbyFailure () {
155+ DisposableServer failingServer = newServer ();
156+ try {
157+ PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactoryWithFailingServer (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (),
158+ failingServer );
159+
160+ Mono <Boolean > allocator = Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
161+ Flux <Boolean > connectionPool = Flux .merge (allocator , allocator );
162+
163+ connectionPool
164+ .as (StepVerifier ::create )
165+ .expectNext (true )
166+ .expectNext (true )
167+ .verifyComplete ();
168+ } finally {
169+ failingServer .dispose ();
170+ }
171+ }
172+
122173 @ Test
123174 void testTargetPrimaryChoosePrimary () {
124175 isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType .PRIMARY , SERVERS .getPrimary (), SERVERS .getStandby ())
@@ -181,6 +232,12 @@ private Mono<Boolean> isConnectedToPrimary(MultiHostConnectionStrategy.TargetSer
181232 return Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
182233 }
183234
235+ private Mono <Boolean > isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?> primaryServer , DisposableServer failingServer ) {
236+ PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactoryWithFailingServer (targetServerType , primaryServer , failingServer );
237+
238+ return Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
239+ }
240+
184241 private Mono <Boolean > isPrimary (PostgresqlConnection connection ) {
185242 return connection .createStatement ("SHOW TRANSACTION_READ_ONLY" )
186243 .execute ()
@@ -203,4 +260,25 @@ private PostgresqlConnectionFactory multiHostConnectionFactory(MultiHostConnecti
203260 return new PostgresqlConnectionFactory (configuration );
204261 }
205262
263+ private PostgresqlConnectionFactory multiHostConnectionFactoryWithFailingServer (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?> primaryServer ,
264+ DisposableServer failingServer ) {
265+ PostgresqlConnectionConfiguration .Builder builder = PostgresqlConnectionConfiguration .builder ();
266+ builder .addHost (primaryServer .getHost (), primaryServer .getMappedPort (5432 ));
267+ builder .addHost (failingServer .host (), failingServer .port ());
268+
269+ PostgresqlConnectionConfiguration configuration = builder
270+ .targetServerType (targetServerType )
271+ .username (primaryServer .getUsername ())
272+ .password (primaryServer .getPassword ())
273+ .build ();
274+ return new PostgresqlConnectionFactory (configuration );
275+ }
276+
277+ // Simulate server downtime, where connections are accepted and then closed immediately
278+ static DisposableServer newServer () {
279+ return TcpServer .create ()
280+ .doOnConnection (DisposableChannel ::dispose )
281+ .bindNow ();
282+ }
283+
206284}
0 commit comments