2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -42,6 +42,8 @@
* Additional OAuth configuration options have been added for 'oauth' authentication on the listener and the client.
On the listener `clientGrantType` has been added.
On the client `grantType` has been added.
* Kafka nodes are now configured with PEM certificates instead of P12/JKS for keystore and truststore.
Review comment (Member): This needs to be moved to the 0.49.0 section.
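
As a rough sketch of the new OAuth options, a listener might configure the grant type like this (the field placement and the example value `client_credentials` are assumptions — check the Strimzi API reference for the exact schema):

    # Hypothetical listener configuration illustrating clientGrantType
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
          clientGrantType: client_credentials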

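The PEM change maps onto Kafka's native PEM support (KIP-651). A broker configured with PEM stores uses properties along these lines (an illustrative sketch with hypothetical paths, not the operator's exact output):

    ssl.keystore.type=PEM
    ssl.keystore.location=/opt/kafka/certs/broker.pem     # private key + certificate chain (hypothetical path)
    ssl.truststore.type=PEM
    ssl.truststore.location=/opt/kafka/certs/ca.pem       # trusted CA certificates (hypothetical path)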

### Major changes, deprecations and removals

* Fix RBAC naming for `KafkaMirrorMaker2` to avoid `RoleBinding` collisions when a `KafkaConnect` with the same name exists in the same namespace. `KafkaMirrorMaker2` now uses dedicated `RoleBinding` names.
KafkaResources.java
@@ -49,6 +49,17 @@ public static String clientsCaKeySecretName(String clusterName) {
return clusterName + "-clients-ca";
}

/**
* Get the name of the Kafka role binding given the name of the {@code cluster}.
*
* @param clusterName The cluster name.
*
* @return The name of the Kafka role binding.
*/
public static String kafkaRoleBindingName(String clusterName) {
return kafkaComponentName(clusterName) + "-role";
}
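
A minimal usage sketch, assuming `kafkaComponentName(clusterName)` resolves to `clusterName + "-kafka"` as elsewhere in this class:

    // Hypothetical example for a cluster named "my-cluster"
    String name = KafkaResources.kafkaRoleBindingName("my-cluster");
    // name == "my-cluster-kafka-role" (assuming kafkaComponentName appends "-kafka")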

////////
// Kafka methods
////////

Large diffs are not rendered by default.

KafkaCluster.java
@@ -32,13 +32,16 @@
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPeer;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.fabric8.kubernetes.api.model.rbac.PolicyRule;
import io.fabric8.kubernetes.api.model.rbac.PolicyRuleBuilder;
import io.fabric8.kubernetes.api.model.rbac.Role;
import io.fabric8.kubernetes.api.model.rbac.RoleBinding;
import io.fabric8.kubernetes.api.model.rbac.RoleRef;
import io.fabric8.kubernetes.api.model.rbac.RoleRefBuilder;
import io.fabric8.kubernetes.api.model.rbac.Subject;
import io.fabric8.kubernetes.api.model.rbac.SubjectBuilder;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.api.model.RouteBuilder;
import io.strimzi.api.kafka.model.common.CertAndKeySecretSource;
import io.strimzi.api.kafka.model.common.Condition;
import io.strimzi.api.kafka.model.common.Rack;
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
@@ -63,6 +66,7 @@
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustom;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationTls;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.api.kafka.model.kafka.quotas.QuotasPlugin;
import io.strimzi.api.kafka.model.kafka.quotas.QuotasPluginStrimzi;
@@ -97,6 +101,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -142,6 +147,7 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp

protected static final String ENV_VAR_KAFKA_INIT_EXTERNAL_ADDRESS = "EXTERNAL_ADDRESS";
private static final String ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED = "KAFKA_JMX_EXPORTER_ENABLED";
private static final String ENV_VAR_KAFKA_CLUSTER_NAME = "KAFKA_CLUSTER_NAME";
private static final String ENV_VAR_STRIMZI_OPA_AUTHZ_TRUSTED_CERTS = "STRIMZI_OPA_AUTHZ_TRUSTED_CERTS";
private static final String ENV_VAR_STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS = "STRIMZI_KEYCLOAK_AUTHZ_TRUSTED_CERTS";

@@ -172,12 +178,6 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
public static final int INGRESS_PORT = 443;

protected static final String KAFKA_NAME = "kafka";
protected static final String CLUSTER_CA_CERTS_VOLUME = "cluster-ca";
protected static final String BROKER_CERTS_VOLUME = "broker-certs";
protected static final String CLIENT_CA_CERTS_VOLUME = "client-ca-cert";
protected static final String CLUSTER_CA_CERTS_VOLUME_MOUNT = "/opt/kafka/cluster-ca-certs";
protected static final String BROKER_CERTS_VOLUME_MOUNT = "/opt/kafka/broker-certs";
protected static final String CLIENT_CA_CERTS_VOLUME_MOUNT = "/opt/kafka/client-ca-certs";
protected static final String TRUSTED_CERTS_BASE_VOLUME_MOUNT = "/opt/kafka/certificates";
protected static final String CUSTOM_AUTHN_SECRETS_VOLUME_MOUNT = "/opt/kafka/custom-authn-secrets";
private static final String LOG_AND_METRICS_CONFIG_VOLUME_NAME = "kafka-metrics-and-logging";
@@ -1368,9 +1368,6 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem
List<Volume> volumeList = new ArrayList<>();

volumeList.add(VolumeUtils.createTempDirVolume(templatePod));
volumeList.add(VolumeUtils.createSecretVolume(CLUSTER_CA_CERTS_VOLUME, AbstractModel.clusterCaCertSecretName(cluster), isOpenShift));
volumeList.add(VolumeUtils.createSecretVolume(BROKER_CERTS_VOLUME, node.podName(), isOpenShift));
volumeList.add(VolumeUtils.createSecretVolume(CLIENT_CA_CERTS_VOLUME, KafkaResources.clientsCaCertificateSecretName(cluster), isOpenShift));
volumeList.add(VolumeUtils.createConfigMapVolume(LOG_AND_METRICS_CONFIG_VOLUME_NAME, node.podName()));
volumeList.add(VolumeUtils.createEmptyDirVolume("ready-files", "1Ki", "Memory"));

@@ -1383,25 +1380,6 @@

// Listener specific volumes related to their specific authentication or encryption settings
for (GenericKafkaListener listener : listeners) {
if (listener.isTls()
&& listener.getConfiguration() != null
&& listener.getConfiguration().getBrokerCertChainAndKey() != null) {
CertAndKeySecretSource secretSource = listener.getConfiguration().getBrokerCertChainAndKey();

Map<String, String> items = new HashMap<>(2);
items.put(secretSource.getKey(), "tls.key");
items.put(secretSource.getCertificate(), "tls.crt");

volumeList.add(
VolumeUtils.createSecretVolume(
"custom-" + ListenersUtils.identifier(listener) + "-certs",
secretSource.getSecretName(),
items,
isOpenShift
)
);
}

if (ListenersUtils.isListenerWithOAuth(listener)) {
KafkaListenerAuthenticationOAuth oauth = (KafkaListenerAuthenticationOAuth) listener.getAuth();
CertUtils.createTrustedCertificatesVolumes(volumeList, oauth.getTlsTrustedCertificates(), isOpenShift, "oauth-" + ListenersUtils.identifier(listener));
@@ -1460,9 +1438,6 @@ private List<Volume> getPodSetVolumes(NodeRef node, Storage storage, PodTemplate
private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate containerTemplate, boolean isBroker) {
List<VolumeMount> volumeMountList = new ArrayList<>(VolumeUtils.createVolumeMounts(storage, false));
volumeMountList.add(VolumeUtils.createTempDirVolumeMount());
volumeMountList.add(VolumeUtils.createVolumeMount(CLUSTER_CA_CERTS_VOLUME, CLUSTER_CA_CERTS_VOLUME_MOUNT));
volumeMountList.add(VolumeUtils.createVolumeMount(BROKER_CERTS_VOLUME, BROKER_CERTS_VOLUME_MOUNT));
volumeMountList.add(VolumeUtils.createVolumeMount(CLIENT_CA_CERTS_VOLUME, CLIENT_CA_CERTS_VOLUME_MOUNT));
volumeMountList.add(VolumeUtils.createVolumeMount(LOG_AND_METRICS_CONFIG_VOLUME_NAME, LOG_AND_METRICS_CONFIG_VOLUME_MOUNT));
volumeMountList.add(VolumeUtils.createVolumeMount("ready-files", "/var/opt/kafka"));

@@ -1477,12 +1452,6 @@ private List<VolumeMount> getVolumeMounts(Storage storage, ContainerTemplate con
for (GenericKafkaListener listener : listeners) {
String identifier = ListenersUtils.identifier(listener);

if (listener.isTls()
&& listener.getConfiguration() != null
&& listener.getConfiguration().getBrokerCertChainAndKey() != null) {
volumeMountList.add(VolumeUtils.createVolumeMount("custom-" + identifier + "-certs", "/opt/kafka/certificates/custom-" + identifier + "-certs"));
}

if (ListenersUtils.isListenerWithOAuth(listener)) {
KafkaListenerAuthenticationOAuth oauth = (KafkaListenerAuthenticationOAuth) listener.getAuth();
CertUtils.createTrustedCertificatesVolumeMounts(volumeMountList, oauth.getTlsTrustedCertificates(), TRUSTED_CERTS_BASE_VOLUME_MOUNT + "/oauth-" + identifier + "-certs/", "oauth-" + identifier);
@@ -1626,6 +1595,7 @@ private List<EnvVar> getEnvVars(KafkaPool pool) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED,
String.valueOf(metrics instanceof JmxPrometheusExporterModel)));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(pool.gcLoggingEnabled)));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_CLUSTER_NAME, cluster));

JvmOptionUtils.heapOptions(varList, 50, 5L * 1024L * 1024L * 1024L, pool.jvmOptions, pool.resources);
JvmOptionUtils.jvmPerformanceOptions(varList, pool.jvmOptions);
@@ -1699,6 +1669,61 @@ public ClusterRoleBinding generateClusterRoleBinding(String assemblyNamespace) {
}
}

/**
* Creates a Role for reading TLS certificate secrets in the same namespace as the resource.
* This is used for loading certificates from secrets directly.
*
* @return Role for the Kafka cluster
*/
public Role generateRole() {
Set<String> certSecretNames = new HashSet<>();
certSecretNames.add(KafkaResources.clusterCaCertificateSecretName(cluster));
certSecretNames.addAll(nodes().stream().map(NodeRef::podName).toList());

for (GenericKafkaListener listener : listeners) {
if (listener.isTls()) {
if (listener.getConfiguration() != null && listener.getConfiguration().getBrokerCertChainAndKey() != null) {
certSecretNames.add(listener.getConfiguration().getBrokerCertChainAndKey().getSecretName());
}
}

if (listener.getAuth() instanceof KafkaListenerAuthenticationTls) {
certSecretNames.add(KafkaResources.clientsCaCertificateSecretName(cluster));
}
}

List<PolicyRule> rules = List.of(new PolicyRuleBuilder()
.withApiGroups("")
.withResources("secrets")
.withVerbs("get")
.withResourceNames(certSecretNames.stream().toList())
.build());

return RbacUtils.createRole(componentName, namespace, rules, labels, ownerReference, null);
}
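
For orientation, the generated Role would look roughly like this for a cluster named `my-cluster` with one node and a TLS-authenticated listener (a sketch — the namespace and node Secret name are assumptions derived from the code above):

    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: my-cluster-kafka              # componentName
      namespace: myproject                # hypothetical namespace
    rules:
      - apiGroups: [""]
        resources: ["secrets"]
        verbs: ["get"]
        resourceNames:
          - my-cluster-cluster-ca-cert    # cluster CA certificate Secret
          - my-cluster-clients-ca-cert    # added for a tls-authenticated listener
          - my-cluster-kafka-0            # per-node certificate Secret (pod name)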

/**
* Generates the RoleBinding that binds the Kafka ServiceAccount to the Role from {@code generateRole()}.
*
* @return RoleBinding for the Kafka cluster
*/
public RoleBinding generateRoleBindingForRole() {
Subject subject = new SubjectBuilder()
.withKind("ServiceAccount")
.withName(componentName)
.withNamespace(namespace)
.build();

RoleRef roleRef = new RoleRefBuilder()
.withName(componentName)
.withApiGroup("rbac.authorization.k8s.io")
.withKind("Role")
.build();

return RbacUtils
.createRoleBinding(KafkaResources.kafkaRoleBindingName(cluster), namespace, roleRef, List.of(subject), labels, ownerReference, null);
}
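
The matching RoleBinding would then look roughly like this (same assumed names as the Role sketch above):

    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: my-cluster-kafka-role         # KafkaResources.kafkaRoleBindingName
      namespace: myproject
    subjects:
      - kind: ServiceAccount
        name: my-cluster-kafka
        namespace: myproject
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: my-cluster-kafka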

/**
* Generates the NetworkPolicies relevant for Kafka brokers
*
@@ -1828,7 +1853,6 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
.withKRaftMetadataLogDir(VolumeUtils.kraftMetadataPath(pool.storage))
.withLogDirs(VolumeUtils.createVolumeMounts(pool.storage, false))
.withListeners(cluster,
kafkaVersion,
namespace,
listeners,
listenerId -> advertisedHostnames.get(node.nodeId()).get(listenerId),
@@ -1896,6 +1920,18 @@ public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLoggin
return configMaps;
}

/**
* Generates a Secret with the given name and data in the Kafka cluster's namespace
*
* @param secretData Secret data
* @param secretName Secret name
*
* @return Secret that is generated
*/
public Secret generateSecret(Map<String, String> secretData, String secretName) {
return ModelUtils.createSecret(secretName, namespace, labels, ownerReference, secretData, Map.of(), Map.of());
}
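
A minimal usage sketch (the data keys, PEM variables, and Secret name below are assumptions for illustration):

    // Hypothetical: wrap PEM material for one node into a Secret
    Map<String, String> data = Map.of(
            "tls.crt", nodeCertificatePem,   // assumed PEM strings prepared by the caller
            "tls.key", nodeKeyPem);
    Secret nodeCertSecret = kafkaCluster.generateSecret(data, "my-cluster-kafka-0");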

/**
* @return Kafka version
*/
KafkaReconciler.java
@@ -59,6 +59,8 @@
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodDisruptionBudgetOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.PvcOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.RoleBindingOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.RoleOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.RouteOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceAccountOperator;
@@ -143,6 +145,8 @@ public class KafkaReconciler {
private final PodDisruptionBudgetOperator podDisruptionBudgetOperator;
private final PodOperator podOperator;
private final ClusterRoleBindingOperator clusterRoleBindingOperator;
private final RoleOperator roleOperator;
private final RoleBindingOperator roleBindingOperator;
private final RouteOperator routeOperator;
private final IngressOperator ingressOperator;
private final NodeOperator nodeOperator;
@@ -218,6 +222,8 @@ public KafkaReconciler(
this.podDisruptionBudgetOperator = supplier.podDisruptionBudgetOperator;
this.podOperator = supplier.podOperations;
this.clusterRoleBindingOperator = supplier.clusterRoleBindingOperator;
this.roleBindingOperator = supplier.roleBindingOperations;
this.roleOperator = supplier.roleOperations;
this.routeOperator = supplier.routeOperations;
this.ingressOperator = supplier.ingressOperations;
this.nodeOperator = supplier.nodeOperator;
@@ -248,6 +254,8 @@ public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
.compose(i -> pvcs(kafkaStatus))
.compose(i -> serviceAccount())
.compose(i -> initClusterRoleBinding())
.compose(i -> kafkaRole())
.compose(i -> kafkaRoleBinding())
.compose(i -> scaleDown())
.compose(i -> updateNodePoolStatuses(kafkaStatus))
.compose(i -> listeners())
@@ -537,6 +545,40 @@ protected Future<Void> initClusterRoleBinding() {
).mapEmpty();
}

/**
* Manages the Kafka Role that lets the Kafka nodes read the TLS certificate Secrets in their own namespace.
*
* @return Completes when the Role was successfully created or updated
*/
protected Future<Void> kafkaRole() {
return roleOperator
.reconcile(
reconciliation,
reconciliation.namespace(),
kafka.getComponentName(),
kafka.generateRole()
).mapEmpty();
}

/**
* Manages the Kafka RoleBinding that binds the Kafka ServiceAccount to the Role managed by {@code kafkaRole()}.
*
* @return Completes when the RoleBinding was successfully created or updated
*/
protected Future<Void> kafkaRoleBinding() {
return roleBindingOperator
.reconcile(
reconciliation,
reconciliation.namespace(),
KafkaResources.kafkaRoleBindingName(reconciliation.name()),
kafka.generateRoleBindingForRole())
.mapEmpty();
}

/**
* Scales down the Kafka cluster if needed. Kafka scale-down is done in one go.
*
@@ -878,7 +920,7 @@ private Future<Void> waitForNewNodes() {
}

/**
* Roles the Kafka brokers (if needed).
* Rolls the Kafka brokers (if needed).
*
* @param podSetDiffs Map with the PodSet reconciliation results
*