Skip to content

Commit 91eaf20

Browse files
committed
Use PEM certificates loaded from secrets for Kafka
Signed-off-by: Gantigmaa Selenge <[email protected]>
1 parent 7b3635f commit 91eaf20

File tree

15 files changed

+1162
-1028
lines changed

15 files changed

+1162
-1028
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* Add support for Kafka 3.9.1
77
* Fixed MirrorMaker 2 client rack init container override being ignored.
88
* Support for Kubernetes Image Volumes to mount custom plugins
9+
* Kafka nodes are now configured with PEM certificates instead of P12/JKS for keystore and truststore.
910

1011
### Major changes, deprecations and removals
1112

api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaResources.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,46 @@ public static String clientsCaKeySecretName(String clusterName) {
4949
return clusterName + "-clients-ca";
5050
}
5151

52+
/**
53+
* Get the name of the Kafka role binding given the name of the {@code cluster}.
54+
*
55+
* @param clusterName The cluster name.
56+
*
57+
* @return The name of Kafka role binding.
58+
*/
59+
public static String kafkaRoleBindingName(String clusterName) {
60+
return kafkaComponentName(clusterName) + "-role";
61+
}
62+
63+
/**
64+
* Get the name of the internal secret that contains TLS trusted certificates
65+
* for connecting to Authorization server.
66+
* The operator copies user specified secrets for Authorization server
67+
* trusted certificates into a single secret with this name.
68+
* It is then used when configuring Authorization server for Kafka cluster.
69+
*
70+
* @param clusterName The cluster name.
71+
*
72+
* @return Name of the internal secret that contains trusted certificates for Authz.
73+
*/
74+
public static String internalAuthzTrustedCertsSecretName(String clusterName) {
75+
return kafkaComponentName(clusterName) + "-authz-trusted-certs";
76+
}
77+
78+
/**
79+
* Get the name of the internal secret that contains TLS trusted certificates for OAuth server.
80+
* The operator copies user specified secrets for OAuth trusted certificates into
81+
* a single secret with this name.
82+
* It is then used when configuring Kafka listeners with OAuth server.
83+
*
84+
* @param clusterName The cluster name.
85+
*
86+
* @return Name of the internal secret that contains OAuth trusted certificates.
87+
*/
88+
public static String internalOauthTrustedCertsSecretName(String clusterName) {
89+
return kafkaComponentName(clusterName) + "-oauth-trusted-certs";
90+
}
91+
5292
////////
5393
// Kafka methods
5494
////////

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java

Lines changed: 56 additions & 58 deletions
Large diffs are not rendered by default.

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java

Lines changed: 80 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPeer;
3333
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
3434
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
35+
import io.fabric8.kubernetes.api.model.rbac.PolicyRule;
36+
import io.fabric8.kubernetes.api.model.rbac.PolicyRuleBuilder;
37+
import io.fabric8.kubernetes.api.model.rbac.Role;
38+
import io.fabric8.kubernetes.api.model.rbac.RoleBinding;
3539
import io.fabric8.kubernetes.api.model.rbac.RoleRef;
3640
import io.fabric8.kubernetes.api.model.rbac.RoleRefBuilder;
3741
import io.fabric8.kubernetes.api.model.rbac.Subject;
@@ -52,7 +56,6 @@
5256
import io.strimzi.api.kafka.model.kafka.Kafka;
5357
import io.strimzi.api.kafka.model.kafka.KafkaAuthorization;
5458
import io.strimzi.api.kafka.model.kafka.KafkaAuthorizationKeycloak;
55-
import io.strimzi.api.kafka.model.kafka.KafkaAuthorizationOpa;
5659
import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
5760
import io.strimzi.api.kafka.model.kafka.KafkaClusterTemplate;
5861
import io.strimzi.api.kafka.model.kafka.KafkaResources;
@@ -144,6 +147,7 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
144147
private static final String ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED = "KAFKA_JMX_EXPORTER_ENABLED";
145148
private static final String ENV_VAR_STRIMZI_OPA_AUTHZ_TRUSTED_CERTS = "STRIMZI_OPA_AUTHZ_TRUSTED_CERTS";
146149
private static final String ENV_VAR_STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS = "STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS";
150+
private static final String ENV_VAR_KAFKA_CLUSTER_NAME = "KAFKA_CLUSTER_NAME";
147151

148152
// For port names in services, a 'tcp-' prefix is added to support Istio protocol selection
149153
// This helps Istio to avoid using a wildcard listener and instead present IP:PORT pairs which effects
@@ -169,12 +173,6 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
169173
public static final int INGRESS_PORT = 443;
170174

