From 06e248cb3218ac44553ccbf0c1f589b885daf83d Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 14 Apr 2025 01:22:37 +0200 Subject: [PATCH 1/5] Create all topics with default replication factor to avoid errors if a minimum replication factor is enforced on topic creation --- tests/0011-produce_batch.c | 7 ++- tests/0026-consume_pause.c | 9 ++-- tests/0028-long_topicnames.c | 2 +- tests/0044-partition_cnt.c | 2 +- tests/0045-subscribe_update.c | 18 ++++---- tests/0048-partitioner.c | 2 +- tests/0055-producer_latency.c | 2 +- tests/0069-consumer_add_parts.c | 2 +- tests/0084-destroy_flags.c | 2 +- tests/0088-produce_metadata_timeout.c | 2 +- tests/0089-max_poll_interval.c | 8 ++-- tests/0090-idempotence.c | 2 +- tests/0091-max_poll_interval_timeout.c | 6 +-- tests/0093-holb.c | 2 +- tests/0098-consumer-txn.cpp | 32 +++++++------- tests/0099-commit_metadata.c | 2 +- tests/0101-fetch-from-follower.cpp | 2 +- tests/0102-static_group_rebalance.c | 8 ++-- tests/0103-transactions.c | 14 +++--- tests/0107-topic_recreate.c | 4 +- tests/0109-auto_create_topics.cpp | 4 +- tests/0111-delay_create_topics.cpp | 2 +- tests/0112-assign_unknown_part.c | 2 +- tests/0113-cooperative_rebalance.cpp | 60 +++++++++++++------------- tests/0114-sticky_partitioning.cpp | 2 +- tests/0115-producer_auth.cpp | 2 +- tests/0119-consumer_auth.cpp | 2 +- tests/0125-immediate_flush.c | 2 +- tests/0126-oauthbearer_oidc.c | 2 +- tests/0129-fetch_aborted_msgs.c | 2 +- tests/0132-strategy_ordering.c | 2 +- tests/0137-barrier_batch_consume.c | 10 ++--- tests/0140-commit_metadata.cpp | 2 +- tests/0142-reauthentication.c | 2 +- tests/test.c | 2 +- 35 files changed, 113 insertions(+), 113 deletions(-) diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index f0c618bf88..8b22d084db 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -366,7 +366,7 @@ static void test_per_message_partition_flag(void) { TEST_SAY("test_per_message_partition_flag: Created 
kafka instance %s\n", rd_kafka_name(rk)); topic_name = test_mk_topic_name("0011_per_message_flag", 1); - test_create_topic_wait_exists(rk, topic_name, topic_num_partitions, 1, + test_create_topic_wait_exists(rk, topic_name, topic_num_partitions, -1, 5000); rkt = rd_kafka_topic_new(rk, topic_name, topic_conf); @@ -628,10 +628,9 @@ static void test_message_single_partition_record_fail(int variation) { SUB_TEST_QUICK(); - const char *confs_set_append[] = {"cleanup.policy", "APPEND", - "compact"}; + const char *confs_set_append[] = {"cleanup.policy", "SET", "compact"}; - const char *confs_delete_subtract[] = {"cleanup.policy", "SUBTRACT", + const char *confs_delete_subtract[] = {"cleanup.policy", "DELETE", "compact"}; test_conf_init(&conf, &topic_conf, 20); diff --git a/tests/0026-consume_pause.c b/tests/0026-consume_pause.c index 87119ae9c3..9d748983bc 100644 --- a/tests/0026-consume_pause.c +++ b/tests/0026-consume_pause.c @@ -63,7 +63,8 @@ static void consume_pause(void) { test_conf_set(conf, "enable.partition.eof", "true"); test_topic_conf_set(tconf, "auto.offset.reset", "smallest"); - test_create_topic_wait_exists(NULL, topic, partition_cnt, 1, 10 * 1000); + test_create_topic_wait_exists(NULL, topic, partition_cnt, -1, + 10 * 1000); /* Produce messages */ testid = @@ -259,7 +260,7 @@ static void consume_pause_resume_after_reassign(void) { test_conf_init(&conf, NULL, 60); - test_create_topic_wait_exists(NULL, topic, (int)partition + 1, 1, + test_create_topic_wait_exists(NULL, topic, (int)partition + 1, -1, 10 * 1000); /* Produce messages */ @@ -419,7 +420,7 @@ static void consume_subscribe_assign_pause_resume(void) { test_conf_init(&conf, NULL, 20); - test_create_topic_wait_exists(NULL, topic, (int)partition + 1, 1, + test_create_topic_wait_exists(NULL, topic, (int)partition + 1, -1, 10 * 1000); /* Produce messages */ @@ -471,7 +472,7 @@ static void consume_seek_pause_resume(void) { test_conf_init(&conf, NULL, 20); - test_create_topic_wait_exists(NULL, topic, 
(int)partition + 1, 1, + test_create_topic_wait_exists(NULL, topic, (int)partition + 1, -1, 10 * 1000); /* Produce messages */ diff --git a/tests/0028-long_topicnames.c b/tests/0028-long_topicnames.c index 3649805ee7..a02602e1ed 100644 --- a/tests/0028-long_topicnames.c +++ b/tests/0028-long_topicnames.c @@ -62,7 +62,7 @@ int main_0028_long_topicnames(int argc, char **argv) { rk_c = test_create_consumer(topic, NULL, NULL, NULL); /* Create topic */ - test_create_topic_wait_exists(rk_c, topic, 1, 1, 5000); + test_create_topic_wait_exists(rk_c, topic, 1, -1, 5000); test_consumer_subscribe(rk_c, topic); test_consumer_poll_no_msgs("consume.nomsgs", rk_c, 0, 5000); diff --git a/tests/0044-partition_cnt.c b/tests/0044-partition_cnt.c index 64df57affb..2b566cadc4 100644 --- a/tests/0044-partition_cnt.c +++ b/tests/0044-partition_cnt.c @@ -61,7 +61,7 @@ static void test_producer_partition_cnt_change(void) { rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - test_create_topic_wait_exists(rk, topic, partition_cnt / 2, 1, 5000); + test_create_topic_wait_exists(rk, topic, partition_cnt / 2, -1, 5000); rkt = test_create_topic_object(rk, topic, "message.timeout.ms", diff --git a/tests/0045-subscribe_update.c b/tests/0045-subscribe_update.c index adf432b062..7ef22467cc 100644 --- a/tests/0045-subscribe_update.c +++ b/tests/0045-subscribe_update.c @@ -235,7 +235,7 @@ static void do_test_non_exist_and_partchange(void) { await_no_rebalance("#1: empty", rk, queue, 10000); TEST_SAY("#1: creating topic %s\n", topic_a); - test_create_topic_wait_exists(NULL, topic_a, 2, 1, 5000); + test_create_topic_wait_exists(NULL, topic_a, 2, -1, 5000); await_assignment("#1: proper", rk, queue, 1, topic_a, 2); @@ -295,7 +295,7 @@ static void do_test_regex(void) { queue = rd_kafka_queue_get_consumer(rk); TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_b); - test_create_topic_wait_exists(NULL, topic_b, 2, 1, 5000); + 
test_create_topic_wait_exists(NULL, topic_b, 2, -1, 5000); TEST_SAY("Regex: Subscribing to %s & %s & %s\n", topic_b, topic_d, topic_e); @@ -305,13 +305,13 @@ static void do_test_regex(void) { 2); TEST_SAY("Regex: creating topic %s (not subscribed)\n", topic_c); - test_create_topic_wait_exists(NULL, topic_c, 4, 1, 5000); + test_create_topic_wait_exists(NULL, topic_c, 4, -1, 5000); /* Should not see a rebalance since no topics are matched. */ await_no_rebalance("Regex: empty", rk, queue, 10000); TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_d); - test_create_topic_wait_exists(NULL, topic_d, 1, 1, 5000); + test_create_topic_wait_exists(NULL, topic_d, 1, -1, 5000); if (test_consumer_group_protocol_classic()) await_revoke("Regex: rebalance after topic creation", rk, @@ -376,10 +376,10 @@ static void do_test_topic_remove(void) { queue = rd_kafka_queue_get_consumer(rk); TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_f); - test_create_topic_wait_exists(NULL, topic_f, parts_f, 1, 5000); + test_create_topic_wait_exists(NULL, topic_f, parts_f, -1, 5000); TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_g); - test_create_topic_wait_exists(NULL, topic_g, parts_g, 1, 5000); + test_create_topic_wait_exists(NULL, topic_g, parts_g, -1, 5000); TEST_SAY("Topic removal: Subscribing to %s & %s\n", topic_f, topic_g); topics = rd_kafka_topic_partition_list_new(2); @@ -725,13 +725,13 @@ static void do_test_resubscribe_with_regex() { */ TEST_SAY("Creating topic %s\n", topic1); - test_create_topic_wait_exists(NULL, topic1, 4, 1, 5000); + test_create_topic_wait_exists(NULL, topic1, 4, -1, 5000); TEST_SAY("Creating topic %s\n", topic2); - test_create_topic_wait_exists(NULL, topic2, 4, 1, 5000); + test_create_topic_wait_exists(NULL, topic2, 4, -1, 5000); TEST_SAY("Creating topic %s\n", topic_a); - test_create_topic_wait_exists(NULL, topic_a, 2, 1, 5000); + test_create_topic_wait_exists(NULL, topic_a, 2, -1, 5000); test_conf_init(&conf, NULL, 60); 
diff --git a/tests/0048-partitioner.c b/tests/0048-partitioner.c index 638bbf83e8..b15f60f485 100644 --- a/tests/0048-partitioner.c +++ b/tests/0048-partitioner.c @@ -267,7 +267,7 @@ static void do_test_partitioners(void) { int pi; const char *topic = test_mk_topic_name(__FUNCTION__, 1); - test_create_topic_wait_exists(NULL, topic, part_cnt, 1, 5000); + test_create_topic_wait_exists(NULL, topic, part_cnt, -1, 5000); for (pi = 0; ptest[pi].partitioner; pi++) { do_test_partitioner(topic, ptest[pi].partitioner, _MSG_CNT, diff --git a/tests/0055-producer_latency.c b/tests/0055-producer_latency.c index 6cff6848b1..1b612b205b 100644 --- a/tests/0055-producer_latency.c +++ b/tests/0055-producer_latency.c @@ -343,7 +343,7 @@ int main_0055_producer_latency(int argc, char **argv) { } /* Create topic without replicas to keep broker-side latency down */ - test_create_topic_wait_exists(NULL, topic, 1, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 1, -1, 5000); for (latconf = latconfs; latconf->name; latconf++) test_producer_latency(topic, latconf); diff --git a/tests/0069-consumer_add_parts.c b/tests/0069-consumer_add_parts.c index d8c4e444e0..08c64c7021 100644 --- a/tests/0069-consumer_add_parts.c +++ b/tests/0069-consumer_add_parts.c @@ -77,7 +77,7 @@ int main_0069_consumer_add_parts(int argc, char **argv) { c2 = test_create_consumer(topic, rebalance_cb, NULL, NULL); TEST_SAY("Creating topic %s with 2 partitions\n", topic); - test_create_topic_wait_exists(c1, topic, 2, 1, 10 * 5000); + test_create_topic_wait_exists(c1, topic, 2, -1, 10 * 5000); TEST_SAY("Subscribing\n"); test_consumer_subscribe(c1, topic); diff --git a/tests/0084-destroy_flags.c b/tests/0084-destroy_flags.c index c2c7a5ad7d..f2bba744e3 100644 --- a/tests/0084-destroy_flags.c +++ b/tests/0084-destroy_flags.c @@ -171,7 +171,7 @@ static void destroy_flags(int local_mode) { /* Create the topic to avoid not-yet-auto-created-topics being * subscribed to (and thus raising an error). 
*/ if (!local_mode) { - test_create_topic_wait_exists(NULL, topic, 3, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 3, -1, 5000); } for (i = 0; i < (int)RD_ARRAYSIZE(args); i++) { diff --git a/tests/0088-produce_metadata_timeout.c b/tests/0088-produce_metadata_timeout.c index a34cbfa38b..bca32a9bb8 100644 --- a/tests/0088-produce_metadata_timeout.c +++ b/tests/0088-produce_metadata_timeout.c @@ -114,7 +114,7 @@ int main_0088_produce_metadata_timeout(int argc, char **argv) { rk = test_create_handle(RD_KAFKA_PRODUCER, conf); /* Create topic with single partition, for simplicity. */ - test_create_topic_wait_exists(rk, topic, 1, 1, 5000); + test_create_topic_wait_exists(rk, topic, 1, -1, 5000); rkt = rd_kafka_topic_new(rk, topic, NULL); diff --git a/tests/0089-max_poll_interval.c b/tests/0089-max_poll_interval.c index 3678ea0928..c112c5f9c9 100644 --- a/tests/0089-max_poll_interval.c +++ b/tests/0089-max_poll_interval.c @@ -61,7 +61,7 @@ static void do_test(void) { testid = test_id_generate(); - test_create_topic_wait_exists(NULL, topic, 1, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 1, -1, 5000); test_produce_msgs_easy(topic, testid, -1, msgcnt); @@ -212,7 +212,7 @@ static void do_test_with_log_queue(void) { testid = test_id_generate(); - test_create_topic_wait_exists(NULL, topic, 1, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 1, -1, 5000); test_produce_msgs_easy(topic, testid, -1, msgcnt); @@ -380,7 +380,7 @@ do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q, "%d", forward_to_another_q, forward_to_consumer_q); - test_create_topic_wait_exists(NULL, topic, 1, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 1, -1, 5000); test_str_id_generate(groupid, sizeof(groupid)); test_conf_init(&conf, NULL, 60); @@ -471,7 +471,7 @@ static void do_test_max_poll_reset_with_consumer_cb(void) { SUB_TEST(); - test_create_topic_wait_exists(NULL, topic, 1, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 1, -1, 5000); uint64_t 
testid = test_id_generate(); test_produce_msgs_easy(topic, testid, -1, 100); diff --git a/tests/0090-idempotence.c b/tests/0090-idempotence.c index 10975a6362..43053a810f 100644 --- a/tests/0090-idempotence.c +++ b/tests/0090-idempotence.c @@ -130,7 +130,7 @@ static void do_test_implicit_ack(const char *what, rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - test_create_topic_wait_exists(rk, topic, 1, 1, 5000); + test_create_topic_wait_exists(rk, topic, 1, -1, 5000); rkt = test_create_producer_topic(rk, topic, NULL); diff --git a/tests/0091-max_poll_interval_timeout.c b/tests/0091-max_poll_interval_timeout.c index e915bb8624..01614cb3d6 100644 --- a/tests/0091-max_poll_interval_timeout.c +++ b/tests/0091-max_poll_interval_timeout.c @@ -204,7 +204,7 @@ static void do_test_with_assign(const char *topic) { test_conf_init(&conf, NULL, 60); - test_create_topic_wait_exists(NULL, topic, 2, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 2, -1, 5000); test_conf_set(conf, "session.timeout.ms", "6000"); test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/); @@ -249,7 +249,7 @@ static void do_test_no_poll(const char *topic) { test_conf_init(&conf, NULL, 60); - test_create_topic_wait_exists(NULL, topic, 2, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 2, -1, 5000); test_conf_set(conf, "session.timeout.ms", "6000"); test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/); @@ -283,7 +283,7 @@ int main_0091_max_poll_interval_timeout(int argc, char **argv) { const char *topic = test_mk_topic_name("0091_max_poll_interval_tmout", 1); - test_create_topic_wait_exists(NULL, topic, 2, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 2, -1, 5000); do_test_with_subscribe(topic); diff --git a/tests/0093-holb.c b/tests/0093-holb.c index 65fa4083a6..6d37ec8363 100644 --- a/tests/0093-holb.c +++ b/tests/0093-holb.c @@ -108,7 +108,7 @@ int main_0093_holb_consumer(int argc, char **argv) { test_conf_init(&conf, NULL, 60); - test_create_topic_wait_exists(NULL, 
topic, 1, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 1, -1, 5000); test_produce_msgs_easy(topic, testid, 0, msgcnt); diff --git a/tests/0098-consumer-txn.cpp b/tests/0098-consumer-txn.cpp index 6d034a4361..059b7492d8 100644 --- a/tests/0098-consumer-txn.cpp +++ b/tests/0098-consumer-txn.cpp @@ -500,7 +500,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer1, -1, 0x0, 5, BeginCommit, DoFlush", "producer1, -1, 0x10, 5, BeginAbort, DoFlush"); @@ -554,7 +554,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-0.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer1, -1, 0x0, 5, BeginCommit, DontFlush", "producer1, -1, 0x10, 5, BeginAbort, DoFlush"); @@ -598,7 +598,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-0.2", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer1, -1, 0x10, 5, BeginAbort, DoFlush", "producer1, -1, 0x30, 5, BeginCommit, DoFlush"); @@ -642,7 +642,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); TestEventCb::topic = 
topic_name; run_producer("producer3, -1, 0x10, 5, None, DoFlush", @@ -682,7 +682,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-1.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer1, -1, 0x30, 5, BeginAbort, DoFlush", "producer3, -1, 0x40, 5, None, DoFlush", @@ -714,7 +714,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-1.2", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer1, -1, 0x10, 5, BeginCommit, DoFlush", "producer1, -1, 0x20, 5, BeginAbort, DoFlush", @@ -746,7 +746,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-2", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer1, -1, 0x10, 1, BeginAbort, DontFlush", "producer1, -1, 0x20, 1, BeginCommit, DontFlush", @@ -799,7 +799,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-2.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer1, -1, 0x10, 1, BeginAbort, DoFlush", "producer1, -1, 0x20, 1, BeginCommit, DoFlush", @@ -883,7 +883,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-3", 1); c = 
create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 2, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 2, -1, 5000); run_producer("producer1, 0, 0x10, 3, BeginOpen, DoFlush", "producer1, 1, 0x20, 3, ContinueOpen, DoFlush", @@ -928,7 +928,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-3.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 2, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 2, -1, 5000); run_producer("producer1, 0, 0x55, 1, BeginCommit, DoFlush", "producer1, 0, 0x10, 3, BeginOpen, DoFlush", @@ -969,7 +969,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-4", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer3, 0, 0x10, 1, None, DoFlush", "producer1, 0, 0x20, 3, BeginOpen, DoFlush", @@ -1004,7 +1004,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-4.1", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer3, 0, 0x10, 1, None, DoFlush", "producer1, 0, 0x20, 3, BeginOpen, DoFlush", @@ -1039,7 +1039,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-4.2", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer3, 0, 0x10, 1, None, DoFlush", 
"producer1, 0, 0x20, 3, BeginOpen, DoFlush", @@ -1074,7 +1074,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-4.3", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer3, 0, 0x10, 1, None, DoFlush", "producer1, 0, 0x20, 3, BeginOpen, DoFlush", @@ -1111,7 +1111,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { test5: topic_name = Test::mk_topic_name("0098-consumer_txn-5", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); run_producer("producer1, 0, 0x10, 2, BeginOpen, DontFlush", "sleep,200", "producer1, 0, 0x20, 2, ContinueAbort, DontFlush", @@ -1167,7 +1167,7 @@ static void do_test_consumer_txn_test(bool use_java_producer) { topic_name = Test::mk_topic_name("0098-consumer_txn-0", 1); c = create_consumer(topic_name, "READ_COMMITTED"); - Test::create_topic_wait_exists(c, topic_name.c_str(), 1, 3, 5000); + Test::create_topic_wait_exists(c, topic_name.c_str(), 1, -1, 5000); TestEventCb::topic = topic_name; run_producer("producer3, 0, 0x10, 1, None, DoFlush", diff --git a/tests/0099-commit_metadata.c b/tests/0099-commit_metadata.c index 0ca4a339f2..9f3c23fdb4 100644 --- a/tests/0099-commit_metadata.c +++ b/tests/0099-commit_metadata.c @@ -164,7 +164,7 @@ int main_0099_commit_metadata(int argc, char **argv) { test_str_id_generate(group_id, sizeof(group_id)); - test_create_topic_wait_exists(NULL, topic, 1, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 1, -1, 5000); origin_toppar = rd_kafka_topic_partition_list_new(1); diff --git a/tests/0101-fetch-from-follower.cpp b/tests/0101-fetch-from-follower.cpp index f4968268ec..3ebbe1c03e 100644 --- 
a/tests/0101-fetch-from-follower.cpp +++ b/tests/0101-fetch-from-follower.cpp @@ -290,7 +290,7 @@ static void do_fff_test(void) { int msgcnt = 1000; const int msgsize = 100; std::string topic_str = Test::mk_topic_name("0101-fetch-from-follower", 1); - test_create_topic_wait_exists(NULL, topic_str.c_str(), 1, 3, 5000); + test_create_topic_wait_exists(NULL, topic_str.c_str(), 1, -1, 5000); test_produce_msgs_easy_size(topic_str.c_str(), 0, 0, msgcnt, msgsize); int leader_id; diff --git a/tests/0102-static_group_rebalance.c b/tests/0102-static_group_rebalance.c index 8f6c2a90c9..2b9d3a1be4 100644 --- a/tests/0102-static_group_rebalance.c +++ b/tests/0102-static_group_rebalance.c @@ -162,7 +162,7 @@ static void do_test_static_group_rebalance(void) { c[0].mv = &mv; c[1].mv = &mv; - test_create_topic_wait_exists(NULL, topic, 3, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 3, -1, 5000); test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt); test_conf_set(conf, "max.poll.interval.ms", "9000"); @@ -259,7 +259,7 @@ static void do_test_static_group_rebalance(void) { * New topics matching the subscription pattern should cause * group rebalance */ - test_create_topic_wait_exists(c->rk, tsprintf("%snew", topic), 1, 1, + test_create_topic_wait_exists(c->rk, tsprintf("%snew", topic), 1, -1, 5000); /* Await revocation */ @@ -469,7 +469,7 @@ static void do_test_fenced_member_classic(void) { test_conf_init(&conf, NULL, 30); - test_create_topic(NULL, topic, 3, 1); + test_create_topic(NULL, topic, 3, -1); test_conf_set(conf, "group.instance.id", "consumer1"); test_conf_set(conf, "client.id", "consumer1"); @@ -562,7 +562,7 @@ static void do_test_fenced_member_consumer(void) { test_conf_init(&conf, NULL, 30); - test_create_topic(NULL, topic, 3, 1); + test_create_topic(NULL, topic, 3, -1); test_conf_set(conf, "group.instance.id", "consumer1"); test_conf_set(conf, "client.id", "consumer1"); diff --git a/tests/0103-transactions.c b/tests/0103-transactions.c index 
0bc1664d83..f32187c4b6 100644 --- a/tests/0103-transactions.c +++ b/tests/0103-transactions.c @@ -143,7 +143,7 @@ static void do_test_basic_producer_txn(rd_bool_t enable_compression) { // FIXME: add testing were the txn id is reused (and thus fails) /* Create topic */ - test_create_topic_wait_exists(p, topic, partition_cnt, 3, 5000); + test_create_topic_wait_exists(p, topic, partition_cnt, -1, 5000); /* Create consumer */ c_conf = conf; @@ -348,8 +348,8 @@ void do_test_consumer_producer_txn(void) { p1 = test_create_handle(RD_KAFKA_PRODUCER, tmpconf); /* Create input and output topics */ - test_create_topic_wait_exists(p1, input_topic, 4, 3, 5000); - test_create_topic_wait_exists(p1, output_topic, 4, 3, 5000); + test_create_topic_wait_exists(p1, input_topic, 4, -1, 5000); + test_create_topic_wait_exists(p1, output_topic, 4, -1, 5000); /* Seed input topic with messages */ TEST_CALL_ERROR__(rd_kafka_init_transactions(p1, 30 * 1000)); @@ -879,7 +879,7 @@ static void do_test_fatal_idempo_error_without_kip360(void) { p = test_create_handle(RD_KAFKA_PRODUCER, conf); - test_create_topic_wait_exists(p, topic, 1, 3, 5000); + test_create_topic_wait_exists(p, topic, 1, -1, 5000); TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000)); @@ -1029,7 +1029,7 @@ static void do_test_empty_txn(rd_bool_t send_offsets, rd_bool_t do_commit) { rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); p = test_create_handle(RD_KAFKA_PRODUCER, conf); - test_create_topic_wait_exists(p, topic, 1, 3, 5000); + test_create_topic_wait_exists(p, topic, 1, -1, 5000); /* Produce some non-txnn messages for the consumer to read and commit */ test_produce_msgs_easy(topic, testid, 0, msgcnt); @@ -1130,7 +1130,7 @@ static void do_test_txn_abort_control_message_leader_epoch(void) { rd_kafka_conf_set_dr_msg_cb(p_conf, test_dr_msg_cb); p = test_create_handle(RD_KAFKA_PRODUCER, p_conf); - test_create_topic_wait_exists(p, topic, 1, 3, 5000); + test_create_topic_wait_exists(p, topic, 1, -1, 5000); 
TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 5000)); @@ -1225,7 +1225,7 @@ static void do_test_wmark_isolation_level(void) { rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); p = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf)); - test_create_topic_wait_exists(p, topic, 1, 3, 5000); + test_create_topic_wait_exists(p, topic, 1, -1, 5000); /* Produce some non-txn messages to avoid 0 as the committed hwmark */ test_produce_msgs_easy(topic, testid, 0, 100); diff --git a/tests/0107-topic_recreate.c b/tests/0107-topic_recreate.c index 68b9784796..0f79a541fb 100644 --- a/tests/0107-topic_recreate.c +++ b/tests/0107-topic_recreate.c @@ -189,7 +189,7 @@ static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) { consumer = test_create_consumer(topic, NULL, NULL, NULL); /* Create topic */ - test_create_topic_wait_exists(consumer, topic, part_cnt_1, 3, 5000); + test_create_topic_wait_exists(consumer, topic, part_cnt_1, -1, 5000); /* Start consumer */ test_consumer_subscribe(consumer, topic); @@ -216,7 +216,7 @@ static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) { rd_sleep(5); /* Re-create topic */ - test_create_topic_wait_exists(consumer, topic, part_cnt_2, 3, 5000); + test_create_topic_wait_exists(consumer, topic, part_cnt_2, -1, 5000); mtx_lock(&value_mtx); value = "after"; diff --git a/tests/0109-auto_create_topics.cpp b/tests/0109-auto_create_topics.cpp index c5582aa072..b61a9bc5e7 100644 --- a/tests/0109-auto_create_topics.cpp +++ b/tests/0109-auto_create_topics.cpp @@ -99,10 +99,10 @@ static void do_test_consumer(bool allow_auto_create_topics, delete conf; /* Create topics */ - Test::create_topic(c, topic_exists.c_str(), 1, 1); + Test::create_topic(c, topic_exists.c_str(), 1, -1); if (test_unauthorized_topic) { - Test::create_topic(c, topic_unauth.c_str(), 1, 1); + Test::create_topic(c, topic_unauth.c_str(), 1, -1); /* Add denying ACL for unauth topic */ test_kafka_cmd( diff --git 
a/tests/0111-delay_create_topics.cpp b/tests/0111-delay_create_topics.cpp index a46282bd17..ad480b73d9 100644 --- a/tests/0111-delay_create_topics.cpp +++ b/tests/0111-delay_create_topics.cpp @@ -105,7 +105,7 @@ static void do_test_producer(bool timeout_too_short) { while (test_clock() < end_wait) p->poll(1000); - Test::create_topic(NULL, topic.c_str(), 1, 3); + Test::create_topic(NULL, topic.c_str(), 1, -1); p->flush(10 * 1000); diff --git a/tests/0112-assign_unknown_part.c b/tests/0112-assign_unknown_part.c index d5549c99e7..b35818f41e 100644 --- a/tests/0112-assign_unknown_part.c +++ b/tests/0112-assign_unknown_part.c @@ -50,7 +50,7 @@ int main_0112_assign_unknown_part(int argc, char **argv) { c = test_create_consumer(topic, NULL, NULL, NULL); TEST_SAY("Creating topic %s with 1 partition\n", topic); - test_create_topic_wait_exists(c, topic, 1, 1, 10 * 1000); + test_create_topic_wait_exists(c, topic, 1, -1, 10 * 1000); TEST_SAY("Producing message to partition 0\n"); test_produce_msgs_easy(topic, testid, 0, 1); diff --git a/tests/0113-cooperative_rebalance.cpp b/tests/0113-cooperative_rebalance.cpp index c9b068cfd6..c24a15c495 100644 --- a/tests/0113-cooperative_rebalance.cpp +++ b/tests/0113-cooperative_rebalance.cpp @@ -656,9 +656,9 @@ static void a_assign_tests() { const int msgsize2 = 200; std::string topic1_str = Test::mk_topic_name("0113-a1", 1); - test_create_topic(NULL, topic1_str.c_str(), 1, 1); + test_create_topic(NULL, topic1_str.c_str(), 1, -1); std::string topic2_str = Test::mk_topic_name("0113-a2", 1); - test_create_topic(NULL, topic2_str.c_str(), 1, 1); + test_create_topic(NULL, topic2_str.c_str(), 1, -1); test_wait_topic_exists(NULL, topic1_str.c_str(), 10 * 1000); test_wait_topic_exists(NULL, topic2_str.c_str(), 10 * 1000); @@ -907,7 +907,7 @@ static void b_subscribe_with_cb_test(rd_bool_t close_consumer) { std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); std::string group_name = 
Test::mk_unique_group_name("0113-cooperative_rebalance"); - test_create_topic(NULL, topic_name.c_str(), 2, 1); + test_create_topic(NULL, topic_name.c_str(), 2, -1); DefaultRebalanceCb rebalance_cb1; RdKafka::KafkaConsumer *c1 = make_consumer( @@ -1088,7 +1088,7 @@ static void c_subscribe_no_cb_test(rd_bool_t close_consumer) { std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); - test_create_topic(NULL, topic_name.c_str(), 2, 1); + test_create_topic(NULL, topic_name.c_str(), 2, -1); RdKafka::KafkaConsumer *c1 = make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 20); @@ -1144,10 +1144,10 @@ static void d_change_subscription_add_topic(rd_bool_t close_consumer) { std::string topic_name_1 = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name_1.c_str(), 2, 1); + test_create_topic(NULL, topic_name_1.c_str(), 2, -1); std::string topic_name_2 = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name_2.c_str(), 2, 1); + test_create_topic(NULL, topic_name_2.c_str(), 2, -1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); @@ -1200,10 +1200,10 @@ static void e_change_subscription_remove_topic(rd_bool_t close_consumer) { std::string topic_name_1 = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name_1.c_str(), 2, 1); + test_create_topic(NULL, topic_name_1.c_str(), 2, -1); std::string topic_name_2 = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name_2.c_str(), 2, 1); + test_create_topic(NULL, topic_name_2.c_str(), 2, -1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); @@ -1313,7 +1313,7 @@ static void f_assign_call_cooperative() { SUB_TEST(); std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); - 
test_create_topic(NULL, topic_name.c_str(), 1, 1); + test_create_topic(NULL, topic_name.c_str(), 1, -1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); @@ -1419,7 +1419,7 @@ static void g_incremental_assign_call_eager() { } std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name.c_str(), 1, 1); + test_create_topic(NULL, topic_name.c_str(), 1, -1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); @@ -1457,10 +1457,10 @@ static void h_delete_topic() { std::string topic_name_1 = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name_1.c_str(), 1, 1); + test_create_topic(NULL, topic_name_1.c_str(), 1, -1); std::string topic_name_2 = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name_2.c_str(), 1, 1); + test_create_topic(NULL, topic_name_2.c_str(), 1, -1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); @@ -1530,7 +1530,7 @@ static void i_delete_topic_2() { std::string topic_name_1 = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name_1.c_str(), 1, 1); + test_create_topic(NULL, topic_name_1.c_str(), 1, -1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); @@ -1587,7 +1587,7 @@ static void j_delete_topic_no_rb_callback() { std::string topic_name_1 = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name_1.c_str(), 1, 1); + test_create_topic(NULL, topic_name_1.c_str(), 1, -1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); @@ -1637,7 +1637,7 @@ static void k_add_partition() { SUB_TEST(); std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); - test_create_topic(NULL, topic_name.c_str(), 1, 1); + test_create_topic(NULL, topic_name.c_str(), 1, -1); std::string group_name 
= Test::mk_unique_group_name("0113-cooperative_rebalance"); @@ -1720,8 +1720,8 @@ static void l_unsubscribe() { Test::mk_topic_name("0113-cooperative_rebalance", 1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); - test_create_topic(NULL, topic_name_1.c_str(), 2, 1); - test_create_topic(NULL, topic_name_2.c_str(), 2, 1); + test_create_topic(NULL, topic_name_1.c_str(), 2, -1); + test_create_topic(NULL, topic_name_2.c_str(), 2, -1); DefaultRebalanceCb rebalance_cb1; RdKafka::KafkaConsumer *c1 = make_consumer( @@ -1848,7 +1848,7 @@ static void m_unsubscribe_2() { std::string topic_name = Test::mk_topic_name("0113-cooperative_rebalance", 1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); - test_create_topic(NULL, topic_name.c_str(), 2, 1); + test_create_topic(NULL, topic_name.c_str(), 2, -1); RdKafka::KafkaConsumer *c = make_consumer("C_1", group_name, "cooperative-sticky", NULL, NULL, 15); @@ -1941,8 +1941,8 @@ static void n_wildcard() { Test::assignment_partition_count(c2, NULL) == 0 && !created_topics) { Test::Say( "Creating two topics with 2 partitions each that match regex\n"); - test_create_topic(NULL, topic_name_1.c_str(), 2, 1); - test_create_topic(NULL, topic_name_2.c_str(), 2, 1); + test_create_topic(NULL, topic_name_1.c_str(), 2, -1); + test_create_topic(NULL, topic_name_2.c_str(), 2, -1); test_wait_topic_exists(NULL, topic_name_1.c_str(), 5000); test_wait_topic_exists(NULL, topic_name_2.c_str(), 5000); /* The consumers should autonomously discover these topics and start @@ -2096,8 +2096,8 @@ static void o_java_interop() { std::string topic_name_1 = Test::mk_topic_name("0113_o_2", 1); std::string topic_name_2 = Test::mk_topic_name("0113_o_6", 1); std::string group_name = Test::mk_unique_group_name("0113_o"); - test_create_topic(NULL, topic_name_1.c_str(), 2, 1); - test_create_topic(NULL, topic_name_2.c_str(), 6, 1); + test_create_topic(NULL, topic_name_1.c_str(), 2, -1); + 
test_create_topic(NULL, topic_name_2.c_str(), 6, -1); DefaultRebalanceCb rebalance_cb; RdKafka::KafkaConsumer *c = make_consumer( @@ -2204,9 +2204,9 @@ static void s_subscribe_when_rebalancing(int variation) { Test::mk_topic_name("0113-cooperative_rebalance", 1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); - test_create_topic(NULL, topic_name_1.c_str(), 1, 1); - test_create_topic(NULL, topic_name_2.c_str(), 1, 1); - test_create_topic(NULL, topic_name_3.c_str(), 1, 1); + test_create_topic(NULL, topic_name_1.c_str(), 1, -1); + test_create_topic(NULL, topic_name_2.c_str(), 1, -1); + test_create_topic(NULL, topic_name_3.c_str(), 1, -1); DefaultRebalanceCb rebalance_cb; RdKafka::KafkaConsumer *c = make_consumer( @@ -2259,7 +2259,7 @@ static void t_max_poll_interval_exceeded(int variation) { Test::mk_topic_name("0113-cooperative_rebalance", 1); std::string group_name = Test::mk_unique_group_name("0113-cooperative_rebalance"); - test_create_topic(NULL, topic_name_1.c_str(), 2, 1); + test_create_topic(NULL, topic_name_1.c_str(), 2, -1); std::vector > additional_conf; additional_conf.push_back(std::pair( @@ -2416,8 +2416,8 @@ static void u_multiple_subscription_changes(bool use_rebalance_cb, string topic_name_2 = Test::mk_topic_name("0113u_2", 1); string group_name = Test::mk_unique_group_name("0113u"); - test_create_topic(NULL, topic_name_1.c_str(), N_PARTS_PER_TOPIC, 1); - test_create_topic(NULL, topic_name_2.c_str(), N_PARTS_PER_TOPIC, 1); + test_create_topic(NULL, topic_name_1.c_str(), N_PARTS_PER_TOPIC, -1); + test_create_topic(NULL, topic_name_2.c_str(), N_PARTS_PER_TOPIC, -1); Test::Say("Creating consumers\n"); DefaultRebalanceCb rebalance_cbs[N_CONSUMERS]; @@ -3247,7 +3247,7 @@ static void v_commit_during_rebalance(bool with_rebalance_cb, */ p = test_create_producer(); - test_create_topic_wait_exists(p, topic, partition_cnt, 1, 5000); + test_create_topic_wait_exists(p, topic, partition_cnt, -1, 5000); for (i = 0; i < 
partition_cnt; i++) { test_produce_msgs2(p, topic, testid, i, i * msgcnt_per_partition, @@ -3331,7 +3331,7 @@ static void x_incremental_rebalances(void) { SUB_TEST(); test_conf_init(&conf, NULL, 60); - test_create_topic_wait_exists(NULL, topic, 6, 1, 5000); + test_create_topic_wait_exists(NULL, topic, 6, -1, 5000); test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky"); for (i = 0; i < _NUM_CONS; i++) { diff --git a/tests/0114-sticky_partitioning.cpp b/tests/0114-sticky_partitioning.cpp index 90b30c2eda..a0cb478c0d 100644 --- a/tests/0114-sticky_partitioning.cpp +++ b/tests/0114-sticky_partitioning.cpp @@ -44,7 +44,7 @@ */ static void do_test_sticky_partitioning(int sticky_delay) { std::string topic = Test::mk_topic_name(__FILE__, 1); - Test::create_topic_wait_exists(NULL, topic.c_str(), 3, 1, 5000); + Test::create_topic_wait_exists(NULL, topic.c_str(), 3, -1, 5000); RdKafka::Conf *conf; Test::conf_init(&conf, NULL, 0); diff --git a/tests/0115-producer_auth.cpp b/tests/0115-producer_auth.cpp index ae839ab4da..dbd8af2ad4 100644 --- a/tests/0115-producer_auth.cpp +++ b/tests/0115-producer_auth.cpp @@ -86,7 +86,7 @@ static void do_test_producer(bool topic_known) { /* Create topic */ std::string topic_unauth = Test::mk_topic_name("0115-unauthorized", 1); - Test::create_topic_wait_exists(NULL, topic_unauth.c_str(), 3, 1, 5000); + Test::create_topic_wait_exists(NULL, topic_unauth.c_str(), 3, -1, 5000); int exp_dr_cnt = 0; diff --git a/tests/0119-consumer_auth.cpp b/tests/0119-consumer_auth.cpp index 01119d5200..1458692f8c 100644 --- a/tests/0119-consumer_auth.cpp +++ b/tests/0119-consumer_auth.cpp @@ -62,7 +62,7 @@ static void do_test_fetch_unauth() { /* Create topic */ const int partition_cnt = 3; - Test::create_topic_wait_exists(NULL, topic.c_str(), partition_cnt, 1, 5000); + Test::create_topic_wait_exists(NULL, topic.c_str(), partition_cnt, -1, 5000); /* Produce messages */ test_produce_msgs_easy(topic.c_str(), 0, RdKafka::Topic::PARTITION_UA, 
1000); diff --git a/tests/0125-immediate_flush.c b/tests/0125-immediate_flush.c index 8d7f0dfcd3..f4b7e55907 100644 --- a/tests/0125-immediate_flush.c +++ b/tests/0125-immediate_flush.c @@ -48,7 +48,7 @@ void do_test_flush_overrides_linger_ms_time() { rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - test_create_topic_wait_exists(rk, topic, 1, 1, 5000); + test_create_topic_wait_exists(rk, topic, 1, -1, 5000); /* Produce half set of messages without waiting for delivery. */ test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt / 2, NULL, 50, diff --git a/tests/0126-oauthbearer_oidc.c b/tests/0126-oauthbearer_oidc.c index 562bb4d965..e04e480e55 100644 --- a/tests/0126-oauthbearer_oidc.c +++ b/tests/0126-oauthbearer_oidc.c @@ -73,7 +73,7 @@ do_test_produce_consumer_with_OIDC(const char *test_name, p1 = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf)); topic = test_mk_topic_name("0126-oauthbearer_oidc", 1); - test_create_topic_wait_exists(p1, topic, 1, 3, 5000); + test_create_topic_wait_exists(p1, topic, 1, -1, 5000); TEST_SAY("Topic: %s is created\n", topic); test_produce_msgs2(p1, topic, testid, 0, 0, 1, NULL, 0); diff --git a/tests/0129-fetch_aborted_msgs.c b/tests/0129-fetch_aborted_msgs.c index 5d9b63b74f..96240ba382 100644 --- a/tests/0129-fetch_aborted_msgs.c +++ b/tests/0129-fetch_aborted_msgs.c @@ -56,7 +56,7 @@ int main_0129_fetch_aborted_msgs(int argc, char **argv) { rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - test_admin_create_topic(rk, topic, 1, 1, + test_admin_create_topic(rk, topic, 1, -1, (const char *[]) {"max.message.bytes", "10000", "segment.bytes", "20000", NULL}); diff --git a/tests/0132-strategy_ordering.c b/tests/0132-strategy_ordering.c index 379bed8c18..26edde94e2 100644 --- a/tests/0132-strategy_ordering.c +++ b/tests/0132-strategy_ordering.c @@ -125,7 +125,7 @@ static void do_test_strategy_ordering(const char *assignor, 
testid = test_id_generate(); topic = test_mk_topic_name("0132-strategy_ordering", 1); - test_create_topic_wait_exists(NULL, topic, _PART_CNT, 1, 5000); + test_create_topic_wait_exists(NULL, topic, _PART_CNT, -1, 5000); test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt); test_conf_init(&conf, NULL, 30); diff --git a/tests/0137-barrier_batch_consume.c b/tests/0137-barrier_batch_consume.c index 19bec387db..3ceba5536e 100644 --- a/tests/0137-barrier_batch_consume.c +++ b/tests/0137-barrier_batch_consume.c @@ -136,7 +136,7 @@ static void do_test_consume_batch_with_seek(void) { /* Produce messages */ topic = test_mk_topic_name("0137-barrier_batch_consume", 1); - test_create_topic_wait_exists(NULL, topic, partition_cnt, 1, 5000); + test_create_topic_wait_exists(NULL, topic, partition_cnt, -1, 5000); for (p = 0; p < partition_cnt; p++) test_produce_msgs_easy(topic, testid, p, @@ -226,7 +226,7 @@ static void do_test_consume_batch_with_pause_and_resume_different_batch(void) { /* Produce messages */ topic = test_mk_topic_name("0137-barrier_batch_consume", 1); - test_create_topic_wait_exists(NULL, topic, partition_cnt, 1, 5000); + test_create_topic_wait_exists(NULL, topic, partition_cnt, -1, 5000); for (p = 0; p < partition_cnt; p++) test_produce_msgs_easy(topic, testid, p, @@ -331,7 +331,7 @@ static void do_test_consume_batch_with_pause_and_resume_same_batch(void) { /* Produce messages */ topic = test_mk_topic_name("0137-barrier_batch_consume", 1); - test_create_topic_wait_exists(NULL, topic, partition_cnt, 1, 5000); + test_create_topic_wait_exists(NULL, topic, partition_cnt, -1, 5000); for (p = 0; p < partition_cnt; p++) test_produce_msgs_easy(topic, testid, p, @@ -427,7 +427,7 @@ static void do_test_consume_batch_store_offset(void) { /* Produce messages */ topic = test_mk_topic_name("0137-barrier_batch_consume", 1); - test_create_topic_wait_exists(NULL, topic, partition_cnt, 1, 5000); + test_create_topic_wait_exists(NULL, topic, partition_cnt, -1, 5000); 
for (p = 0; p < partition_cnt; p++) test_produce_msgs_easy(topic, testid, p, @@ -508,7 +508,7 @@ static void do_test_consume_batch_control_msgs(void) { producer = test_create_handle(RD_KAFKA_PRODUCER, conf); - test_create_topic_wait_exists(producer, topic, partition_cnt, 1, 5000); + test_create_topic_wait_exists(producer, topic, partition_cnt, -1, 5000); TEST_CALL_ERROR__(rd_kafka_init_transactions(producer, 30 * 1000)); diff --git a/tests/0140-commit_metadata.cpp b/tests/0140-commit_metadata.cpp index 03dc7d129c..e526335c33 100644 --- a/tests/0140-commit_metadata.cpp +++ b/tests/0140-commit_metadata.cpp @@ -54,7 +54,7 @@ static void test_commit_metadata() { delete conf; Test::Say("Create topic.\n"); - Test::create_topic_wait_exists(consumer, topic.c_str(), 1, 1, 5000); + Test::create_topic_wait_exists(consumer, topic.c_str(), 1, -1, 5000); Test::Say("Commit offsets.\n"); std::vector offsets; diff --git a/tests/0142-reauthentication.c b/tests/0142-reauthentication.c index eca0c4bd14..ee0f750eef 100644 --- a/tests/0142-reauthentication.c +++ b/tests/0142-reauthentication.c @@ -130,7 +130,7 @@ void do_test_consumer(int64_t reauth_time, const char *topic) { p1 = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf)); - test_create_topic_wait_exists(p1, topic, 1, 3, 5000); + test_create_topic_wait_exists(p1, topic, 1, -1, 5000); TEST_SAY("Topic: %s is created\n", topic); test_conf_set(conf, "auto.offset.reset", "earliest"); diff --git a/tests/test.c b/tests/test.c index 4dbef9d16e..73192b5cb8 100644 --- a/tests/test.c +++ b/tests/test.c @@ -6471,7 +6471,7 @@ rd_kafka_resp_err_t test_CreateTopics_simple(rd_kafka_t *rk, for (i = 0; i < topic_cnt; i++) { char errstr[512]; new_topics[i] = rd_kafka_NewTopic_new( - topics[i], num_partitions, 1, errstr, sizeof(errstr)); + topics[i], num_partitions, -1, errstr, sizeof(errstr)); TEST_ASSERT(new_topics[i], "Failed to NewTopic(\"%s\", %d) #%" PRIusz ": %s", topics[i], num_partitions, i, errstr); From 
582fd8cfa9ce3a17ef05da0a5a267ac4d99fe41a Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 14 Apr 2025 09:50:58 +0200 Subject: [PATCH 2/5] Skip jobs without reporting them as failed --- .semaphore/run-all-tests.yml | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/.semaphore/run-all-tests.yml b/.semaphore/run-all-tests.yml index d306651f47..c7825a0e8f 100644 --- a/.semaphore/run-all-tests.yml +++ b/.semaphore/run-all-tests.yml @@ -35,21 +35,29 @@ blocks: type: s1-prod-ubuntu24-04-amd64-2 prologue: commands: - - if [[ "$TEST_ARCHES" != *"x86_64"* ]]; then exit 0; fi + # Semaphore CI uses return 130 + # to indicate that the job should be stopped. + - if [[ "$TEST_ARCHES" != *"x86_64"* ]]; then + return 130; + fi jobs: - name: "PLAINTEXT cluster (x86_64)" env_vars: - name: TEST_SSL value: "False" commands: - - if [[ "$TEST_TYPE" != *"plaintext"* ]]; then exit 0; fi + - if [[ "$TEST_TYPE" != *"plaintext"* ]]; then + return 130; + fi - ./tests/run-all-tests.sh - name: "SSL cluster (x86_64)" env_vars: - name: TEST_SSL value: "True" commands: - - if [[ "$TEST_TYPE" != *"ssl"* ]]; then exit 0; fi + - if [[ "$TEST_TYPE" != *"ssl"* ]]; then + return 130; + fi - ./tests/run-all-tests.sh - name: "Run all tests (aarch64)" dependencies: [] @@ -59,19 +67,25 @@ blocks: type: s1-prod-ubuntu24-04-arm64-2 prologue: commands: - - if [[ "$TEST_ARCHES" != *"aarch64"* ]]; then exit 0; fi + - if [[ "$TEST_ARCHES" != *"aarch64"* ]]; then + return 130; + fi jobs: - name: "PLAINTEXT cluster (aarch64)" env_vars: - name: TEST_SSL value: "False" commands: - - if [[ "$TEST_TYPE" != *"plaintext"* ]]; then exit 0; fi + - if [[ "$TEST_TYPE" != *"plaintext"* ]]; then + return 130; + fi - ./tests/run-all-tests.sh - name: "SSL cluster (aarch64)" env_vars: - name: TEST_SSL value: "True" commands: - - if [[ "$TEST_TYPE" != *"ssl"* ]]; then exit 0; fi + - if [[ "$TEST_TYPE" != *"ssl"* ]]; then + return 130; + fi - ./tests/run-all-tests.sh From 
9f273d62eadbb10845727a7b97642722d496b0ba Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 16 Jul 2025 20:25:13 +0200 Subject: [PATCH 3/5] Script to run integration tests with CC or CP --- tests/requirements.txt | 1 + tests/run-tests-confluent-kafka.py | 101 +++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100755 tests/run-tests-confluent-kafka.py diff --git a/tests/requirements.txt b/tests/requirements.txt index 291f07aa68..b70643d3c0 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,2 +1,3 @@ trivup/trivup-0.13.0.tar.gz jsoncomment +httpx diff --git a/tests/run-tests-confluent-kafka.py b/tests/run-tests-confluent-kafka.py new file mode 100755 index 0000000000..44fd4d21f8 --- /dev/null +++ b/tests/run-tests-confluent-kafka.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +import os +import sys +import subprocess +import signal +import httpx curr_dir = os.path.dirname(os.path.realpath(__file__)) os.chdir(curr_dir) # This script is used to run all tests against Confluent Kafka. # # If `REST_ENDPOINT`, `CLIENT_KEY`, `CLIENT_SECRET`, `CLUSTER_LKC` are set, # it will use the Confluent Kafka REST API to set `auto.create.topics.enable` # to `true` (dedicated clusters only), otherwise it will skip that step. # # `num.partitions` cluster property for default partition number # when auto-creating topics must be `4`. # # The script expects the `TEST_KAFKA_VERSION` environment variable to be set # to the version of Apache Kafka that is compatible with the Confluent Kafka # version being tested against. # # It will always skip local tests, it doesn't start a local cluster and # requires a `test.conf` file to be present in the current directory instead.
+ +args = sys.argv[1:] + +if '-l' in args: + print('Local tests need to be excluded when running against' + ' Confluent Kafka', file=sys.stderr) + sys.exit(1) + +if 'TEST_KAFKA_VERSION' not in os.environ: + print('TEST_KAFKA_VERSION environment variable is not set,' + ' please set it to the AK compatible version used ' + 'in Confluent Kafka', + file=sys.stderr) + sys.exit(1) + +do_enable_auto_create_topics_enable = all([ + var in os.environ for var in + ['REST_ENDPOINT', 'CLIENT_KEY', 'CLIENT_SECRET', 'CLUSTER_LKC'] +]) +if not do_enable_auto_create_topics_enable: + print('WARNING: Not setting up auto.create.topics.enable for the cluster,' + ' missing environment variables', + file=sys.stderr) + +if not os.path.exists('test.conf'): + print('test.conf file does not exist', + file=sys.stderr) + sys.exit(1) + +# FIXME: verify these skipped tests +TESTS_SKIP = '0054,0081,0113,0122,0129' + + +def enable_auto_create_topics_enable(): + REST_ENDPOINT = os.environ['REST_ENDPOINT'] + CLUSTER_LKC = os.environ['CLUSTER_LKC'] + CLIENT_KEY = os.environ['CLIENT_KEY'] + CLIENT_SECRET = os.environ['CLIENT_SECRET'] + + r = httpx.put(f'{REST_ENDPOINT}/kafka/v3/clusters/{CLUSTER_LKC}' + '/broker-configs/auto.create.topics.enable', + auth=(CLIENT_KEY, CLIENT_SECRET), json={'value': 'true'}) + assert r.status_code == 204, ('Failed to enable auto.create.topics.enable' + f': {r.status_code} {r.text}') + + +def run_tests(): + if do_enable_auto_create_topics_enable: + enable_auto_create_topics_enable() + + interrupted = False + p = subprocess.Popen(['./run-test-batches.py', '-L', '-p1'] + args, + env={'CI': 'true', + 'TESTS_SKIP': TESTS_SKIP, + **os.environ}, + start_new_session=True) + try: + p.communicate() + return p.returncode + except BaseException: + interrupted = True + return 1 + finally: + if interrupted: + print('Terminating process group...', file=sys.stderr) + os.killpg(p.pid, signal.SIGINT) + try: + p.wait(10) + except subprocess.TimeoutExpired: + os.killpg(p.pid, 
signal.SIGKILL) + p.wait(10) + + +error = run_tests() +print('End of run-tests-confluent-kafka', file=sys.stderr) +if error: + sys.exit(error) From aff151c5f8fa0cc7c17214ec4c3bea4e1d7d2e1b Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 16 Jul 2025 20:30:03 +0200 Subject: [PATCH 4/5] Skip tests that require a local Kafka broker to run shell commands if there's no local broker --- tests/0052-msg_timestamps.c | 15 +++++------ tests/0077-compaction.c | 34 ++++++++++++++----------- tests/0098-consumer-txn.cpp | 3 +++ tests/0109-auto_create_topics.cpp | 3 +++ tests/0115-producer_auth.cpp | 3 +++ tests/0119-consumer_auth.cpp | 3 +++ tests/test.c | 42 ++++++++++++++++++++++--------- tests/test.h | 1 + tests/testshared.h | 2 ++ 9 files changed, 73 insertions(+), 33 deletions(-) diff --git a/tests/0052-msg_timestamps.c b/tests/0052-msg_timestamps.c index 26c2a464b4..d7548eb27f 100644 --- a/tests/0052-msg_timestamps.c +++ b/tests/0052-msg_timestamps.c @@ -144,8 +144,13 @@ static void test_timestamps(const char *broker_tstype, test_mk_topic_name(tsprintf("0052_msg_timestamps_%s_%s_%s", broker_tstype, broker_version, codec), 1); - const int msgcnt = 20; - uint64_t testid = test_id_generate(); + const int msgcnt = 20; + uint64_t testid = test_id_generate(); + const char *topic_configs[] = { + "message.timestamp.type", + broker_tstype, + NULL, + }; if ((!strncmp(broker_version, "0.9", 3) || !strncmp(broker_version, "0.8", 3)) && @@ -161,11 +166,7 @@ static void test_timestamps(const char *broker_tstype, TEST_SAY(_C_MAG "Timestamp test using %s\n", topic); test_timeout_set(30); - test_kafka_topics( - "--create --topic \"%s\" " - "--replication-factor 1 --partitions 1 " - "--config message.timestamp.type=%s", - topic, broker_tstype); + test_admin_create_topic(NULL, topic, 1, -1, topic_configs); test_wait_topic_exists(NULL, topic, 5000); TEST_SAY(_C_MAG "Producing %d messages to %s\n", msgcnt, topic); diff --git a/tests/0077-compaction.c b/tests/0077-compaction.c index 
433c249b00..bdc4768316 100644 --- a/tests/0077-compaction.c +++ b/tests/0077-compaction.c @@ -172,8 +172,25 @@ static void do_test_compaction(int msgs_per_key, const char *compression) { int cnt = 0; test_msgver_t mv; test_msgver_t mv_correct; - int msgcounter = 0; - const int fillcnt = 20; + int msgcounter = 0; + const int fillcnt = 20; + const char *topic_configs[] = { + "cleanup.policy", + "compact", + "segment.ms", + "10000", + "segment.bytes", + "10000", + "min.cleanable.dirty.ratio", + "0.01", + "delete.retention.ms", + "86400", + "file.delete.delay.ms", + "10000", + "max.compaction.lag.ms", + "100", + NULL, + }; testid = test_id_generate(); @@ -182,18 +199,7 @@ static void do_test_compaction(int msgs_per_key, const char *compression) { "Test compaction on topic %s with %s compression (%d messages)\n", topic, compression ? compression : "no", msgcnt); - test_kafka_topics( - "--create --topic \"%s\" " - "--partitions %d " - "--replication-factor 1 " - "--config cleanup.policy=compact " - "--config segment.ms=10000 " - "--config segment.bytes=10000 " - "--config min.cleanable.dirty.ratio=0.01 " - "--config delete.retention.ms=86400 " - "--config file.delete.delay.ms=10000 " - "--config max.compaction.lag.ms=100", - topic, partition + 1); + test_admin_create_topic(NULL, topic, partition + 1, -1, topic_configs); test_wait_topic_exists(NULL, topic, 5000); test_conf_init(&conf, NULL, 120); diff --git a/tests/0098-consumer-txn.cpp b/tests/0098-consumer-txn.cpp index 059b7492d8..daa9c4e9c4 100644 --- a/tests/0098-consumer-txn.cpp +++ b/tests/0098-consumer-txn.cpp @@ -467,6 +467,9 @@ static void txn_producer(const std::string &brokers, static void do_test_consumer_txn_test(bool use_java_producer) { + if (use_java_producer && !test_can_kafka_cmd(0)) + return; + std::string errstr; std::string topic_name; RdKafka::KafkaConsumer *c; diff --git a/tests/0109-auto_create_topics.cpp b/tests/0109-auto_create_topics.cpp index b61a9bc5e7..37cd359a91 100644 --- 
a/tests/0109-auto_create_topics.cpp +++ b/tests/0109-auto_create_topics.cpp @@ -259,6 +259,9 @@ static void do_test_consumer(bool allow_auto_create_topics, extern "C" { int main_0109_auto_create_topics(int argc, char **argv) { + if (!test_can_kafka_cmd(1)) + return 0; /* Skip test if cannot create ACLs with command line */ + /* Parameters: * allow auto create, with wildcards, test unauthorized topic */ do_test_consumer(true, false, false); diff --git a/tests/0115-producer_auth.cpp b/tests/0115-producer_auth.cpp index dbd8af2ad4..2e2d22e9f8 100644 --- a/tests/0115-producer_auth.cpp +++ b/tests/0115-producer_auth.cpp @@ -168,6 +168,9 @@ static void do_test_producer(bool topic_known) { extern "C" { int main_0115_producer_auth(int argc, char **argv) { + if (!test_can_kafka_cmd(1)) + return 0; /* Skip test if cannot create ACLs with command line */ + /* We can't bother passing Java security config to kafka-acls.sh */ if (test_needs_auth()) { Test::Skip("Cluster authentication required\n"); diff --git a/tests/0119-consumer_auth.cpp b/tests/0119-consumer_auth.cpp index 1458692f8c..628ed4f73b 100644 --- a/tests/0119-consumer_auth.cpp +++ b/tests/0119-consumer_auth.cpp @@ -154,6 +154,9 @@ static void do_test_fetch_unauth() { extern "C" { int main_0119_consumer_auth(int argc, char **argv) { + if (!test_can_kafka_cmd(1)) + return 0; /* Skip test if cannot create ACLs with command line */ + /* We can't bother passing Java security config to kafka-acls.sh */ if (test_needs_auth()) { Test::Skip("Cluster authentication required\n"); diff --git a/tests/test.c b/tests/test.c index 73192b5cb8..3e3633b648 100644 --- a/tests/test.c +++ b/tests/test.c @@ -398,7 +398,10 @@ struct test tests[] = { #endif _TEST(0050_subscribe_adds, 0, TEST_BRKVER(0, 9, 0, 0)), _TEST(0051_assign_adds, 0, TEST_BRKVER(0, 9, 0, 0)), - _TEST(0052_msg_timestamps, 0, TEST_BRKVER(0, 10, 0, 0)), + _TEST(0052_msg_timestamps, + 0, + /* Can Create topics with AdminClient */ + TEST_BRKVER(0, 10, 2, 0)), 
_TEST(0053_stats_timing, TEST_F_LOCAL), _TEST(0053_stats, 0), _TEST(0054_offset_time, 0, TEST_BRKVER(0, 10, 1, 0)), @@ -523,7 +526,7 @@ struct test tests[] = { _TEST(0136_resolve_cb, TEST_F_LOCAL), _TEST(0137_barrier_batch_consume, 0), _TEST(0138_admin_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)), - _TEST(0139_offset_validation_mock, 0), + _TEST(0139_offset_validation_mock, TEST_F_LOCAL), _TEST(0140_commit_metadata, 0), _TEST(0142_reauthentication, 0, TEST_BRKVER(2, 2, 0, 0)), _TEST(0143_exponential_backoff_mock, TEST_F_LOCAL), @@ -532,7 +535,7 @@ struct test tests[] = { _TEST(0146_metadata_mock, TEST_F_LOCAL), _TEST(0147_consumer_group_consumer_mock, TEST_F_LOCAL), _TEST(0149_broker_same_host_port_mock, TEST_F_LOCAL), - _TEST(0150_telemetry_mock, 0), + _TEST(0150_telemetry_mock, TEST_F_LOCAL), _TEST(0151_purge_brokers_mock, TEST_F_LOCAL), _TEST(0152_rebootstrap_local, TEST_F_LOCAL), _TEST(0153_memberid, 0, TEST_BRKVER(0, 4, 0, 0)), @@ -5627,15 +5630,10 @@ void test_report_add(struct test *test, const char *fmt, ...) { * * If \p skip is set TEST_SKIP() will be called with a helpful message. */ -int test_can_create_topics(int skip) { +int test_can_kafka_cmd(int skip) { #ifndef _WIN32 const char *bootstrap; #endif - - /* Has AdminAPI */ - if (test_broker_version >= TEST_BRKVER(0, 10, 2, 0)) - return 1; - #ifdef _WIN32 if (skip) TEST_SKIP("Cannot create topics on Win32\n"); @@ -5649,17 +5647,31 @@ int test_can_create_topics(int skip) { if (!test_getenv("KAFKA_PATH", NULL) || !test_getenv(bootstrap, NULL)) { if (skip) TEST_SKIP( - "Cannot create topics " + "Cannot execute command line tools " "(set KAFKA_PATH and %s)\n", bootstrap); return 0; } - return 1; #endif } +/** + * If AdminAPI supports topic creation, returns 1. + * If not, calls test_can_kafka_cmd() to check if we can use + * the command line to create topics. + * + * If \p skip is set TEST_SKIP() will be called with a helpful message. 
+ */ +int test_can_create_topics(int skip) { + /* Has AdminAPI */ + if (test_broker_version >= TEST_BRKVER(0, 10, 2, 0)) + return 1; + + return test_can_kafka_cmd(skip); +} + /** * Wait for \p event_type, discarding all other events prior to it. @@ -7006,7 +7018,7 @@ test_IncrementalAlterConfigs_simple(rd_kafka_t *rk, * @remark Fails the current test on failure. */ -rd_kafka_resp_err_t test_CreateAcls_simple(rd_kafka_t *rk, +rd_kafka_resp_err_t test_CreateAcls_simple(rd_kafka_t *use_rk, rd_kafka_queue_t *useq, rd_kafka_AclBinding_t **acls, size_t acl_cnt, @@ -7014,8 +7026,12 @@ rd_kafka_resp_err_t test_CreateAcls_simple(rd_kafka_t *rk, rd_kafka_AdminOptions_t *options; rd_kafka_queue_t *q; rd_kafka_resp_err_t err; + rd_kafka_t *rk; const int tmout = 30 * 1000; + if (!(rk = use_rk)) + rk = test_create_producer(); + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEACLS); rd_kafka_AdminOptions_set_opaque(options, opaque); @@ -7038,6 +7054,8 @@ rd_kafka_resp_err_t test_CreateAcls_simple(rd_kafka_t *rk, NULL, tmout + 5000); rd_kafka_queue_destroy(q); + if (!use_rk) + rd_kafka_destroy(rk); if (err) TEST_FAIL("Failed to create %d acl(s): %s", (int)acl_cnt, diff --git a/tests/test.h b/tests/test.h index a3d36db3c9..db08988940 100644 --- a/tests/test.h +++ b/tests/test.h @@ -762,6 +762,7 @@ int test_get_partition_count(rd_kafka_t *rk, char *tsprintf(const char *fmt, ...) 
RD_FORMAT(printf, 1, 2); void test_report_add(struct test *test, const char *fmt, ...); +int test_can_kafka_cmd(int skip); int test_can_create_topics(int skip); rd_kafka_event_t *test_wait_event(rd_kafka_queue_t *eventq, diff --git a/tests/testshared.h b/tests/testshared.h index 07c0367f5c..351611aa70 100644 --- a/tests/testshared.h +++ b/tests/testshared.h @@ -72,6 +72,8 @@ extern int test_on_ci; const char *test_mk_topic_name(const char *suffix, int randomized); +int test_can_kafka_cmd(int skip); + void test_delete_topic(rd_kafka_t *use_rk, const char *topicname); void test_create_topic(rd_kafka_t *use_rk, From e08c8a69ff4bc1321a404fcc4e9bace5427152c8 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 16 Jul 2025 20:30:09 +0200 Subject: [PATCH 5/5] Disable KIP-714 metrics for idleness test. --- tests/0123-connections_max_idle.c | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/0123-connections_max_idle.c b/tests/0123-connections_max_idle.c index 6c7eb8eef9..281bdfc033 100644 --- a/tests/0123-connections_max_idle.c +++ b/tests/0123-connections_max_idle.c @@ -63,6 +63,7 @@ static void do_test_idle(rd_bool_t set_idle) { test_conf_init(&conf, NULL, 10); test_conf_set(conf, "debug", "broker"); + test_conf_set(conf, "enable.metrics.push", "false"); test_conf_set(conf, "connections.max.idle.ms", set_idle ? "5000" : "0"); rd_atomic32_init(&log_cnt, 0); rd_kafka_conf_set_log_cb(conf, log_cb);