Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions .semaphore/run-all-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,29 @@ blocks:
type: s1-prod-ubuntu24-04-amd64-2
prologue:
commands:
- if [[ "$TEST_ARCHES" != *"x86_64"* ]]; then exit 0; fi
# Semaphore CI uses return 130
# to indicate that the job should be stopped.
- if [[ "$TEST_ARCHES" != *"x86_64"* ]]; then
return 130;
fi
jobs:
- name: "PLAINTEXT cluster (x86_64)"
env_vars:
- name: TEST_SSL
value: "False"
commands:
- if [[ "$TEST_TYPE" != *"plaintext"* ]]; then exit 0; fi
- if [[ "$TEST_TYPE" != *"plaintext"* ]]; then
return 130;
fi
- ./tests/run-all-tests.sh
- name: "SSL cluster (x86_64)"
env_vars:
- name: TEST_SSL
value: "True"
commands:
- if [[ "$TEST_TYPE" != *"ssl"* ]]; then exit 0; fi
- if [[ "$TEST_TYPE" != *"ssl"* ]]; then
return 130;
fi
- ./tests/run-all-tests.sh
- name: "Run all tests (aarch64)"
dependencies: []
Expand All @@ -59,19 +67,25 @@ blocks:
type: s1-prod-ubuntu24-04-arm64-2
prologue:
commands:
- if [[ "$TEST_ARCHES" != *"aarch64"* ]]; then exit 0; fi
- if [[ "$TEST_ARCHES" != *"aarch64"* ]]; then
return 130;
fi
jobs:
- name: "PLAINTEXT cluster (aarch64)"
env_vars:
- name: TEST_SSL
value: "False"
commands:
- if [[ "$TEST_TYPE" != *"plaintext"* ]]; then exit 0; fi
- if [[ "$TEST_TYPE" != *"plaintext"* ]]; then
return 130;
fi
- ./tests/run-all-tests.sh
- name: "SSL cluster (aarch64)"
env_vars:
- name: TEST_SSL
value: "True"
commands:
- if [[ "$TEST_TYPE" != *"ssl"* ]]; then exit 0; fi
- if [[ "$TEST_TYPE" != *"ssl"* ]]; then
return 130;
fi
- ./tests/run-all-tests.sh
7 changes: 3 additions & 4 deletions tests/0011-produce_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ static void test_per_message_partition_flag(void) {
TEST_SAY("test_per_message_partition_flag: Created kafka instance %s\n",
rd_kafka_name(rk));
topic_name = test_mk_topic_name("0011_per_message_flag", 1);
test_create_topic_wait_exists(rk, topic_name, topic_num_partitions, 1,
test_create_topic_wait_exists(rk, topic_name, topic_num_partitions, -1,
5000);

rkt = rd_kafka_topic_new(rk, topic_name, topic_conf);
Expand Down Expand Up @@ -628,10 +628,9 @@ static void test_message_single_partition_record_fail(int variation) {

SUB_TEST_QUICK();

const char *confs_set_append[] = {"cleanup.policy", "APPEND",
"compact"};
const char *confs_set_append[] = {"cleanup.policy", "SET", "compact"};

const char *confs_delete_subtract[] = {"cleanup.policy", "SUBTRACT",
const char *confs_delete_subtract[] = {"cleanup.policy", "DELETE",
"compact"};

test_conf_init(&conf, &topic_conf, 20);
Expand Down
9 changes: 5 additions & 4 deletions tests/0026-consume_pause.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ static void consume_pause(void) {
test_conf_set(conf, "enable.partition.eof", "true");
test_topic_conf_set(tconf, "auto.offset.reset", "smallest");

test_create_topic_wait_exists(NULL, topic, partition_cnt, 1, 10 * 1000);
test_create_topic_wait_exists(NULL, topic, partition_cnt, -1,
10 * 1000);

/* Produce messages */
testid =
Expand Down Expand Up @@ -259,7 +260,7 @@ static void consume_pause_resume_after_reassign(void) {

test_conf_init(&conf, NULL, 60);

test_create_topic_wait_exists(NULL, topic, (int)partition + 1, 1,
test_create_topic_wait_exists(NULL, topic, (int)partition + 1, -1,
10 * 1000);

/* Produce messages */
Expand Down Expand Up @@ -419,7 +420,7 @@ static void consume_subscribe_assign_pause_resume(void) {

test_conf_init(&conf, NULL, 20);

test_create_topic_wait_exists(NULL, topic, (int)partition + 1, 1,
test_create_topic_wait_exists(NULL, topic, (int)partition + 1, -1,
10 * 1000);

/* Produce messages */
Expand Down Expand Up @@ -471,7 +472,7 @@ static void consume_seek_pause_resume(void) {

test_conf_init(&conf, NULL, 20);

test_create_topic_wait_exists(NULL, topic, (int)partition + 1, 1,
test_create_topic_wait_exists(NULL, topic, (int)partition + 1, -1,
10 * 1000);

/* Produce messages */
Expand Down
2 changes: 1 addition & 1 deletion tests/0028-long_topicnames.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ int main_0028_long_topicnames(int argc, char **argv) {
rk_c = test_create_consumer(topic, NULL, NULL, NULL);

/* Create topic */
test_create_topic_wait_exists(rk_c, topic, 1, 1, 5000);
test_create_topic_wait_exists(rk_c, topic, 1, -1, 5000);

test_consumer_subscribe(rk_c, topic);
test_consumer_poll_no_msgs("consume.nomsgs", rk_c, 0, 5000);
Expand Down
2 changes: 1 addition & 1 deletion tests/0044-partition_cnt.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static void test_producer_partition_cnt_change(void) {
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

test_create_topic_wait_exists(rk, topic, partition_cnt / 2, 1, 5000);
test_create_topic_wait_exists(rk, topic, partition_cnt / 2, -1, 5000);

rkt =
test_create_topic_object(rk, topic, "message.timeout.ms",
Expand Down
18 changes: 9 additions & 9 deletions tests/0045-subscribe_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ static void do_test_non_exist_and_partchange(void) {
await_no_rebalance("#1: empty", rk, queue, 10000);

TEST_SAY("#1: creating topic %s\n", topic_a);
test_create_topic_wait_exists(NULL, topic_a, 2, 1, 5000);
test_create_topic_wait_exists(NULL, topic_a, 2, -1, 5000);

await_assignment("#1: proper", rk, queue, 1, topic_a, 2);

Expand Down Expand Up @@ -295,7 +295,7 @@ static void do_test_regex(void) {
queue = rd_kafka_queue_get_consumer(rk);

TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_b);
test_create_topic_wait_exists(NULL, topic_b, 2, 1, 5000);
test_create_topic_wait_exists(NULL, topic_b, 2, -1, 5000);

TEST_SAY("Regex: Subscribing to %s & %s & %s\n", topic_b, topic_d,
topic_e);
Expand All @@ -305,13 +305,13 @@ static void do_test_regex(void) {
2);

TEST_SAY("Regex: creating topic %s (not subscribed)\n", topic_c);
test_create_topic_wait_exists(NULL, topic_c, 4, 1, 5000);
test_create_topic_wait_exists(NULL, topic_c, 4, -1, 5000);

/* Should not see a rebalance since no topics are matched. */
await_no_rebalance("Regex: empty", rk, queue, 10000);

TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_d);
test_create_topic_wait_exists(NULL, topic_d, 1, 1, 5000);
test_create_topic_wait_exists(NULL, topic_d, 1, -1, 5000);

if (test_consumer_group_protocol_classic())
await_revoke("Regex: rebalance after topic creation", rk,
Expand Down Expand Up @@ -376,10 +376,10 @@ static void do_test_topic_remove(void) {
queue = rd_kafka_queue_get_consumer(rk);

TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_f);
test_create_topic_wait_exists(NULL, topic_f, parts_f, 1, 5000);
test_create_topic_wait_exists(NULL, topic_f, parts_f, -1, 5000);

TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_g);
test_create_topic_wait_exists(NULL, topic_g, parts_g, 1, 5000);
test_create_topic_wait_exists(NULL, topic_g, parts_g, -1, 5000);

TEST_SAY("Topic removal: Subscribing to %s & %s\n", topic_f, topic_g);
topics = rd_kafka_topic_partition_list_new(2);
Expand Down Expand Up @@ -725,13 +725,13 @@ static void do_test_resubscribe_with_regex() {
*/

TEST_SAY("Creating topic %s\n", topic1);
test_create_topic_wait_exists(NULL, topic1, 4, 1, 5000);
test_create_topic_wait_exists(NULL, topic1, 4, -1, 5000);

TEST_SAY("Creating topic %s\n", topic2);
test_create_topic_wait_exists(NULL, topic2, 4, 1, 5000);
test_create_topic_wait_exists(NULL, topic2, 4, -1, 5000);

TEST_SAY("Creating topic %s\n", topic_a);
test_create_topic_wait_exists(NULL, topic_a, 2, 1, 5000);
test_create_topic_wait_exists(NULL, topic_a, 2, -1, 5000);

test_conf_init(&conf, NULL, 60);

Expand Down
2 changes: 1 addition & 1 deletion tests/0048-partitioner.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ static void do_test_partitioners(void) {
int pi;
const char *topic = test_mk_topic_name(__FUNCTION__, 1);

test_create_topic_wait_exists(NULL, topic, part_cnt, 1, 5000);
test_create_topic_wait_exists(NULL, topic, part_cnt, -1, 5000);

for (pi = 0; ptest[pi].partitioner; pi++) {
do_test_partitioner(topic, ptest[pi].partitioner, _MSG_CNT,
Expand Down
15 changes: 8 additions & 7 deletions tests/0052-msg_timestamps.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,13 @@ static void test_timestamps(const char *broker_tstype,
test_mk_topic_name(tsprintf("0052_msg_timestamps_%s_%s_%s",
broker_tstype, broker_version, codec),
1);
const int msgcnt = 20;
uint64_t testid = test_id_generate();
const int msgcnt = 20;
uint64_t testid = test_id_generate();
const char *topic_configs[] = {
"message.timestamp.type",
broker_tstype,
NULL,
};

if ((!strncmp(broker_version, "0.9", 3) ||
!strncmp(broker_version, "0.8", 3)) &&
Expand All @@ -161,11 +166,7 @@ static void test_timestamps(const char *broker_tstype,
TEST_SAY(_C_MAG "Timestamp test using %s\n", topic);
test_timeout_set(30);

test_kafka_topics(
"--create --topic \"%s\" "
"--replication-factor 1 --partitions 1 "
"--config message.timestamp.type=%s",
topic, broker_tstype);
test_admin_create_topic(NULL, topic, 1, -1, topic_configs);
test_wait_topic_exists(NULL, topic, 5000);

TEST_SAY(_C_MAG "Producing %d messages to %s\n", msgcnt, topic);
Expand Down
2 changes: 1 addition & 1 deletion tests/0055-producer_latency.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ int main_0055_producer_latency(int argc, char **argv) {
}

/* Create topic without replicas to keep broker-side latency down */
test_create_topic_wait_exists(NULL, topic, 1, 1, 5000);
test_create_topic_wait_exists(NULL, topic, 1, -1, 5000);

for (latconf = latconfs; latconf->name; latconf++)
test_producer_latency(topic, latconf);
Expand Down
2 changes: 1 addition & 1 deletion tests/0069-consumer_add_parts.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ int main_0069_consumer_add_parts(int argc, char **argv) {
c2 = test_create_consumer(topic, rebalance_cb, NULL, NULL);

TEST_SAY("Creating topic %s with 2 partitions\n", topic);
test_create_topic_wait_exists(c1, topic, 2, 1, 10 * 5000);
test_create_topic_wait_exists(c1, topic, 2, -1, 10 * 5000);

TEST_SAY("Subscribing\n");
test_consumer_subscribe(c1, topic);
Expand Down
34 changes: 20 additions & 14 deletions tests/0077-compaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,25 @@ static void do_test_compaction(int msgs_per_key, const char *compression) {
int cnt = 0;
test_msgver_t mv;
test_msgver_t mv_correct;
int msgcounter = 0;
const int fillcnt = 20;
int msgcounter = 0;
const int fillcnt = 20;
const char *topic_configs[] = {
"cleanup.policy",
"compact",
"segment.ms",
"10000",
"segment.bytes",
"10000",
"min.cleanable.dirty.ratio",
"0.01",
"delete.retention.ms",
"86400",
"file.delete.delay.ms",
"10000",
"max.compaction.lag.ms",
"100",
NULL,
};

testid = test_id_generate();

Expand All @@ -182,18 +199,7 @@ static void do_test_compaction(int msgs_per_key, const char *compression) {
"Test compaction on topic %s with %s compression (%d messages)\n",
topic, compression ? compression : "no", msgcnt);

test_kafka_topics(
"--create --topic \"%s\" "
"--partitions %d "
"--replication-factor 1 "
"--config cleanup.policy=compact "
"--config segment.ms=10000 "
"--config segment.bytes=10000 "
"--config min.cleanable.dirty.ratio=0.01 "
"--config delete.retention.ms=86400 "
"--config file.delete.delay.ms=10000 "
"--config max.compaction.lag.ms=100",
topic, partition + 1);
test_admin_create_topic(NULL, topic, partition + 1, -1, topic_configs);
test_wait_topic_exists(NULL, topic, 5000);

test_conf_init(&conf, NULL, 120);
Expand Down
2 changes: 1 addition & 1 deletion tests/0084-destroy_flags.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ static void destroy_flags(int local_mode) {
/* Create the topic to avoid not-yet-auto-created-topics being
* subscribed to (and thus raising an error). */
if (!local_mode) {
test_create_topic_wait_exists(NULL, topic, 3, 1, 5000);
test_create_topic_wait_exists(NULL, topic, 3, -1, 5000);
}

for (i = 0; i < (int)RD_ARRAYSIZE(args); i++) {
Expand Down
2 changes: 1 addition & 1 deletion tests/0088-produce_metadata_timeout.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ int main_0088_produce_metadata_timeout(int argc, char **argv) {
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

/* Create topic with single partition, for simplicity. */
test_create_topic_wait_exists(rk, topic, 1, 1, 5000);
test_create_topic_wait_exists(rk, topic, 1, -1, 5000);

rkt = rd_kafka_topic_new(rk, topic, NULL);

Expand Down
8 changes: 4 additions & 4 deletions tests/0089-max_poll_interval.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static void do_test(void) {

testid = test_id_generate();

test_create_topic_wait_exists(NULL, topic, 1, 1, 5000);
test_create_topic_wait_exists(NULL, topic, 1, -1, 5000);

test_produce_msgs_easy(topic, testid, -1, msgcnt);

Expand Down Expand Up @@ -212,7 +212,7 @@ static void do_test_with_log_queue(void) {

testid = test_id_generate();

test_create_topic_wait_exists(NULL, topic, 1, 1, 5000);
test_create_topic_wait_exists(NULL, topic, 1, -1, 5000);

test_produce_msgs_easy(topic, testid, -1, msgcnt);

Expand Down Expand Up @@ -380,7 +380,7 @@ do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q,
"%d",
forward_to_another_q, forward_to_consumer_q);

test_create_topic_wait_exists(NULL, topic, 1, 1, 5000);
test_create_topic_wait_exists(NULL, topic, 1, -1, 5000);

test_str_id_generate(groupid, sizeof(groupid));
test_conf_init(&conf, NULL, 60);
Expand Down Expand Up @@ -471,7 +471,7 @@ static void do_test_max_poll_reset_with_consumer_cb(void) {

SUB_TEST();

test_create_topic_wait_exists(NULL, topic, 1, 1, 5000);
test_create_topic_wait_exists(NULL, topic, 1, -1, 5000);
uint64_t testid = test_id_generate();

test_produce_msgs_easy(topic, testid, -1, 100);
Expand Down
2 changes: 1 addition & 1 deletion tests/0090-idempotence.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ static void do_test_implicit_ack(const char *what,

rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

test_create_topic_wait_exists(rk, topic, 1, 1, 5000);
test_create_topic_wait_exists(rk, topic, 1, -1, 5000);

rkt = test_create_producer_topic(rk, topic, NULL);

Expand Down
6 changes: 3 additions & 3 deletions tests/0091-max_poll_interval_timeout.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ static void do_test_with_assign(const char *topic) {

test_conf_init(&conf, NULL, 60);

test_create_topic_wait_exists(NULL, topic, 2, 1, 5000);
test_create_topic_wait_exists(NULL, topic, 2, -1, 5000);

test_conf_set(conf, "session.timeout.ms", "6000");
test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);
Expand Down Expand Up @@ -249,7 +249,7 @@ static void do_test_no_poll(const char *topic) {

test_conf_init(&conf, NULL, 60);

test_create_topic_wait_exists(NULL, topic, 2, 1, 5000);
test_create_topic_wait_exists(NULL, topic, 2, -1, 5000);

test_conf_set(conf, "session.timeout.ms", "6000");
test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);
Expand Down Expand Up @@ -283,7 +283,7 @@ int main_0091_max_poll_interval_timeout(int argc, char **argv) {
const char *topic =
test_mk_topic_name("0091_max_poll_interval_tmout", 1);

test_create_topic_wait_exists(NULL, topic, 2, 1, 5000);
test_create_topic_wait_exists(NULL, topic, 2, -1, 5000);

do_test_with_subscribe(topic);

Expand Down
Loading