171175
protected static final String KAFKA_NAME = "kafka";
172-
protected static final String CLUSTER_CA_CERTS_VOLUME = "cluster-ca";
173-
protected static final String BROKER_CERTS_VOLUME = "broker-certs";
174-
protected static final String CLIENT_CA_CERTS_VOLUME = "client-ca-cert";
175-
protected static final String CLUSTER_CA_CERTS_VOLUME_MOUNT = "/opt/kafka/cluster-ca-certs";
176-
protected static final String BROKER_CERTS_VOLUME_MOUNT = "/opt/kafka/broker-certs";
177-
protected static final String CLIENT_CA_CERTS_VOLUME_MOUNT = "/opt/kafka/client-ca-certs";
178176
protected static final String TRUSTED_CERTS_BASE_VOLUME_MOUNT = "/opt/kafka/certificates";
179177
protected static final String CUSTOM_AUTHN_SECRETS_VOLUME_MOUNT = "/opt/kafka/custom-authn-secrets";
180178
private static final String LOG_AND_METRICS_CONFIG_VOLUME_NAME = "kafka-metrics-and-logging";
@@ -1377,9 +1375,6 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem
13771375
List<Volume> volumeList = new ArrayList<>();
13781376

13791377
volumeList.add(VolumeUtils.createTempDirVolume(templatePod));
1380-
volumeList.add(VolumeUtils.createSecretVolume(CLUSTER_CA_CERTS_VOLUME, AbstractModel.clusterCaCertSecretName(cluster), isOpenShift));
1381-
volumeList.add(VolumeUtils.createSecretVolume(BROKER_CERTS_VOLUME, node.podName(), isOpenShift));
1382-
volumeList.add(VolumeUtils.createSecretVolume(CLIENT_CA_CERTS_VOLUME, KafkaResources.clientsCaCertificateSecretName(cluster), isOpenShift));
13831378
volumeList.add(VolumeUtils.createConfigMapVolume(LOG_AND_METRICS_CONFIG_VOLUME_NAME, node.podName()));
13841379
volumeList.add(VolumeUtils.createEmptyDirVolume("ready-files", "1Ki", "Memory"));
13851380

@@ -1413,7 +1408,10 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem
14131408

14141409
if (ListenersUtils.isListenerWithOAuth(listener)) {
14151410
KafkaListenerAuthenticationOAuth oauth = (KafkaListenerAuthenticationOAuth) listener.getAuth();
1416-
CertUtils.createTrustedCertificatesVolumes(volumeList, oauth.getTlsTrustedCertificates(), isOpenShift, "oauth-" + ListenersUtils.identifier(listener));
1411+
String oauthTrustedCertsSecret = KafkaResources.internalOauthTrustedCertsSecretName(cluster);
1412+
if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty() && volumeList.stream().noneMatch(v -> v.getName().equals(oauthTrustedCertsSecret))) {
1413+
volumeList.add(VolumeUtils.createSecretVolume(oauthTrustedCertsSecret, oauthTrustedCertsSecret, isOpenShift));
1414+
}
14171415
}
14181416

14191417
if (ListenersUtils.isListenerWithCustomAuth(listener)) {
@@ -1423,14 +1421,6 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem
14231421
}
14241422
}
14251423

1426-
if (authorization instanceof KafkaAuthorizationOpa opaAuthz) {
1427-
CertUtils.createTrustedCertificatesVolumes(volumeList, opaAuthz.getTlsTrustedCertificates(), isOpenShift, "authz-opa");
1428-
}
1429-
1430-
if (authorization instanceof KafkaAuthorizationKeycloak keycloakAuthz) {
1431-
CertUtils.createTrustedCertificatesVolumes(volumeList, keycloakAuthz.getTlsTrustedCertificates(), isOpenShift, "authz-keycloak");
1432-
}
1433-
14341424
TemplateUtils.addAdditionalVolumes(templatePod, volumeList);
14351425

