Skip to content

Commit 62b1d4d

Browse files
MichaelMorrisEstsee-quick
authored andcommitted
Ignore user config provider aliases with conflict (strimzi#11486)
Signed-off-by: MichaelMorris <[email protected]>
1 parent 98ccd5b commit 62b1d4d

File tree

2 files changed

+81
-19
lines changed

2 files changed

+81
-19
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter;
3333
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
3434
import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel;
35+
import io.strimzi.operator.common.InvalidConfigurationException;
3536
import io.strimzi.operator.common.Reconciliation;
3637
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters;
3738

@@ -40,6 +41,7 @@
4041
import java.time.Duration;
4142
import java.util.ArrayList;
4243
import java.util.Arrays;
44+
import java.util.Collection;
4345
import java.util.Comparator;
4446
import java.util.HashMap;
4547
import java.util.LinkedHashMap;
@@ -772,25 +774,9 @@ private void configureCustomAuthorization(KafkaAuthorizationCustom authorization
772774
*/
773775
private void configProviders(KafkaConfiguration userConfig) {
774776
printSectionHeader("Config providers");
775-
776-
String strimziConfigProviders;
777-
if (node.broker()) {
778-
// File and Directory providers are used only on broker nodes
779-
strimziConfigProviders = "strimzienv,strimzifile,strimzidir";
780-
} else {
781-
strimziConfigProviders = "strimzienv";
782-
}
783-
784-
if (userConfig != null
785-
&& !userConfig.getConfiguration().isEmpty()
786-
&& userConfig.getConfigOption("config.providers") != null) {
787-
writer.println("# Configuration providers configured by the user and by Strimzi");
788-
writer.println("config.providers=" + userConfig.getConfigOption("config.providers") + "," + strimziConfigProviders);
789-
userConfig.removeConfigOption("config.providers");
790-
} else {
791-
writer.println("# Configuration providers configured by Strimzi");
792-
writer.println("config.providers=" + strimziConfigProviders);
793-
}
777+
778+
writer.println("# Configuration providers configured by the user and by Strimzi");
779+
writer.println("config.providers=" + getConfigProviderAliases(userConfig));
794780

795781
writer.println("config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider");
796782
writer.println("config.providers.strimzienv.param.allowlist.pattern=.*");
@@ -805,6 +791,65 @@ private void configProviders(KafkaConfiguration userConfig) {
805791

806792
writer.println();
807793
}
794+
795+
/**
796+
* Get the Kafka configuration provider aliases, throwing an InvalidConfigurationException if any user provided aliases are found that would overwrite the Strimzi defined configuration providers
797+
*
798+
* @param userConfig The user configuration to extract the possible user-provided config provider configuration from
799+
* @return The Kafka configuration provider aliases
800+
*/
801+
private String getConfigProviderAliases(KafkaConfiguration userConfig) {
802+
Collection<String> strimziAliases = new ArrayList<>();
803+
strimziAliases.add("strimzienv");
804+
if (node.broker()) {
805+
// File and Directory providers are used only on broker nodes
806+
strimziAliases.add("strimzifile");
807+
strimziAliases.add("strimzidir");
808+
}
809+
810+
if (userConfig != null
811+
&& !userConfig.getConfiguration().isEmpty()
812+
&& userConfig.getConfigOption("config.providers") != null) {
813+
String userAliases = userConfig.getConfigOption("config.providers");
814+
815+
Arrays.asList(userAliases.split(",")).stream().forEach(userAlias -> {
816+
if (strimziAliases.contains(userAlias)) {
817+
throw new InvalidConfigurationException("config.provider " + userAlias + " not permitted as it reserved for Strimzi. Not permitted aliases: " + strimziAliases);
818+
}
819+
});
820+
821+
userConfig.removeConfigOption("config.providers");
822+
return userAliases + "," + String.join(",", strimziAliases);
823+
} else {
824+
return String.join(",", strimziAliases);
825+
}
826+
827+
}
828+
829+
/**
830+
* Get the user provided Kafka configuration provider aliases, throwing an InvalidConfigurationException if any are found that would overwrite the Strimzi defined configuration providers
831+
*
832+
* @param strimziConfigProviders The Strimzi defined configuration providers
833+
* @param userConfig The user configuration to extract the possible user-provided config provider configuration from
834+
* @return The user defined Kafka configuration provider aliases or empty string
835+
*/
836+
private String getUserConfigProviderAliases(Collection<String> strimziAliases, KafkaConfiguration userConfig) {
837+
String userConfigProviderAliases = "";
838+
if (userConfig != null
839+
&& !userConfig.getConfiguration().isEmpty()
840+
&& userConfig.getConfigOption("config.providers") != null) {
841+
userConfigProviderAliases = userConfig.getConfigOption("config.providers");
842+
Collection<String> userAliases = Arrays.asList(userConfigProviderAliases.split(","));
843+
844+
userAliases.stream().forEach(alias -> {
845+
if (strimziAliases.contains(alias)) {
846+
throw new InvalidConfigurationException("config.provider " + alias + " not permitted as it reserved for Strimzi. Not permitted aliases: " + strimziAliases);
847+
}
848+
});
849+
userConfig.removeConfigOption("config.providers");
850+
}
851+
return userConfigProviderAliases;
852+
}
808853

809854
/**
810855
* Adds the configurations passed by the user in the Kafka CR, injecting Strimzi configurations when needed.

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter;
3737
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
3838
import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel;
39+
import io.strimzi.operator.common.InvalidConfigurationException;
3940
import io.strimzi.operator.common.Reconciliation;
4041
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters;
4142
import io.strimzi.test.annotations.ParallelSuite;
@@ -600,6 +601,22 @@ public void testUserConfigurationWithConfigProviders() {
600601
"config.providers.strimzienv.param.allowlist.pattern=.*",
601602
"config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"));
602603
}
604+
605+
@ParallelTest
606+
public void testUserConfigurationWithInvalidConfigProviders() {
607+
Map<String, Object> userConfiguration = new HashMap<>();
608+
userConfiguration.put("config.providers", "env,strimzienv");
609+
userConfiguration.put("config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider");
610+
userConfiguration.put("config.providers.strimzienv.class", "org.apache.kafka.common.config.provider.UserConfigProvider");
611+
612+
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet());
613+
614+
assertThrows(InvalidConfigurationException.class, () -> {
615+
new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
616+
.withUserConfiguration(kafkaConfiguration, false, false, false)
617+
.build();
618+
}, "InvalidConfigurationException was expected");
619+
}
603620

604621
@ParallelTest
605622
public void testNullUserConfigurationWithJmxMetricsReporter() {

0 commit comments

Comments
 (0)