Skip to content

Commit 8ae072a

Browse files
committed
Address comments from Kate, fix system tests
Signed-off-by: Gantigmaa Selenge <[email protected]>
1 parent 80a2eae commit 8ae072a

File tree

5 files changed

+42
-25
lines changed

5 files changed

+42
-25
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ private void configProviders(KafkaConfiguration userConfig) {
801801
private String getConfigProviderAliases(KafkaConfiguration userConfig) {
802802
Collection<String> strimziAliases = new ArrayList<>();
803803
strimziAliases.add("strimzienv");
804-
strimziAliases.add("strimzisecrets")
804+
strimziAliases.add("strimzisecrets");
805805
if (node.broker()) {
806806
// File and Directory providers are used only on broker nodes
807807
strimziAliases.add("strimzifile");

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import io.strimzi.api.kafka.model.kafka.Kafka;
5757
import io.strimzi.api.kafka.model.kafka.KafkaAuthorization;
5858
import io.strimzi.api.kafka.model.kafka.KafkaAuthorizationKeycloak;
59+
import io.strimzi.api.kafka.model.kafka.KafkaAuthorizationOpa;
5960
import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
6061
import io.strimzi.api.kafka.model.kafka.KafkaClusterTemplate;
6162
import io.strimzi.api.kafka.model.kafka.KafkaResources;
@@ -64,8 +65,10 @@
6465
import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlResources;
6566
import io.strimzi.api.kafka.model.kafka.exporter.KafkaExporterResources;
6667
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
68+
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthentication;
6769
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustom;
6870
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth;
71+
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationTls;
6972
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
7073
import io.strimzi.api.kafka.model.kafka.quotas.QuotasPlugin;
7174
import io.strimzi.api.kafka.model.kafka.quotas.QuotasPluginStrimzi;
@@ -100,6 +103,7 @@
100103
import java.util.ArrayList;
101104
import java.util.Collections;
102105
import java.util.HashMap;
106+
import java.util.HashSet;
103107
import java.util.LinkedHashSet;
104108
import java.util.List;
105109
import java.util.Map;
@@ -145,8 +149,6 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
145149

146150
protected static final String ENV_VAR_KAFKA_INIT_EXTERNAL_ADDRESS = "EXTERNAL_ADDRESS";
147151
private static final String ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED = "KAFKA_JMX_EXPORTER_ENABLED";
148-
private static final String ENV_VAR_STRIMZI_OPA_AUTHZ_TRUSTED_CERTS = "STRIMZI_OPA_AUTHZ_TRUSTED_CERTS";
149-
private static final String ENV_VAR_STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS = "STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS";
150152
private static final String ENV_VAR_KAFKA_CLUSTER_NAME = "KAFKA_CLUSTER_NAME";
151153

152154
// For port names in services, a 'tcp-' prefix is added to support Istio protocol selection
@@ -1479,7 +1481,7 @@ private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate con
14791481
volumeMountList.add(VolumeUtils.createVolumeMount("custom-" + identifier + "-certs", "/opt/kafka/certificates/custom-" + identifier + "-certs"));
14801482
}
14811483

1482-
if (ListenersUtils.isListenerWithOAuth(listener)) {
1484+
if (ListenersUtils.isListenerWithOAuth(listener) && listener.getAuth() instanceof KafkaListenerAuthenticationOAuth oauth && oauth.getTlsTrustedCertificates() != null) {
14831485
String oauthTrustedCertsSecret = KafkaResources.internalOauthTrustedCertsSecretName(cluster);
14841486
volumeMountList.add(VolumeUtils.createVolumeMount(oauthTrustedCertsSecret, TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/" + oauthTrustedCertsSecret));
14851487
}
@@ -1678,23 +1680,40 @@ public ClusterRoleBinding generateClusterRoleBinding(String assemblyNamespace) {
16781680
**
16791681
* @return role for the Kafka Cluster
16801682
*/
1683+
@SuppressWarnings("deprecation") // OPA Authorization is deprecated
16811684
public Role generateRole() {
1682-
List<String> certSecretNames = new ArrayList<>();
1685+
Set<String> certSecretNames = new HashSet<>();
16831686
certSecretNames.add(KafkaResources.clusterCaCertificateSecretName(cluster));
1684-
certSecretNames.add(KafkaResources.clientsCaCertificateSecretName(cluster));
1685-
certSecretNames.add(KafkaResources.internalAuthzTrustedCertsSecretName(cluster));
1686-
certSecretNames.add(KafkaResources.internalOauthTrustedCertsSecretName(cluster));
16871687
certSecretNames.addAll(nodes().stream().map(NodeRef::podName).toList());
16881688

1689+
for (GenericKafkaListener listener : listeners) {
1690+
if (listener.isTls()) {
1691+
if (listener.getConfiguration() != null) {
1692+
certSecretNames.add(listener.getConfiguration().getBrokerCertChainAndKey().getSecretName());
1693+
}
1694+
}
1695+
1696+
KafkaListenerAuthentication auth = listener.getAuth();
1697+
if (auth instanceof KafkaListenerAuthenticationOAuth) {
1698+
certSecretNames.add(KafkaResources.internalOauthTrustedCertsSecretName(cluster));
1699+
} else if (auth instanceof KafkaListenerAuthenticationTls) {
1700+
certSecretNames.add(KafkaResources.clientsCaCertificateSecretName(cluster));
1701+
}
1702+
}
1703+
1704+
if ((authorization instanceof KafkaAuthorizationOpa opa && opa.getTlsTrustedCertificates() != null && !opa.getTlsTrustedCertificates().isEmpty())
1705+
|| (authorization instanceof KafkaAuthorizationKeycloak kc && kc.getTlsTrustedCertificates() != null && !kc.getTlsTrustedCertificates().isEmpty())) {
1706+
certSecretNames.add(KafkaResources.internalAuthzTrustedCertsSecretName(cluster));
1707+
}
1708+
16891709
List<PolicyRule> rules = List.of(new PolicyRuleBuilder()
16901710
.withApiGroups("")
16911711
.withResources("secrets")
16921712
.withVerbs("get")
1693-
.withResourceNames(certSecretNames)
1713+
.withResourceNames(certSecretNames.stream().toList())
16941714
.build());
16951715

1696-
Role role = RbacUtils.createRole(componentName, namespace, rules, labels, ownerReference, null);
1697-
return role;
1716+
return RbacUtils.createRole(componentName, namespace, rules, labels, ownerReference, null);
16981717
}
16991718

17001719
/**
@@ -1715,10 +1734,8 @@ public RoleBinding generateRoleBindingForRole() {
17151734
.withKind("Role")
17161735
.build();
17171736

1718-
RoleBinding rb = RbacUtils
1737+
return RbacUtils
17191738
.createRoleBinding(KafkaResources.kafkaRoleBindingName(cluster), namespace, roleRef, List.of(subject), labels, ownerReference, null);
1720-
1721-
return rb;
17221739
}
17231740

17241741
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ protected Future<Void> tlsTrustedCertsSecret(Reconciliation reconciliation, Kafk
284284
return Future.succeededFuture();
285285
}
286286

287-
return ReconcilerUtils.generateTlsTrustedCertsSecret(reconciliation, secretsToCopy, KafkaConnectResources.internalTlsTrustedCertsSecretName(connect.getCluster()), secretOperations, connect::generateSecret).mapEmpty();
287+
return ReconcilerUtils.reconcileTlsTrustedCertsSecret(reconciliation, secretsToCopy, KafkaConnectResources.internalTlsTrustedCertsSecretName(connect.getCluster()), secretOperations, connect::generateSecret).mapEmpty();
288288
}
289289

290290
/**
@@ -306,7 +306,7 @@ protected Future<Void> oauthTrustedCertsSecret(Reconciliation reconciliation, Ka
306306
return Future.succeededFuture();
307307
}
308308

309-
return ReconcilerUtils.generateOauthTrustedCertsSecret(reconciliation, secretsToCopy, KafkaConnectResources.internalOauthTrustedCertsSecretName(connect.getCluster()), secretOperations, connect::generateSecret);
309+
return ReconcilerUtils.reconcileOauthTrustedCertsSecret(reconciliation, secretsToCopy, KafkaConnectResources.internalOauthTrustedCertsSecretName(connect.getCluster()), secretOperations, connect::generateSecret);
310310
}
311311

312312
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -885,7 +885,7 @@ protected Future<Void> authzTrustedCertsSecret() {
885885
return Future.succeededFuture();
886886
}
887887

888-
return ReconcilerUtils.generateTlsTrustedCertsSecret(reconciliation, secretsToCopy, KafkaResources.internalAuthzTrustedCertsSecretName(kafka.getCluster()), secretOperator, kafka::generateSecret)
888+
return ReconcilerUtils.reconcileTlsTrustedCertsSecret(reconciliation, secretsToCopy, KafkaResources.internalAuthzTrustedCertsSecretName(kafka.getCluster()), secretOperator, kafka::generateSecret)
889889
.compose(certHashes -> {
890890
authorizerServerCertificateHash = certHashes;
891891
return Future.succeededFuture();
@@ -913,7 +913,7 @@ protected Future<Void> oauthTrustedCertsSecret() {
913913
return Future.succeededFuture();
914914
}
915915

916-
return ReconcilerUtils.generateOauthTrustedCertsSecret(reconciliation, secretsToCopy, KafkaResources.internalOauthTrustedCertsSecretName(kafka.getCluster()), secretOperator, kafka::generateSecret);
916+
return ReconcilerUtils.reconcileOauthTrustedCertsSecret(reconciliation, secretsToCopy, KafkaResources.internalOauthTrustedCertsSecretName(kafka.getCluster()), secretOperator, kafka::generateSecret);
917917
}
918918

919919
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ReconcilerUtils.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -443,13 +443,13 @@ public static String hashSecretContent(Secret secret) {
443443
*
444444
* @return Future which completes when the reconciliation is done with the hash generated from the certificates
445445
*/
446-
public static Future<Integer> generateTlsTrustedCertsSecret(Reconciliation reconciliation, Set<String> certSecretsToCopy,
447-
String internalTlsTrustedCertsSecretName, SecretOperator secretOperations,
448-
BiFunction<Map<String, String>, String, Secret> secretProvider) {
446+
public static Future<Integer> reconcileTlsTrustedCertsSecret(Reconciliation reconciliation, Set<String> certSecretsToCopy,
447+
String internalTlsTrustedCertsSecretName, SecretOperator secretOperations,
448+
BiFunction<Map<String, String>, String, Secret> secretProvider) {
449449
List<Integer> certHashes = new ArrayList<>();
450450
ConcurrentHashMap<String, String> secretData = new ConcurrentHashMap<>();
451451
return Future.join(certSecretsToCopy.stream()
452-
.map(secretName -> secretOperations.getAsync(reconciliation.namespace(), secretName)
452+
.map(secretName -> getSecret(secretOperations, reconciliation.namespace(), secretName)
453453
.compose(secret -> {
454454
if (secret == null) {
455455
return Future.failedFuture("Secret " + secretName + " not found");
@@ -486,9 +486,9 @@ public static Future<Integer> generateTlsTrustedCertsSecret(Reconciliation recon
486486
*
487487
* @return Future which completes when the reconciliation is done
488488
*/
489-
public static Future<Void> generateOauthTrustedCertsSecret(Reconciliation reconciliation, Set<String> oauthCertSecretsToCopy,
490-
String internalOauthTrustedCertsSecretName, SecretOperator secretOperations,
491-
BiFunction<Map<String, String>, String, Secret> secretProvider) {
489+
public static Future<Void> reconcileOauthTrustedCertsSecret(Reconciliation reconciliation, Set<String> oauthCertSecretsToCopy,
490+
String internalOauthTrustedCertsSecretName, SecretOperator secretOperations,
491+
BiFunction<Map<String, String>, String, Secret> secretProvider) {
492492
List<String> certs = new ArrayList<>();
493493
return Future.join(oauthCertSecretsToCopy.stream()
494494
.map(secretName -> getSecret(secretOperations, reconciliation.namespace(), secretName)

0 commit comments

Comments
 (0)