14361426
return volumeList;
@@ -1469,9 +1459,6 @@ private List<Volume> getPodSetVolumes(NodeRef node, Storage storage, PodTemplate
14691459
private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate containerTemplate, boolean isBroker) {
14701460
List<VolumeMount> volumeMountList = new ArrayList<>(VolumeUtils.createVolumeMounts(storage, false));
14711461
volumeMountList.add(VolumeUtils.createTempDirVolumeMount());
1472-
volumeMountList.add(VolumeUtils.createVolumeMount(CLUSTER_CA_CERTS_VOLUME, CLUSTER_CA_CERTS_VOLUME_MOUNT));
1473-
volumeMountList.add(VolumeUtils.createVolumeMount(BROKER_CERTS_VOLUME, BROKER_CERTS_VOLUME_MOUNT));
1474-
volumeMountList.add(VolumeUtils.createVolumeMount(CLIENT_CA_CERTS_VOLUME, CLIENT_CA_CERTS_VOLUME_MOUNT));
14751462
volumeMountList.add(VolumeUtils.createVolumeMount(LOG_AND_METRICS_CONFIG_VOLUME_NAME, LOG_AND_METRICS_CONFIG_VOLUME_MOUNT));
14761463
volumeMountList.add(VolumeUtils.createVolumeMount("ready-files", "/var/opt/kafka"));
14771464

@@ -1493,8 +1480,8 @@ private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate con
14931480
}
14941481

14951482
if (ListenersUtils.isListenerWithOAuth(listener)) {
1496-
KafkaListenerAuthenticationOAuth oauth = (KafkaListenerAuthenticationOAuth) listener.getAuth();
1497-
CertUtils.createTrustedCertificatesVolumeMounts(volumeMountList, oauth.getTlsTrustedCertificates(), TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/oauth-" + identifier + "-certs/", "oauth-" + identifier);
1483+
String oauthTrustedCertsSecret = KafkaResources.internalOauthTrustedCertsSecretName(cluster);
1484+
volumeMountList.add(VolumeUtils.createVolumeMount(oauthTrustedCertsSecret, TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/" + oauthTrustedCertsSecret));
14981485
}
14991486

15001487
if (ListenersUtils.isListenerWithCustomAuth(listener)) {
@@ -1503,15 +1490,7 @@ private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate con
15031490
}
15041491
}
15051492
}
1506-
1507-
if (authorization instanceof KafkaAuthorizationOpa opaAuthz) {
1508-
CertUtils.createTrustedCertificatesVolumeMounts(volumeMountList, opaAuthz.getTlsTrustedCertificates(), TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/authz-opa-certs/", "authz-opa");
1509-
}
1510-
1511-
if (authorization instanceof KafkaAuthorizationKeycloak keycloakAuthz) {
1512-
CertUtils.createTrustedCertificatesVolumeMounts(volumeMountList, keycloakAuthz.getTlsTrustedCertificates(), TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/authz-keycloak-certs/", "authz-keycloak");
1513-
}
1514-
1493+
15151494
TemplateUtils.addAdditionalVolumeMounts(volumeMountList, containerTemplate);
15161495

15171496
return volumeMountList;
@@ -1635,6 +1614,7 @@ private List<EnvVar> getEnvVars(KafkaPool pool) {
16351614
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED,
16361615
String.valueOf(metrics instanceof JmxPrometheusExporterModel)));
16371616
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(pool.gcLoggingEnabled)));
1617+
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CLUSTER_NAME, cluster));
16381618

16391619
JvmOptionUtils.heapOptions(varList, 50, 5L * 1024L * 1024L * 1024L, pool.jvmOptions, pool.resources);
16401620
JvmOptionUtils.jvmPerformanceOptions(varList, pool.jvmOptions);
@@ -1646,29 +1626,13 @@ private List<EnvVar> getEnvVars(KafkaPool pool) {
16461626
if (ListenersUtils.isListenerWithOAuth(listener)) {
16471627
KafkaListenerAuthenticationOAuth oauth = (KafkaListenerAuthenticationOAuth) listener.getAuth();
16481628

1649-
if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) {
1650-
varList.add(ContainerUtils.createEnvVar("STRIMZI_" + ListenersUtils.envVarIdentifier(listener) + "_OAUTH_TRUSTED_CERTS", CertUtils.trustedCertsEnvVar(oauth.getTlsTrustedCertificates())));
1651-
}
1652-
16531629
if (oauth.getClientSecret() != null) {
16541630
varList.add(ContainerUtils.createEnvVarFromSecret("STRIMZI_" + ListenersUtils.envVarIdentifier(listener) + "_OAUTH_CLIENT_SECRET", oauth.getClientSecret().getSecretName(), oauth.getClientSecret().getKey()));
16551631
}
16561632
}
16571633
}
16581634
}
16591635

