Skip to content

Commit 7d356de

Browse files
committed
Changes to name the configuration property "query" instead of "params" as in other implementations
and to make it optional if the default endpoint is overridden.
1 parent 14afbfb commit 7d356de

File tree

3 files changed

+83
-46
lines changed

3 files changed

+83
-46
lines changed

src/rdkafka_conf.c

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3940,6 +3940,38 @@ char **rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp) {
39403940
return out;
39413941
}
39423942

3943+
/**
3944+
* @brief Get value for the config param corresponding to \p key in
3945+
* \p config, using \p pairs_sep for splitting it
3946+
* into key-value pairs and '=' for splitting keys and values.
3947+
*/
3948+
char *rd_kafka_conf_kv_get(const char *config,
3949+
const char *key,
3950+
const char pairs_sep) {
3951+
size_t i, config_pair_cnt, config_key_value_cnt;
3952+
char *ret = NULL;
3953+
char **config_key_values;
3954+
if (!config)
3955+
return NULL;
3956+
3957+
char **config_pairs =
3958+
rd_string_split(config, pairs_sep, rd_true, &config_pair_cnt);
3959+
3960+
config_key_values =
3961+
rd_kafka_conf_kv_split((const char **)config_pairs, config_pair_cnt,
3962+
&config_key_value_cnt);
3963+
for (i = 0; i < config_key_value_cnt / 2; i += 2) {
3964+
char *config_key = config_key_values[i];
3965+
if (!rd_strcmp(config_key, key)) {
3966+
ret = rd_strdup(config_key_values[i + 1]);
3967+
break;
3968+
}
3969+
}
3970+
rd_free(config_key_values);
3971+
rd_free(config_pairs);
3972+
return ret;
3973+
}
3974+
39433975
const char *
39443976
rd_kafka_conf_finalize_oauthbearer_oidc_grant_type(rd_kafka_conf_t *conf) {
39453977
switch (conf->sasl.oauthbearer.grant_type) {
@@ -4066,17 +4098,24 @@ const char *rd_kafka_conf_finalize_oauthbearer_oidc(rd_kafka_conf_t *conf) {
40664098
"`sasl.oauthbearer.method=oidc` are "
40674099
"mutually exclusive";
40684100

4069-
if (conf->sasl.oauthbearer.metadata_authentication.type ==
4070-
RD_KAFKA_SASL_OAUTHBEARER_METADATA_AUTHENTICATION_TYPE_AZURE_IMDS &&
4071-
!conf->sasl.oauthbearer.token_endpoint_url) {
4072-
conf->sasl.oauthbearer.token_endpoint_url =
4073-
RD_KAFKA_SASL_OAUTHBEARER_METADATA_AUTHENTICATION_URL_AZURE_IMDS;
4074-
}
4075-
40764101
if (!conf->sasl.oauthbearer.token_endpoint_url) {
4077-
return "`sasl.oauthbearer.token.endpoint.url` "
4078-
"is mandatory when "
4079-
"`sasl.oauthbearer.method=oidc` is set";
4102+
const char *errstr =
4103+
"`sasl.oauthbearer.token.endpoint.url` "
4104+
"is mandatory when "
4105+
"`sasl.oauthbearer.method=oidc` is set";
4106+
if (conf->sasl.oauthbearer.metadata_authentication.type ==
4107+
RD_KAFKA_SASL_OAUTHBEARER_METADATA_AUTHENTICATION_TYPE_AZURE_IMDS) {
4108+
char *query = rd_kafka_conf_kv_get(
4109+
conf->sasl.oauthbearer_config, "query", ',');
4110+
if (!query)
4111+
return "`sasl.oauthbearer.token.endpoint.url` "
4112+
"is mandatory for Azure IMDS "
4113+
"authentication "
4114+
"when `query` isn't set";
4115+
rd_free(query);
4116+
} else {
4117+
return errstr;
4118+
}
40804119
}
40814120

40824121
if (conf->sasl.oauthbearer.metadata_authentication.type ==

src/rdkafka_conf.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,6 @@ struct rd_kafka_conf_s {
372372
struct {
373373
rd_kafka_oauthbearer_metadata_authentication_type_t
374374
type;
375-
const char *query;
376375
} metadata_authentication;
377376

378377

@@ -699,6 +698,9 @@ struct rd_kafka_topic_conf_s {
699698

700699
char **rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp);
701700

701+
char *
702+
rd_kafka_conf_kv_get(const char *config, const char *key, const char pairs_sep);
703+
702704
void rd_kafka_anyconf_destroy(int scope, void *conf);
703705

704706
rd_bool_t rd_kafka_conf_is_modified(const rd_kafka_conf_t *conf,

src/rdkafka_sasl_oauthbearer_oidc.c

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,8 +1024,9 @@ void rd_kafka_oidc_token_metadata_azure_imds_refresh_cb(
10241024

10251025
struct curl_slist *headers = NULL;
10261026

1027-
char *token_endpoint_url = NULL;
1028-
char *sub = NULL;
1027+
const char *token_endpoint_url_initial = NULL;
1028+
char *token_endpoint_url = NULL;
1029+
char *sub = NULL;
10291030

10301031
size_t extension_cnt;
10311032
size_t extension_key_value_cnt = 0;
@@ -1034,45 +1035,39 @@ void rd_kafka_oidc_token_metadata_azure_imds_refresh_cb(
10341035

10351036
char **extensions = NULL;
10361037
char **extension_key_value = NULL;
1038+
char *query = NULL;
10371039
static char *headers_array[] = {"Metadata: true"};
10381040

10391041
if (rd_kafka_terminating(rk))
10401042
return;
10411043

1042-
if (rk->rk_conf.sasl.oauthbearer_config &&
1043-
!rk->rk_conf.sasl.oauthbearer.metadata_authentication.query) {
1044-
size_t i, oauthbearer_config_cnt;
1045-
char **config_pairs =
1046-
rd_string_split(rk->rk_conf.sasl.oauthbearer_config, ',',
1047-
rd_true, &oauthbearer_config_cnt);
1048-
for (i = 0; i < oauthbearer_config_cnt; i++) {
1049-
char *config_pair = config_pairs[i];
1050-
char *query_pos = strstr(config_pair, "query=");
1051-
if (query_pos == config_pair) {
1052-
rk->rk_conf.sasl.oauthbearer
1053-
.metadata_authentication.query =
1054-
query_pos + strlen("query=");
1055-
break;
1056-
}
1044+
if (rk->rk_conf.sasl.oauthbearer_config)
1045+
query = rd_kafka_conf_kv_get(
1046+
rk->rk_conf.sasl.oauthbearer_config, "query", ',');
1047+
token_endpoint_url_initial =
1048+
rk->rk_conf.sasl.oauthbearer.token_endpoint_url;
1049+
if (!token_endpoint_url_initial)
1050+
token_endpoint_url_initial =
1051+
RD_KAFKA_SASL_OAUTHBEARER_METADATA_AUTHENTICATION_URL_AZURE_IMDS;
1052+
if (query && *query) {
1053+
token_endpoint_url = rd_http_get_params_append(
1054+
token_endpoint_url_initial, query);
1055+
1056+
if (token_endpoint_url == NULL) {
1057+
rd_snprintf(
1058+
set_token_errstr, sizeof(set_token_errstr),
1059+
"Failed to append params \"%s\" to token endpoint "
1060+
"URL \"%s\"",
1061+
query,
1062+
rk->rk_conf.sasl.oauthbearer.token_endpoint_url);
1063+
rd_kafka_log(rk, LOG_ERR, "OIDC", "%s",
1064+
set_token_errstr);
1065+
rd_kafka_oauthbearer_set_token_failure(
1066+
rk, set_token_errstr);
1067+
goto done;
10571068
}
1058-
if (!rk->rk_conf.sasl.oauthbearer.metadata_authentication.query)
1059-
rk->rk_conf.sasl.oauthbearer.metadata_authentication
1060-
.query = "";
1061-
}
1062-
1063-
token_endpoint_url = rd_http_get_params_append(
1064-
rk->rk_conf.sasl.oauthbearer.token_endpoint_url,
1065-
rk->rk_conf.sasl.oauthbearer.metadata_authentication.query);
1066-
if (token_endpoint_url == NULL) {
1067-
rd_snprintf(
1068-
set_token_errstr, sizeof(set_token_errstr),
1069-
"Failed to append params \"%s\" to token endpoint "
1070-
"URL \"%s\"",
1071-
rk->rk_conf.sasl.oauthbearer.metadata_authentication.query,
1072-
rk->rk_conf.sasl.oauthbearer.token_endpoint_url);
1073-
rd_kafka_log(rk, LOG_ERR, "OIDC", "%s", set_token_errstr);
1074-
rd_kafka_oauthbearer_set_token_failure(rk, set_token_errstr);
1075-
goto done;
1069+
} else {
1070+
token_endpoint_url = rd_strdup(token_endpoint_url_initial);
10761071
}
10771072

10781073
herr = rd_http_get_json(rk, token_endpoint_url, headers_array, 1,
@@ -1120,6 +1115,7 @@ void rd_kafka_oidc_token_metadata_azure_imds_refresh_cb(
11201115
RD_IF_FREE(extensions, rd_free);
11211116
RD_IF_FREE(extension_key_value, rd_free);
11221117
RD_IF_FREE(token_endpoint_url, rd_free);
1118+
RD_IF_FREE(query, rd_free);
11231119
}
11241120

11251121
/**

0 commit comments

Comments
 (0)