Commit f8eac1d

Merge pull request #14240 from rabbitmq/stream-fix-test-flake-2
Increase timeouts and improve error logging in stream test
2 parents fb9f048 + c1fd7c3

5 files changed, +139 -81 lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

Lines changed: 13 additions & 2 deletions
@@ -21,10 +21,17 @@ connect(Config, Node) ->
     connect(StreamPort).
 
 connect(StreamPort) ->
+    do_connect(StreamPort, #{}).
+
+connect_pp(StreamPort, PeerProperties) ->
+    do_connect(StreamPort, PeerProperties).
+
+do_connect(StreamPort, PeerProperties) ->
     {ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
 
     C0 = rabbit_stream_core:init(0),
-    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
+    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties,
+                                                                 PeerProperties}}),
     ok = gen_tcp:send(Sock, PeerPropertiesFrame),
     {{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
 
@@ -78,8 +85,12 @@ delete_publisher(Sock, C0, PublisherId) ->
 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
     subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, #{}).
 
+
 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props) ->
-    Cmd = {subscribe, SubscriptionId, Stream, _OffsetSpec = first,
+    subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, first).
+
+subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, OffsetSpec) ->
+    Cmd = {subscribe, SubscriptionId, Stream, OffsetSpec,
           InitialCredit, Props},
     SubscribeFrame = rabbit_stream_core:frame({request, 1, Cmd}),
     ok = gen_tcp:send(Sock, SubscribeFrame),
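
The new connect_pp/2 and subscribe/7 helpers make peer properties and the offset specification explicit parameters, so tests no longer need to hand-roll frames for non-default cases. A minimal usage sketch based on the signatures above; the stream name <<"s1">> and the function name are illustrative:

    %% Sketch: connect with a connection name, subscribe from `first`.
    %% connect_pp/2 returns {ok, Socket, ParserState}; each subsequent
    %% helper returns {ok, NewParserState}.
    example(StreamPort) ->
        {ok, Sock, C0} =
            stream_test_utils:connect_pp(StreamPort,
                                         #{<<"connection_name">> => <<"my-test">>}),
        {ok, C1} = stream_test_utils:subscribe(Sock, C0, <<"s1">>, 1, 10, #{}, first),
        {ok, _C2} = stream_test_utils:unsubscribe(Sock, C1, 1),
        ok.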

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 2 additions & 2 deletions
@@ -2344,8 +2344,8 @@ handle_frame_post_auth(Transport,
         case {is_binary(Host), is_integer(Port)} of
             {true, true} -> Acc#{Node => {Host, Port}};
             _ ->
-                rabbit_log:warning("Error when retrieving broker metadata: ~tp ~tp",
-                                   [Host, Port]),
+                rabbit_log:warning("Error when retrieving broker '~tp' metadata: ~tp ~tp",
+                                   [Node, Host, Port]),
                 Acc
         end
     end,

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 69 additions & 72 deletions
@@ -819,89 +819,86 @@ store_offset_requires_read_access(Config) ->
 
 offset_lag_calculation(Config) ->
     FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
-    T = gen_tcp,
-    Port = get_port(T, Config),
-    Opts = get_opts(T),
-    {ok, S} = T:connect("localhost", Port, Opts),
-    C = rabbit_stream_core:init(0),
+    Port = get_port(gen_tcp, Config),
     ConnectionName = FunctionName,
-    test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
-    test_authenticate(T, S, C),
+    {ok, S, C0} = stream_test_utils:connect_pp(Port,
+                                               #{<<"connection_name">> => ConnectionName}),
 
-    Stream = FunctionName,
-    test_create_stream(T, S, Stream, C),
+    St = FunctionName,
+    {ok, C1} = stream_test_utils:create_stream(S, C0, St),
 
     SubId = 1,
     TheFuture = os:system_time(millisecond) + 60 * 60 * 1_000,
-    lists:foreach(fun(OffsetSpec) ->
-                          test_subscribe(T, S, SubId, Stream,
-                                         OffsetSpec, 10, #{},
-                                         ?RESPONSE_CODE_OK, C),
-                          ConsumerInfo = consumer_offset_info(Config, ConnectionName),
-                          ?assertEqual({0, 0}, ConsumerInfo),
-                          test_unsubscribe(T, S, SubId, C)
-                  end, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
-
-
-    PublisherId = 1,
-    test_declare_publisher(T, S, PublisherId, Stream, C),
+    C2 = lists:foldl(
+           fun(OffsetSpec, C00) ->
+                   {ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
+                                                           10, #{}, OffsetSpec),
+                   ConsumerInfo = consumer_offset_info(Config, ConnectionName),
+                   ?assertEqual({0, 0}, ConsumerInfo),
+                   {ok, C02} = stream_test_utils:unsubscribe(S, C01, SubId),
+                   C02
+           end, C1, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
+
+    PubId = 1,
+    {ok, C3} = stream_test_utils:declare_publisher(S, C2, St, PubId),
     MessageCount = 10,
     Body = <<"hello">>,
-    lists:foreach(fun(_) ->
-                          test_publish_confirm(T, S, PublisherId, Body, C)
-                  end, lists:seq(1, MessageCount - 1)),
+    {ok, C4} = stream_test_utils:publish(S, C3, PubId, 1,
+                                         lists:duplicate(MessageCount - 1, Body)),
     %% to make sure to have 2 chunks
     timer:sleep(200),
-    test_publish_confirm(T, S, PublisherId, Body, C),
-    test_delete_publisher(T, S, PublisherId, C),
+    {ok, C5} = stream_test_utils:publish(S, C4, PubId, 1, [Body]),
+    {ok, C6} = stream_test_utils:delete_publisher(S, C5, PubId),
 
     NextOffset = MessageCount,
-    lists:foreach(fun({OffsetSpec, ReceiveDeliver, CheckFun}) ->
-                          test_subscribe(T, S, SubId, Stream,
-                                         OffsetSpec, 1, #{},
-                                         ?RESPONSE_CODE_OK, C),
-                          case ReceiveDeliver of
-                              true ->
-                                  {{deliver, SubId, _}, _} = receive_commands(T, S, C);
-                              _ ->
-                                  ok
-                          end,
-                          {Offset, Lag} = consumer_offset_info(Config, ConnectionName),
-                          CheckFun(Offset, Lag),
-                          test_unsubscribe(T, S, SubId, C)
-                  end, [{first, true,
-                         fun(Offset, Lag) ->
-                                 ?assert(Offset >= 0, "first, at least one chunk consumed"),
-                                 ?assert(Lag > 0, "first, not all messages consumed")
-                         end},
-                        {last, true,
-                         fun(Offset, _Lag) ->
-                                 ?assert(Offset > 0, "offset expected for last")
-                         end},
-                        {next, false,
-                         fun(Offset, Lag) ->
-                                 ?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
-                                 ?assert(Lag =:= 0, "next, offset lag should be 0")
-                         end},
-                        {0, true,
-                         fun(Offset, Lag) ->
-                                 ?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
-                                 ?assert(Lag > 0, "offset spec = 0, not all messages consumed")
-                         end},
-                        {1_000, false,
-                         fun(Offset, Lag) ->
-                                 ?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
-                                 ?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
-                         end},
-                        {{timestamp, TheFuture}, false,
-                         fun(Offset, Lag) ->
-                                 ?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
-                                 ?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
-                         end}]),
-
-    test_delete_stream(T, S, Stream, C, false),
-    test_close(T, S, C),
-
+    C7 = lists:foldl(
+           fun({OffsetSpec, ReceiveDeliver, CheckFun}, C00) ->
+                   {ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
+                                                           1, #{}, OffsetSpec),
+
+                   C03 = case ReceiveDeliver of
+                             true ->
+                                 {{deliver, SubId, _}, C02} = receive_commands(S, C01),
+                                 C02;
+                             _ ->
+                                 C01
+                         end,
+                   {Offset, Lag} = consumer_offset_info(Config, ConnectionName),
+                   CheckFun(Offset, Lag),
+                   {ok, C04} = stream_test_utils:unsubscribe(S, C03, SubId),
+                   C04
+           end, C6, [{first, true,
+                      fun(Offset, Lag) ->
+                              ?assert(Offset >= 0, "first, at least one chunk consumed"),
+                              ?assert(Lag > 0, "first, not all messages consumed")
+                      end},
+                     {last, true,
+                      fun(Offset, _Lag) ->
+                              ?assert(Offset > 0, "offset expected for last")
+                      end},
+                     {next, false,
+                      fun(Offset, Lag) ->
+                              ?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
+                              ?assert(Lag =:= 0, "next, offset lag should be 0")
+                      end},
+                     {0, true,
+                      fun(Offset, Lag) ->
+                              ?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
+                              ?assert(Lag > 0, "offset spec = 0, not all messages consumed")
+                      end},
+                     {1_000, false,
+                      fun(Offset, Lag) ->
+                              ?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
+                              ?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
+                      end},
+                     {{timestamp, TheFuture}, false,
+                      fun(Offset, Lag) ->
+                              ?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
+                              ?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
+                      end}]),
+
+    {ok, C8} = stream_test_utils:delete_stream(S, C7, St),
+    {ok, _} = stream_test_utils:close(S, C8),
     ok.
 
 authentication_error_should_close_with_delay(Config) ->
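
Because every stream_test_utils helper now returns an updated protocol-parser state, the rewritten test replaces lists:foreach with lists:foldl and threads that state (C0 through C8) through each call. A minimal sketch of the pattern, with a hypothetical step/2 standing in for subscribe/unsubscribe and friends:

    %% lists:foldl carries an accumulator across the list; here the
    %% accumulator is the parser state each helper hands back.
    run_steps(Specs, State0) ->
        lists:foldl(fun(Spec, StateIn) ->
                            {ok, StateOut} = step(Spec, StateIn), % hypothetical helper
                            StateOut
                    end, State0, Specs).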

deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java

Lines changed: 54 additions & 5 deletions
@@ -34,8 +34,11 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.ToLongFunction;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +48,7 @@ public class FailureTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FailureTest.class);
 
+  static String testMethod;
   TestUtils.ClientFactory cf;
   String stream;
   ExecutorService executorService;
@@ -57,6 +61,11 @@ static void wait(Duration duration) {
     }
   }
 
+  @BeforeEach
+  void init(TestInfo info) {
+    testMethod = info.getTestMethod().get().getName();
+  }
+
   @AfterEach
   void tearDown() {
     if (executorService != null) {
@@ -142,9 +151,9 @@ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
     waitAtMost(
         Duration.ofSeconds(10),
         () -> {
-          LOGGER.info("Getting metadata for {}", stream);
+          log("Getting metadata for {}", stream);
           Client.StreamMetadata m = publisher.metadata(stream).get(stream);
-          LOGGER.info("Metadata for {} (expecting 2 replicas): {}", stream, m);
+          log("Metadata for {} (expecting 2 replicas): {}", stream, m);
           return m.getReplicas().size() == 2;
         });
 
@@ -195,6 +204,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     Map<Long, Message> published = new ConcurrentHashMap<>();
     Set<Message> confirmed = ConcurrentHashMap.newKeySet();
 
+    // match confirmed messages to published messages
     Client.PublishConfirmListener publishConfirmListener =
         (publisherId, publishingId) -> {
           Message confirmedMessage;
@@ -212,18 +222,22 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     AtomicReference<Client> publisher = new AtomicReference<>();
     CountDownLatch reconnectionLatch = new CountDownLatch(1);
     AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
+    // shutdown listener reconnects to node 2 to locate the node the stream leader is on
+    // it then re-creates a publisher connected to this node
     Client.ShutdownListener shutdownListener =
         shutdownContext -> {
           if (shutdownContext.getShutdownReason()
               == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
+            log("Connection got closed, reconnecting");
            // avoid long-running task in the IO thread
            executorService.submit(
                () -> {
                  connected.set(false);
                  AtomicReference<Client> locator = new AtomicReference<>();
                  try {
+                    log("Reconnecting to node 2");
                    waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                        () -> {
                          try {
                            locator.set(
@@ -233,14 +247,35 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
                            return false;
                          }
                        });
+                    log("Reconnected to node 2, looking up new stream leader");
                    waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                        () -> {
                          Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
                          return m.getLeader() != null
                              && m.getLeader().getPort() != streamPortNode1();
                        });
+                    log("New stream leader is on another node than node 1");
                  } catch (Throwable e) {
+                    log("Error while trying to connect to new stream leader");
+                    if (locator.get() == null) {
+                      log("Could not reconnect");
+                    } else {
+                      try {
+                        Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
+                        if (m.getLeader() == null) {
+                          log("The stream has no leader");
+                        } else {
+                          log(
+                              "The stream is on node with port {} (node 1 = {}, node 2 = {})",
+                              m.getLeader().getPort(),
+                              streamPortNode1(),
+                              streamPortNode2());
+                        }
+                      } catch (Exception ex) {
+                        log("Error while checking failure: {}", ex.getMessage());
+                      }
+                    }
                    reconnectionLatch.countDown();
                    return;
                  }
@@ -278,6 +313,9 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
 
     AtomicBoolean keepPublishing = new AtomicBoolean(true);
 
+    AtomicLong publishSequence = new AtomicLong(0);
+    ToLongFunction<Object> publishSequenceFunction = value -> publishSequence.getAndIncrement();
+
     executorService.submit(
         () -> {
           while (keepPublishing.get()) {
@@ -295,7 +333,11 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
                     .build();
             try {
              long publishingId =
-                  publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0);
+                  publisher
+                      .get()
+                      .publish(
+                          (byte) 1, Collections.singletonList(message), publishSequenceFunction)
+                      .get(0);
              published.put(publishingId, message);
            } catch (Exception e) {
              // keep going
@@ -314,6 +356,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     int confirmedCount = confirmed.size();
 
     try {
+      // stop the first node (this is where the stream leader is)
       Host.rabbitmqctl("stop_app");
 
       assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -324,6 +367,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     } finally {
       Host.rabbitmqctl("start_app");
     }
+    // making sure we published a few messages and got the confirmations
     assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
     confirmedCount = confirmed.size();
 
@@ -339,6 +383,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
     // let's publish for a bit of time
     Thread.sleep(2000);
 
+    // making sure we published messages and got the confirmations
     assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
 
     keepPublishing.set(false);
@@ -640,4 +685,8 @@ void shouldReceiveMetadataUpdateWhenReplicaIsKilledWithPublisherAndConsumerOnSam
     Host.killStreamLeaderProcess(stream);
     waitUntil(() -> metadataNotifications.get() == 2);
   }
+
+  private static void log(String format, Object... args) {
+    LOGGER.info("[" + testMethod + "] " + format, args);
+  }
 }
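
In the Java test above, the added publishSequenceFunction draws publishing IDs from one shared AtomicLong instead of letting each Client instance restart at 0, so IDs stay unique across the reconnection and confirms keep matching entries in the published map. A rough Erlang rendering of the same idea, using the stdlib atomics module; the function names are illustrative, not part of the test utilities:

    %% One shared counter hands out monotonically increasing publishing ids,
    %% so a re-created publisher never reuses an id from before a reconnect.
    new_sequence() ->
        atomics:new(1, [{signed, false}]).

    next_publishing_id(Seq) ->
        %% atomics:add_get/3 increments and returns the new value atomically
        atomics:add_get(Seq, 1, 1) - 1.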

deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/resources/logback-test.xml

Lines changed: 1 addition & 0 deletions
@@ -6,6 +6,7 @@
   </appender>
 
   <logger name="com.rabbitmq.stream" level="info" />
+  <logger name="com.rabbitmq.stream.impl.Client" level="warn" />
 
   <root level="info">
     <appender-ref ref="STDOUT" />