1660-
if (authorization instanceof KafkaAuthorizationOpa opaAuthz
1661-
&& opaAuthz.getTlsTrustedCertificates() != null
1662-
&& !opaAuthz.getTlsTrustedCertificates().isEmpty()) {
1663-
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_OPA_AUTHZ_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(opaAuthz.getTlsTrustedCertificates())));
1664-
}
1665-
1666-
if (authorization instanceof KafkaAuthorizationKeycloak keycloakAuthz
1667-
&& keycloakAuthz.getTlsTrustedCertificates() != null
1668-
&& !keycloakAuthz.getTlsTrustedCertificates().isEmpty()) {
1669-
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(keycloakAuthz.getTlsTrustedCertificates())));
1670-
}
1671-
16721636
varList.addAll(jmx.envVars());
16731637

16741638
// Add shared environment variables used for all containers
@@ -1708,6 +1672,54 @@ public ClusterRoleBinding generateClusterRoleBinding(String assemblyNamespace) {
17081672
}
17091673
}
17101674

1675+
/**
1676+
* Creates a Role for reading TLS certificate secrets in the same namespace as the resource.
1677+
* This is used for loading certificates from secrets directly.
1678+
**
1679+
* @return role for the Kafka Cluster
1680+
*/
1681+
public Role generateRole() {
1682+
List<String> certSecretNames = new ArrayList<>();
1683+
certSecretNames.add(KafkaResources.clusterCaCertificateSecretName(cluster));
1684+
certSecretNames.add(KafkaResources.internalAuthzTrustedCertsSecretName(cluster));
1685+
certSecretNames.add(KafkaResources.internalOauthTrustedCertsSecretName(cluster));
1686+
certSecretNames.addAll(nodes().stream().map(NodeRef::podName).toList());
1687+
1688+
List<PolicyRule> rules = List.of(new PolicyRuleBuilder()
1689+
.withApiGroups("")
1690+
.withResources("secrets")
1691+
.withVerbs("get")
1692+
.withResourceNames(certSecretNames)
1693+
.build());
1694+
1695+
Role role = RbacUtils.createRole(componentName, namespace, rules, labels, ownerReference, null);
1696+
return role;
1697+
}
1698+
1699+
/**
1700+
* Generates the Kafka Cluster Role Binding
1701+
*
1702+
* @return Role Binding for the Kafka Cluster
1703+
*/
1704+
public RoleBinding generateRoleBindingForRole() {
1705+
Subject subject = new SubjectBuilder()
1706+
.withKind("ServiceAccount")
1707+
.withName(componentName)
1708+
.withNamespace(namespace)
1709+
.build();
1710+
1711+
RoleRef roleRef = new RoleRefBuilder()
1712+
.withName(componentName)
1713+
.withApiGroup("rbac.authorization.k8s.io")
1714+
.withKind("Role")
1715+
.build();
1716+
1717+
RoleBinding rb = RbacUtils
1718+
.createRoleBinding(KafkaResources.kafkaRoleBindingName(cluster), namespace, roleRef, List.of(subject), labels, ownerReference, null);
1719+
1720+
return rb;
1721+
}
1722+
17111723
/**
17121724
* Generates the NetworkPolicies relevant for Kafka brokers
17131725
*
@@ -1778,6 +1790,13 @@ public List<GenericKafkaListener> getListeners() {
17781790
return listeners;
17791791
}
17801792

1793+
/**
1794+
* @return The authorization
1795+
*/
1796+
public KafkaAuthorization getAuthorization() {
1797+
return authorization;
1798+
}
1799+
17811800
/**
17821801
* Returns true when the Kafka cluster is exposed to the outside using NodePort type services
17831802
*
@@ -1837,7 +1856,6 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
18371856
.withKRaftMetadataLogDir(VolumeUtils.kraftMetadataPath(pool.storage))
18381857
.withLogDirs(VolumeUtils.createVolumeMounts(pool.storage, false))
18391858
.withListeners(cluster,
1840-
kafkaVersion,
18411859
namespace,
18421860
listeners,
18431861
listenerId -> advertisedHostnames.get(node.nodeId()).get(listenerId),
@@ -1902,6 +1920,18 @@ public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLoggin
19021920
return configMaps;
19031921
}
19041922

1923+
/**
1924+
* Generates a Secret with the given name and data in Kafka Cluster's namespace
1925+
*
1926+
* @param secretData Secret data
1927+
* @param secretName Secret name
1928+
*
1929+
* @return Secret that is generated
1930+
*/
1931+
public Secret generateSecret(Map<String, String> secretData, String secretName) {
1932+
return ModelUtils.createSecret(secretName, namespace, labels, ownerReference, secretData, Map.of(), Map.of());
1933+
}
1934+
19051935
/**
19061936
* @return Kafka version
19071937
*/

0 commit comments

Comments
 (0)