From d932f1037bdea7228760d1f2bbf151194b91dcd8 Mon Sep 17 00:00:00 2001 From: Kyle Liberti Date: Wed, 16 Jul 2025 09:31:01 -0400 Subject: [PATCH] Refactor CC capacity configuration classes Signed-off-by: Kyle Liberti --- .../operator/cluster/model/CruiseControl.java | 8 +- .../model/cruisecontrol/BrokerCapacity.java | 106 ---- .../cluster/model/cruisecontrol/Capacity.java | 455 ---------------- .../cruisecontrol/CapacityConfiguration.java | 255 +++++++++ .../model/cruisecontrol/CpuCapacity.java | 94 +++- .../model/cruisecontrol/DiskCapacity.java | 115 +++- .../cruisecontrol/InboundNetworkCapacity.java | 56 ++ .../model/cruisecontrol/NetworkCapacity.java | 43 ++ .../OutboundNetworkCapacity.java | 56 ++ .../cluster/model/CruiseControlTest.java | 363 ++----------- .../CapacityConfigurationTest.java | 505 ++++++++++++++++++ 11 files changed, 1149 insertions(+), 907 deletions(-) delete mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java delete mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java create mode 100644 cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfigurationTest.java diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java index 21d2179bdde..069ceb7468c 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java @@ -35,7 +35,7 @@ import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlTemplate; import io.strimzi.certs.CertAndKey; import io.strimzi.operator.cluster.ClusterOperatorConfig; -import io.strimzi.operator.cluster.model.cruisecontrol.Capacity; +import io.strimzi.operator.cluster.model.cruisecontrol.CapacityConfiguration; import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlConfiguration; import io.strimzi.operator.cluster.model.cruisecontrol.HashLoginServiceApiCredentials; import io.strimzi.operator.cluster.model.logging.LoggingModel; @@ -111,7 +111,7 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup private boolean authEnabled; private HashLoginServiceApiCredentials apiCredentials; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method - protected Capacity capacity; + private CapacityConfiguration capacityConfiguration; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method private MetricsModel metrics; private LoggingModel logging; @@ -197,6 +197,7 @@ public static CruiseControl fromCrd( result.image = image; KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet()); + result.capacityConfiguration = new CapacityConfiguration(reconciliation, 
ccSpec, kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); result.updateConfigurationWithDefaults(ccSpec, kafkaConfiguration); CruiseControlConfiguration ccConfiguration = result.configuration; @@ -207,7 +208,6 @@ public static CruiseControl fromCrd( // To avoid illegal storage configurations provided by the user, // we rely on the storage configuration provided by the KafkaAssemblyOperator - result.capacity = new Capacity(reconciliation, kafkaCr.getSpec(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); result.readinessProbeOptions = ProbeUtils.extractReadinessProbeOptionsOrDefault(ccSpec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS); result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(ccSpec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS); result.gcLoggingEnabled = ccSpec.getJvmOptions() == null ? JvmOptions.DEFAULT_GC_LOGGING_ENABLED : ccSpec.getJvmOptions().isGcLoggingEnabled(); @@ -530,7 +530,7 @@ public LoggingModel logging() { public ConfigMap generateConfigMap(MetricsAndLogging metricsAndLogging) { Map configMapData = new HashMap<>(); configMapData.put(SERVER_CONFIG_FILENAME, configuration.asOrderedProperties().asPairs()); - configMapData.put(CAPACITY_CONFIG_FILENAME, capacity.toString()); + configMapData.put(CAPACITY_CONFIG_FILENAME, capacityConfiguration.toJson()); configMapData.putAll(ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging)); return ConfigMapUtils diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java deleted file mode 100644 index ff45addd045..00000000000 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.cluster.model.cruisecontrol; - -/** - * Configures the Kafka broker capacity - */ -public class BrokerCapacity { - // CC allows specifying a generic "default" broker entry in the capacity configuration to apply to all brokers without a specific broker entry. - // CC designates the id of this default broker entry as "-1". - /** - * Default broker ID - */ - public static final int DEFAULT_BROKER_ID = -1; - - /** - * Default outbound network capacity - */ - public static final String DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000"; - - private static final String DEFAULT_BROKER_DOC = "This is the default capacity. 
Capacity unit used for disk is in MiB, cpu is in number of cores, network throughput is in KiB."; - /** - * Default cpu core capacity - */ - public static final String DEFAULT_CPU_CORE_CAPACITY = "1.0"; - protected static final String DEFAULT_DISK_CAPACITY_IN_MIB = "100000"; - protected static final String DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000"; - - private int id; - private CpuCapacity cpu; - private DiskCapacity disk; - private String inboundNetwork; - private String outboundNetwork; - private final String doc; - - /** - * Constructor - * - * @param brokerId ID of the broker - * @param cpu CPU capacity - * @param disk Disk capacity - * @param inboundNetwork Inbound network capacity - * @param outboundNetwork Outbound network capacity - */ - public BrokerCapacity(int brokerId, CpuCapacity cpu, DiskCapacity disk, String inboundNetwork, String outboundNetwork) { - this.id = brokerId; - this.cpu = cpu; - this.disk = disk; - this.inboundNetwork = inboundNetwork; - this.outboundNetwork = outboundNetwork; - this.doc = brokerId == -1 ? DEFAULT_BROKER_DOC : "Capacity for Broker " + brokerId; - } - - /** - * @return Broker ID - */ - protected Integer getId() { - return id; - } - - /** - * @return CPU capacity - */ - public CpuCapacity getCpu() { - return cpu; - } - - /** - * @return Disk capacity - */ - protected DiskCapacity getDisk() { - return disk; - } - - /** - * @return Inbound network capacity - */ - public String getInboundNetwork() { - return inboundNetwork; - } - - /** - * @return Outbound network capacity - */ - public String getOutboundNetwork() { - return outboundNetwork; - } - - protected String getDoc() { - return doc; - } - - protected void setCpu(CpuCapacity cpu) { - this.cpu = cpu; - } - - protected void setInboundNetwork(String inboundNetwork) { - this.inboundNetwork = inboundNetwork; - } - - protected void setOutboundNetwork(String outboundNetwork) { - this.outboundNetwork = outboundNetwork; - } -} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java deleted file mode 100644 index e1bc6e26e26..00000000000 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java +++ /dev/null @@ -1,455 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
- */ -package io.strimzi.operator.cluster.model.cruisecontrol; - -import io.fabric8.kubernetes.api.model.Quantity; -import io.fabric8.kubernetes.api.model.ResourceRequirements; -import io.strimzi.api.kafka.model.kafka.EphemeralStorage; -import io.strimzi.api.kafka.model.kafka.JbodStorage; -import io.strimzi.api.kafka.model.kafka.KafkaSpec; -import io.strimzi.api.kafka.model.kafka.PersistentClaimStorage; -import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage; -import io.strimzi.api.kafka.model.kafka.Storage; -import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; -import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; -import io.strimzi.operator.cluster.model.NodeRef; -import io.strimzi.operator.cluster.model.Quantities; -import io.strimzi.operator.cluster.model.StorageUtils; -import io.strimzi.operator.cluster.model.VolumeUtils; -import io.strimzi.operator.common.Reconciliation; -import io.strimzi.operator.common.ReconciliationLogger; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; - -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -/** - * Uses information in a Kafka Custom Resource to generate a capacity configuration file to be used for - * Cruise Control's Broker Capacity File Resolver. - * - * - * For example, it takes a Kafka Custom Resource like the following: - * - * spec: - * kafka: - * replicas: 3 - * storage: - * type: jbod - * volumes: - * - id: 0 - * type: persistent-claim - * size: 100Gi - * deleteClaim: false - * - id: 1 - * type: persistent-claim - * size: 200Gi - * deleteClaim: false - * cruiseControl: - * brokerCapacity: - * cpu: "1" - * inboundNetwork: 10000KB/s - * outboundNetwork: 10000KB/s - * overrides: - * - brokers: [0] - * cpu: "2.345" - * outboundNetwork: 40000KB/s - * - brokers: [1, 2] - * cpu: 4000m - * inboundNetwork: 60000KB/s - * outboundNetwork: 20000KB/s - * - * and uses the information to create Cruise Control BrokerCapacityFileResolver config file like the following: - * - * { - * "brokerCapacities":[ - * { - * "brokerId": "-1", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log-1": "100000", - * "/var/lib/kafka1/kafka-log-1": "200000" - * }, - * "CPU": {"num.cores": "1"}, - * "NW_IN": "10000", - * "NW_OUT": "10000" - * }, - * "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB." 
- * }, - * { - * "brokerId": "0", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log0": "100000", - * "/var/lib/kafka1/kafka-log0": "200000" - * }, - * "CPU": {"num.cores": "2.345"}, - * "NW_IN": "10000", - * "NW_OUT": "40000" - * }, - * "doc": "Capacity for Broker 0" - * }, - * { - * "brokerId": "1", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log1": "100000", - * "/var/lib/kafka1/kafka-log1": "200000" - * }, - * "CPU": {"num.cores": "4"}, - * "NW_IN": "60000", - * "NW_OUT": "20000" - * }, - * "doc": "Capacity for Broker 1" - * }, - * "brokerId": "2", - * "capacity": { - * "DISK": { - * "/var/lib/kafka0/kafka-log2": "100000", - * "/var/lib/kafka1/kafka-log2": "200000" - * }, - * "CPU": {"num.cores": "4"}, - * "NW_IN": "60000", - * "NW_OUT": "20000" - * }, - * "doc": "Capacity for Broker 2" - * } - * ] - * } - */ -public class Capacity { - protected static final ReconciliationLogger LOGGER = ReconciliationLogger.create(Capacity.class.getName()); - private final Reconciliation reconciliation; - private final TreeMap capacityEntries; - - /** - * Broker capacities key - */ - public static final String CAPACITIES_KEY = "brokerCapacities"; - - /** - * Capacity key - */ - public static final String CAPACITY_KEY = "capacity"; - - /** - * Disk key - */ - public static final String DISK_KEY = "DISK"; - - /** - * CPU key - */ - public static final String CPU_KEY = "CPU"; - - /** - * Inbound network key - */ - public static final String INBOUND_NETWORK_KEY = "NW_IN"; - - /** - * Outbound network key - */ - public static final String OUTBOUND_NETWORK_KEY = "NW_OUT"; - - /** - * Resource type - */ - public static final String RESOURCE_TYPE = "cpu"; - - private static final String KAFKA_MOUNT_PATH = "/var/lib/kafka"; - private static final String KAFKA_LOG_DIR = "kafka-log"; - private static final String BROKER_ID_KEY = "brokerId"; - private static final String DOC_KEY = "doc"; - - private enum ResourceRequirementType { - REQUEST, - LIMIT; - - private Quantity getQuantity(ResourceRequirements resources) { - Map resourceRequirement = switch (this) { - case REQUEST -> resources.getRequests(); - case LIMIT -> resources.getLimits(); - }; - if (resourceRequirement != null) { - return resourceRequirement.get(RESOURCE_TYPE); - } - return null; - } - } - - /** - * Constructor - * - * @param reconciliation Reconciliation marker - * @param spec Spec of the Kafka custom resource - * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster - * @param kafkaStorage A map with storage configuration used by the Kafka cluster and its node pools - * @param kafkaBrokerResources A map with resource configuration used by the Kafka cluster and its broker pools - */ - public Capacity( - Reconciliation reconciliation, - KafkaSpec spec, - Set kafkaBrokerNodes, - Map kafkaStorage, - Map kafkaBrokerResources - ) { - this.reconciliation = reconciliation; - this.capacityEntries = new TreeMap<>(); - - processCapacityEntries(spec.getCruiseControl(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources); - } - - private static Integer getResourceRequirement(ResourceRequirements resources, ResourceRequirementType requirementType) { - if (resources != null) { - Quantity quantity = requirementType.getQuantity(resources); - if (quantity != null) { - return Quantities.parseCpuAsMilliCpus(quantity.toString()); - } - } - return null; - } - - private static CpuCapacity getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) { - Integer request = 
getResourceRequirement(resourceRequirements, ResourceRequirementType.REQUEST); - Integer limit = getResourceRequirement(resourceRequirements, ResourceRequirementType.LIMIT); - - if (request != null) { - return new CpuCapacity(CpuCapacity.milliCpuToCpu(request)); - } else if (limit != null) { - return new CpuCapacity(CpuCapacity.milliCpuToCpu(limit)); - } else { - return new CpuCapacity(BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY); - } - } - - /** - * The brokerCapacity overrides per broker take top precedence, then general brokerCapacity configuration, and then the Kafka resource requests, then the Kafka resource limits. - * For example: - * (1) brokerCapacity overrides - * (2) brokerCapacity - * (3) Kafka resource requests - * (4) Kafka resource limits - * When none of Cruise Control CPU capacity configurations mentioned above are configured, CPU capacity will be set to `1`. - * - * @param override brokerCapacity overrides (per broker) - * @param bc brokerCapacity (for all brokers) - * @param resourceRequirements Kafka resource requests and limits (for all brokers) - * @return A {@link CpuCapacity} object containing the specified capacity for a broker - */ - private CpuCapacity processCpu(BrokerCapacityOverride override, io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity bc, ResourceRequirements resourceRequirements) { - if (override != null && override.getCpu() != null) { - return new CpuCapacity(override.getCpu()); - } else if (bc != null && bc.getCpu() != null) { - return new CpuCapacity(bc.getCpu()); - } - return getCpuBasedOnRequirements(resourceRequirements); - } - - private static DiskCapacity processDisk(Storage storage, int brokerId) { - if (storage instanceof JbodStorage) { - return generateJbodDiskCapacity(storage, brokerId); - } else { - return generateDiskCapacity(storage); - } - } - - private static String processInboundNetwork(io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity bc, BrokerCapacityOverride override) { - if (override != null && override.getInboundNetwork() != null) { - return getThroughputInKiB(override.getInboundNetwork()); - } else if (bc != null && bc.getInboundNetwork() != null) { - return getThroughputInKiB(bc.getInboundNetwork()); - } else { - return BrokerCapacity.DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND; - } - } - - private static String processOutboundNetwork(io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity bc, BrokerCapacityOverride override) { - if (override != null && override.getOutboundNetwork() != null) { - return getThroughputInKiB(override.getOutboundNetwork()); - } else if (bc != null && bc.getOutboundNetwork() != null) { - return getThroughputInKiB(bc.getOutboundNetwork()); - } else { - return BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND; - } - } - - /** - * Generate JBOD disk capacity configuration for a broker using the supplied storage configuration - * - * @param storage Storage configuration for Kafka cluster - * @param brokerId Id of the broker - * @return Disk capacity configuration value for broker brokerId - */ - private static DiskCapacity generateJbodDiskCapacity(Storage storage, int brokerId) { - DiskCapacity disks = new DiskCapacity(); - String size = ""; - - for (SingleVolumeStorage volume : ((JbodStorage) storage).getVolumes()) { - String name = VolumeUtils.createVolumePrefix(volume.getId(), true); - String path = KAFKA_MOUNT_PATH + "/" + name + "/" + KAFKA_LOG_DIR + brokerId; - - if (volume instanceof PersistentClaimStorage) { - size = ((PersistentClaimStorage) 
volume).getSize(); - } else if (volume instanceof EphemeralStorage) { - size = ((EphemeralStorage) volume).getSizeLimit(); - } - disks.add(path, String.valueOf(getSizeInMiB(size))); - } - return disks; - } - - /** - * Generate total disk capacity using the supplied storage configuration - * - * @param storage Storage configuration for Kafka cluster - * @return Disk capacity per broker - */ - private static DiskCapacity generateDiskCapacity(Storage storage) { - if (storage instanceof PersistentClaimStorage) { - return DiskCapacity.of(getSizeInMiB(((PersistentClaimStorage) storage).getSize())); - } else if (storage instanceof EphemeralStorage) { - if (((EphemeralStorage) storage).getSizeLimit() != null) { - return DiskCapacity.of(getSizeInMiB(((EphemeralStorage) storage).getSizeLimit())); - } else { - return DiskCapacity.of(BrokerCapacity.DEFAULT_DISK_CAPACITY_IN_MIB); - } - } else if (storage == null) { - throw new IllegalStateException("The storage declaration is missing"); - } else { - throw new IllegalStateException("The declared storage '" + storage.getType() + "' is not supported"); - } - } - - /* - * Parse a K8S-style representation of a disk size, such as {@code 100Gi}, - * into the equivalent number of mebibytes represented as a String. - * - * @param size The String representation of the volume size. - * @return The equivalent number of mebibytes. - */ - private static String getSizeInMiB(String size) { - if (size == null) { - return BrokerCapacity.DEFAULT_DISK_CAPACITY_IN_MIB; - } - return String.valueOf(StorageUtils.convertTo(size, "Mi")); - } - - /** - * Parse Strimzi representation of throughput, such as {@code 10000KB/s}, - * into the equivalent number of kibibytes represented as a String. - * - * @param throughput The String representation of the throughput. - * @return The equivalent number of kibibytes. 
- */ - public static String getThroughputInKiB(String throughput) { - String size = throughput.substring(0, throughput.indexOf("B")); - return String.valueOf(StorageUtils.convertTo(size, "Ki")); - } - - private void processCapacityEntries(CruiseControlSpec spec, Set kafkaBrokerNodes, Map kafkaStorage, Map kafkaBrokerResources) { - io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity brokerCapacity = spec.getBrokerCapacity(); - - String inboundNetwork = processInboundNetwork(brokerCapacity, null); - String outboundNetwork = processOutboundNetwork(brokerCapacity, null); - - // Initialize default capacities - for (NodeRef node : kafkaBrokerNodes) { - DiskCapacity disk = processDisk(kafkaStorage.get(node.poolName()), node.nodeId()); - CpuCapacity cpu = processCpu(null, brokerCapacity, kafkaBrokerResources.get(node.poolName())); - - BrokerCapacity broker = new BrokerCapacity(node.nodeId(), cpu, disk, inboundNetwork, outboundNetwork); - capacityEntries.put(node.nodeId(), broker); - } - - // Override default capacities - if (brokerCapacity != null) { - List overrides = brokerCapacity.getOverrides(); - // Override broker entries - if (overrides != null) { - if (overrides.isEmpty()) { - LOGGER.warnCr(reconciliation, "Ignoring empty overrides list"); - } else { - // For checking for duplicate brokerIds - Set overrideIds = new HashSet<>(); - for (BrokerCapacityOverride override : overrides) { - List ids = override.getBrokers(); - inboundNetwork = processInboundNetwork(brokerCapacity, override); - outboundNetwork = processOutboundNetwork(brokerCapacity, override); - for (int id : ids) { - if (id == BrokerCapacity.DEFAULT_BROKER_ID) { - LOGGER.warnCr(reconciliation, "Ignoring broker capacity override with illegal broker id -1."); - } else { - if (capacityEntries.containsKey(id)) { - if (overrideIds.add(id)) { - BrokerCapacity brokerCapacityEntry = capacityEntries.get(id); - brokerCapacityEntry.setCpu(processCpu(override, brokerCapacity, kafkaBrokerResources.get(Integer.toString(id)))); - brokerCapacityEntry.setInboundNetwork(inboundNetwork); - brokerCapacityEntry.setOutboundNetwork(outboundNetwork); - } else { - LOGGER.warnCr(reconciliation, "Duplicate broker id {} found in overrides, using first occurrence.", id); - } - } else { - LOGGER.warnCr(reconciliation, "Ignoring broker capacity override for unknown node ID {}", id); - overrideIds.add(id); - } - } - } - } - } - } - } - } - /** - * Generate broker capacity entry for capacity configuration. 
- * - * @param brokerCapacity Broker capacity object - * @return Broker entry as a JsonObject - */ - private JsonObject generateBrokerCapacity(BrokerCapacity brokerCapacity) { - return new JsonObject() - .put(BROKER_ID_KEY, brokerCapacity.getId()) - .put(CAPACITY_KEY, new JsonObject() - .put(DISK_KEY, brokerCapacity.getDisk().getJson()) - .put(CPU_KEY, brokerCapacity.getCpu().getJson()) - .put(INBOUND_NETWORK_KEY, brokerCapacity.getInboundNetwork()) - .put(OUTBOUND_NETWORK_KEY, brokerCapacity.getOutboundNetwork()) - ) - .put(DOC_KEY, brokerCapacity.getDoc()); - } - - /** - * Generate a capacity configuration for cluster - * - * @return Cruise Control capacity configuration as a JsonObject - */ - public JsonObject generateCapacityConfig() { - JsonArray brokerList = new JsonArray(); - for (BrokerCapacity brokerCapacity : capacityEntries.values()) { - JsonObject brokerEntry = generateBrokerCapacity(brokerCapacity); - brokerList.add(brokerEntry); - } - - JsonObject config = new JsonObject(); - config.put("brokerCapacities", brokerList); - - return config; - } - - @Override - public String toString() { - return generateCapacityConfig().encodePrettily(); - } - - /** - * @return Capacity entries - */ - public TreeMap getCapacityEntries() { - return capacityEntries; - } -} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java new file mode 100644 index 00000000000..14213d8eace --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java @@ -0,0 +1,255 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.kafka.Storage; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; +import io.strimzi.operator.cluster.model.NodeRef; +import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.ReconciliationLogger; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * Uses information in `Kafka` and `KafkaNodePool` custom resources to generate a capacity configuration file to + * be used for Cruise Control's Broker Capacity File Resolver. + * + * For example, it takes a `Kafka` custom resource like the following: + * + * kind: Kafka + * metadata: + * name: my-cluster + * annotations: + * strimzi.io/node-pools: enabled + * strimzi.io/kraft: enabled + * spec: + * kafka: + * ... 
+ * cruiseControl: + * brokerCapacity: + * cpu: "1" + * inboundNetwork: 10000KB/s + * outboundNetwork: 10000KB/s + * overrides: + * - brokers: [0] + * cpu: "2.345" + * outboundNetwork: 40000KB/s + * - brokers: [1, 2] + * cpu: 4000m + * inboundNetwork: 60000KB/s + * outboundNetwork: 20000KB/s + * + * and `KafkaNodePool` custom resources like the following: + * + * kind: KafkaNodePool + * metadata: + * name: controller + * labels: + * strimzi.io/cluster: my-cluster + * spec: + * replicas: 3 + * roles: + * - controller + * storage: + * type: jbod + * volumes: + * - id: 0 + * type: ephemeral + * kraftMetadata: shared + * --- + * + * apiVersion: kafka.strimzi.io/v1beta2 + * kind: KafkaNodePool + * metadata: + * name: broker + * labels: + * strimzi.io/cluster: my-cluster + * spec: + * replicas: 3 + * roles: + * - broker + * storage: + * type: jbod + * volumes: + * - id: 0 + * type: persistent-claim + * size: 100Gi + * deleteClaim: false + * - id: 1 + * type: persistent-claim + * size: 200Gi + * deleteClaim: false + * --- + * + * Using this information, this class generates Cruise Control BrokerCapacityFileResolver config file like the following: + * + * { + * "brokerCapacities": [ + * { + * "brokerId": "0", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log0": "100000", + * "/var/lib/kafka1/kafka-log0": "200000" + * }, + * "CPU": { "num.cores": "2.345" }, + * "NW_IN": "10000", + * "NW_OUT": "40000" + * }, + * "doc": "Capacity for Broker 0" + * }, + * { + * "brokerId": "1", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log1": "100000", + * "/var/lib/kafka1/kafka-log1": "200000" + * }, + * "CPU": { "num.cores": "4" }, + * "NW_IN": "60000", + * "NW_OUT": "20000" + * }, + * "doc": "Capacity for Broker 1" + * }, + * { + * "brokerId": "2", + * "capacity": { + * "DISK": { + * "/var/lib/kafka0/kafka-log2": "100000", + * "/var/lib/kafka1/kafka-log2": "200000" + * }, + * "CPU": { "num.cores": "4" }, + * "NW_IN": "60000", + * "NW_OUT": "20000" + * }, + * "doc": "Capacity for Broker 2" + * } + * ] + * } + */ +public class CapacityConfiguration { + private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(CapacityConfiguration.class.getName()); + + private final TreeMap capacityEntries; + + /** + * Constructor + * + * @param reconciliation Reconciliation marker. + * @param spec Spec of Cruise Control in the `Kafka` custom resource. + * @param kafkaBrokerNodes List of the broker nodes which are part of the Kafka cluster. + * @param kafkaStorage A map with storage configuration used by the Kafka cluster and its node pools. + * @param kafkaBrokerResources A map with resource configuration used by the Kafka cluster and its broker pools. 
+     */
+    public CapacityConfiguration(
+            Reconciliation reconciliation,
+            CruiseControlSpec spec,
+            Set<NodeRef> kafkaBrokerNodes,
+            Map<String, Storage> kafkaStorage,
+            Map<String, ResourceRequirements> kafkaBrokerResources
+    ) {
+        this.capacityEntries = generateCapacityEntries(reconciliation, spec, kafkaBrokerNodes,
+                kafkaStorage, kafkaBrokerResources);
+    }
+
+    private static Map<Integer, BrokerCapacityOverride> processBrokerCapacityOverrides(Reconciliation reconciliation,
+                                                                                       Set<NodeRef> kafkaBrokerNodes,
+                                                                                       BrokerCapacity brokerCapacity) {
+        Map<Integer, BrokerCapacityOverride> overrideMap = new HashMap<>();
+
+        if (brokerCapacity != null && brokerCapacity.getOverrides() != null) {
+            for (BrokerCapacityOverride override : brokerCapacity.getOverrides()) {
+                List<Integer> ids = override.getBrokers();
+
+                for (int id : ids) {
+                    if (overrideMap.containsKey(id)) {
+                        LOGGER.warnCr(reconciliation, "Duplicate broker ID {} found in overrides, using first occurrence.", id);
+                    } else if (kafkaBrokerNodes.stream().noneMatch(node -> node.nodeId() == id)) {
+                        LOGGER.warnCr(reconciliation, "Ignoring broker capacity override for unknown node ID {}", id);
+                    } else {
+                        overrideMap.put(id, override);
+                    }
+                }
+            }
+        }
+
+        return overrideMap;
+    }
+
+    private static TreeMap<Integer, CapacityEntry> generateCapacityEntries(Reconciliation reconciliation,
+                                                                           CruiseControlSpec spec,
+                                                                           Set<NodeRef> kafkaBrokerNodes,
+                                                                           Map<String, Storage> kafkaStorage,
+                                                                           Map<String, ResourceRequirements> kafkaBrokerResources) {
+        TreeMap<Integer, CapacityEntry> capacityEntries = new TreeMap<>();
+        BrokerCapacity commonBrokerCapacity = spec.getBrokerCapacity();
+        Map<Integer, BrokerCapacityOverride> brokerCapacityOverrideMap = processBrokerCapacityOverrides(reconciliation,
+                kafkaBrokerNodes, commonBrokerCapacity);
+
+        for (NodeRef node : kafkaBrokerNodes) {
+            BrokerCapacityOverride brokerCapacityOverride = brokerCapacityOverrideMap.get(node.nodeId());
+
+            DiskCapacity disk = new DiskCapacity(kafkaStorage.get(node.poolName()), node.nodeId());
+            CpuCapacity cpu = new CpuCapacity(commonBrokerCapacity, brokerCapacityOverride, kafkaBrokerResources.get(node.poolName()));
+            InboundNetworkCapacity inboundNetwork = new InboundNetworkCapacity(commonBrokerCapacity, brokerCapacityOverride);
+            OutboundNetworkCapacity outboundNetwork = new OutboundNetworkCapacity(commonBrokerCapacity, brokerCapacityOverride);
+
+            capacityEntries.put(node.nodeId(), new CapacityEntry(node.nodeId(), disk, cpu, inboundNetwork, outboundNetwork));
+        }
+
+        return capacityEntries;
+    }
+
+    /**
+     * Generate a capacity configuration for the cluster.
+     *
+     * @return Cruise Control capacity configuration as a formatted JSON String.
+     */
+    public String toJson() {
+        JsonArray capacityList = new JsonArray();
+
+        for (CapacityEntry capacityEntry : capacityEntries.values()) {
+
+            JsonObject capacityEntryJson = new JsonObject()
+                    .put("brokerId", capacityEntry.brokerId)
+                    .put("capacity", new JsonObject()
+                            .put("DISK", capacityEntry.disk.getJson())
+                            .put("CPU", capacityEntry.cpu.getJson())
+                            .put("NW_IN", capacityEntry.inboundNetwork.getJson())
+                            .put("NW_OUT", capacityEntry.outboundNetwork.getJson()))
+                    .put("doc", "Capacity for Broker " + capacityEntry.brokerId);
+
+            capacityList.add(capacityEntryJson);
+        }
+
+        return new JsonObject().put("brokerCapacities", capacityList).encodePrettily();
+    }
+
+    /**
+     * Represents a Cruise Control capacity entry configuration for a Kafka broker.
+     *
+     * @param brokerId        The broker ID.
+     * @param disk            The disk capacity configuration.
+     * @param cpu             The CPU capacity configuration.
+     * @param inboundNetwork  The inbound network capacity configuration.
+     * @param outboundNetwork The outbound network capacity configuration.
+ */ + private record CapacityEntry( + int brokerId, + DiskCapacity disk, + CpuCapacity cpu, + InboundNetworkCapacity inboundNetwork, + OutboundNetworkCapacity outboundNetwork + ) { } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java index 352b70b3cb3..d30fb84a55f 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java @@ -4,41 +4,109 @@ */ package io.strimzi.operator.cluster.model.cruisecontrol; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; import io.strimzi.operator.cluster.model.Quantities; import io.vertx.core.json.JsonObject; /** - * Cruise Control CPU Capacity class + * Cruise Control CPU capacity configuration for broker. */ public class CpuCapacity { + /** + * Default capacity value + */ + /* test */ static final String DEFAULT_CPU_CORE_CAPACITY = "1.0"; + private static final String CORES_KEY = "num.cores"; - private final String cores; + private final JsonObject config = new JsonObject(); /** * Constructor * - * @param cores CPU cores configuration + * Given the configured brokerCapacity, broker-specific capacity override, and broker resource requirements, + * returns the capacity for the resource. + * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + * @param resourceRequirements The Kafka resource requests and limits (for all brokers). */ - public CpuCapacity(String cores) { - this.cores = milliCpuToCpu(Quantities.parseCpuAsMilliCpus(cores)); + protected CpuCapacity(BrokerCapacity brokerCapacity, + BrokerCapacityOverride brokerCapacityOverride, + ResourceRequirements resourceRequirements) { + String cores = processResourceCapacity(brokerCapacity, brokerCapacityOverride, resourceRequirements); + config.put(CORES_KEY, milliCpuToCpu(Quantities.parseCpuAsMilliCpus(cores))); } - protected static String milliCpuToCpu(int milliCPU) { + private static String milliCpuToCpu(int milliCPU) { return String.valueOf(milliCPU / 1000.0); } /** - * Returns CpuCapacity object as a JsonObject + * Returns capacity value as a JsonObject. * - * @return The CpuCapacity object as a JsonObject + * @return The capacity value as a JsonObject. */ - public JsonObject getJson() { - return new JsonObject().put(CORES_KEY, this.cores); + protected JsonObject getJson() { + return config; } - @Override - public String toString() { - return this.getJson().toString(); + /** + * Given the configured brokerCapacity, broker-specific capacity override, and broker resource requirements, + * returns the capacity for the resource. + * + *
<p>
+     * The broker-specific capacity override takes top precedence, then the general brokerCapacity configuration,
+     * then the Kafka resource requests, then the Kafka resource limits, and finally the resource default.
+     *
+     * For example:
+     * <ol>
+     *   <li>The brokerCapacityOverride for a specific broker.</li>
+     *   <li>The general brokerCapacity configuration.</li>
+     *   <li>The Kafka resource requests.</li>
+     *   <li>The Kafka resource limits.</li>
+     *   <li>The resource default.</li>
+     * </ol>
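+     *
+     * A minimal illustration of the resolution order (hypothetical values; {@code override0} and
+     * {@code resources} are placeholders for an override with cpu "2.345", a general brokerCapacity
+     * with cpu "1", and resource requirements with a CPU request of 500m):
+     * <pre>{@code
+     * processResourceCapacity(brokerCapacity, override0, resources); // "2.345" (override wins)
+     * processResourceCapacity(brokerCapacity, null, resources);      // "1"     (general brokerCapacity)
+     * processResourceCapacity(null, null, resources);                // "500m"  (CPU request)
+     * processResourceCapacity(null, null, null);                     // "1.0"   (resource default)
+     * }</pre>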
+ * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + * @param resourceRequirements The Kafka resource requests and limits (for all brokers). + * + * @return The capacity of resource represented as a String. + */ + public static String processResourceCapacity(BrokerCapacity brokerCapacity, + BrokerCapacityOverride brokerCapacityOverride, + ResourceRequirements resourceRequirements) { + if (brokerCapacityOverride != null && brokerCapacityOverride.getCpu() != null) { + return brokerCapacityOverride.getCpu(); + } else if (brokerCapacity != null && brokerCapacity.getCpu() != null) { + return brokerCapacity.getCpu(); + } else { + String cpuBasedOnRequirements = getCpuBasedOnRequirements(resourceRequirements); + if (cpuBasedOnRequirements != null) { + return cpuBasedOnRequirements; + } else { + return DEFAULT_CPU_CORE_CAPACITY; + } + } + } + + /** + * Derives the CPU capacity from the resource requirements section of Strimzi custom resource. + * + * @param resources The Strimzi custom resource requirements containing CPU requests and/or limits. + * @return The CPU capacity as a {@link String}, or {@code null} if no CPU values are defined. + */ + private static String getCpuBasedOnRequirements(ResourceRequirements resources) { + if (resources != null && resources.getRequests() != null && resources.getRequests().get("cpu") != null) { + return resources.getRequests().get("cpu").toString(); + } else if (resources != null && resources.getLimits() != null && resources.getLimits().get("cpu") != null) { + return resources.getLimits().get("cpu").toString(); + } else { + return null; + } } } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java index 56eb91757c5..52dee9bf9b8 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java @@ -4,49 +4,132 @@ */ package io.strimzi.operator.cluster.model.cruisecontrol; +import io.strimzi.api.kafka.model.kafka.EphemeralStorage; +import io.strimzi.api.kafka.model.kafka.JbodStorage; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorage; +import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage; +import io.strimzi.api.kafka.model.kafka.Storage; +import io.strimzi.operator.cluster.model.StorageUtils; +import io.strimzi.operator.cluster.model.VolumeUtils; import io.vertx.core.json.JsonObject; import java.util.HashMap; import java.util.Map; /** - * Cruise Control disk capacity + * Cruise Control disk capacity configuration for broker. */ public class DiskCapacity { + private static final String DEFAULT_DISK_CAPACITY_IN_MIB = "100000"; + private static final String KAFKA_MOUNT_PATH = "/var/lib/kafka"; + private static final String KAFKA_LOG_DIR = "kafka-log"; private static final String SINGLE_DISK = ""; - private Map map; + + private final Map config; /** * Constructor + * + * Generate JBOD disk capacity configuration for a broker using the supplied storage configuration. + * + * @param storage Storage configuration for Kafka cluster. + * @param brokerId Id of the broker. 
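+     *
+     * For example, assuming a JBOD persistent-claim volume with id 0 and a broker with id 1, the
+     * resulting capacity map is keyed by the log directory path
+     * {@code /var/lib/kafka/data-0/kafka-log1}, where the {@code data-0} volume prefix is produced by
+     * {@code VolumeUtils.createVolumePrefix}.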
*/ - public DiskCapacity() { - map = new HashMap<>(1); + protected DiskCapacity(Storage storage, int brokerId) { + if (storage instanceof JbodStorage jbodStorage) { + config = generateJbodDiskConfig(jbodStorage, brokerId); + } else { + config = generateNonJbodDiskConfig(storage); + } } - private DiskCapacity(String size) { - this(); - map.put(SINGLE_DISK, size); + /** + * Parse a K8S-style representation of a disk size, such as {@code 100Gi}, + * into the equivalent number of mebibytes represented as a String. + * + * @param size The String representation of the volume size. + * @return The equivalent number of mebibytes. + */ + private static String getSizeInMiB(String size) { + if (size == null) { + return DEFAULT_DISK_CAPACITY_IN_MIB; + } else { + return String.valueOf(StorageUtils.convertTo(size, "Mi")); + } } - protected static DiskCapacity of(String size) { - return new DiskCapacity(size); + /** + * Generate JBOD disk capacity configuration for a broker using the supplied storage configuration. + * + * @param storage Storage configuration for Kafka cluster. + * @param brokerId Id of the broker. + * + * @return Disk capacity configuration value for broker brokerId. + */ + private Map generateJbodDiskConfig(Storage storage, int brokerId) { + String size = ""; + Map diskConfig = new HashMap<>(); + + for (SingleVolumeStorage volume : ((JbodStorage) storage).getVolumes()) { + String name = VolumeUtils.createVolumePrefix(volume.getId(), true); + String path = KAFKA_MOUNT_PATH + "/" + name + "/" + KAFKA_LOG_DIR + brokerId; + + if (volume instanceof PersistentClaimStorage ps) { + size = ps.getSize(); + } else if (volume instanceof EphemeralStorage es) { + size = es.getSizeLimit(); + } + + diskConfig.put(path, String.valueOf(getSizeInMiB(size))); + } + + return diskConfig; } - protected void add(String path, String size) { - if (path == null || SINGLE_DISK.equals(path)) { - throw new IllegalArgumentException("The disk path cannot be null or empty"); + /** + * Generate total disk capacity using the supplied storage configuration. + * + * @param storage Storage configuration for Kafka cluster. + * + * @return Disk capacity configuration value for broker. + */ + private static Map generateNonJbodDiskConfig(Storage storage) { + String size; + + if (storage instanceof PersistentClaimStorage) { + size = getSizeInMiB(((PersistentClaimStorage) storage).getSize()); + } else if (storage instanceof EphemeralStorage es) { + + if (es.getSizeLimit() != null) { + size = getSizeInMiB(es.getSizeLimit()); + } else { + size = DEFAULT_DISK_CAPACITY_IN_MIB; + } + + } else if (storage == null) { + throw new IllegalStateException("The storage declaration is missing"); + } else { + throw new IllegalStateException("The declared storage '" + storage.getType() + "' is not supported"); } - map.put(path, size); + + return Map.of(SINGLE_DISK, size); } + /** + * Returns capacity value as a JsonObject. + * + * @return The capacity value as a JsonObject. 
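+     *
+     * A sketch of the two possible shapes, assuming a single 100Gi volume (100 * 1024 = 102400 MiB):
+     * <pre>{@code
+     * // Single (non-JBOD) volume: a plain String
+     * "102400.0"
+     * // JBOD volumes: a JsonObject keyed by log-dir path
+     * {"/var/lib/kafka/data-0/kafka-log1": "102400.0"}
+     * }</pre>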
+ */ protected Object getJson() { - if (map.size() == 1 && map.containsKey(SINGLE_DISK)) { - return map.get(SINGLE_DISK); + if (config.size() == 1 && config.containsKey(SINGLE_DISK)) { + return config.get(SINGLE_DISK); } else { JsonObject disks = new JsonObject(); - for (Map.Entry e : map.entrySet()) { + + for (Map.Entry e : config.entrySet()) { disks.put(e.getKey(), e.getValue()); } + return disks; } } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java new file mode 100644 index 00000000000..10b9728d81c --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java @@ -0,0 +1,56 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; + +/** + * Cruise Control inbound network capacity configuration for broker. + */ +public class InboundNetworkCapacity extends NetworkCapacity { + /** + * Constructor + * + * Given the configured brokerCapacity, broker-specific capacity override, returns the capacity for the resource. + * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + */ + protected InboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOverride brokerCapacityOverride) { + super(getThroughputInKiB(processResourceCapacity(brokerCapacity, brokerCapacityOverride))); + } + + /** + * Given the configured brokerCapacity, broker-specific capacity override, and broker resource requirements, + * returns the capacity for the resource. + * + *
<p>
+     * The broker-specific capacity override takes top precedence, then the general brokerCapacity configuration,
+     * and finally the resource default.
+     *
+     * For example:
+     * <ol>
+     *   <li>The brokerCapacityOverride for a specific broker.</li>
+     *   <li>The general brokerCapacity configuration.</li>
+     *   <li>The resource default.</li>
+     * </ol>
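+     *
+     * A short illustration (the resolved value is then converted to KiB/s by {@code getThroughputInKiB}
+     * in the constructor):
+     * <pre>{@code
+     * // override inboundNetwork "25000KB/s"        -> NW_IN "24414.0625"
+     * // brokerCapacity inboundNetwork "50000KB/s"  -> NW_IN "48828.125"
+     * // neither configured, default "10000KiB/s"   -> NW_IN "10000.0"
+     * }</pre>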
+ * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + * + * @return The capacity of resource represented as a String. + */ + private static String processResourceCapacity(BrokerCapacity brokerCapacity, + BrokerCapacityOverride brokerCapacityOverride) { + if (brokerCapacityOverride != null && brokerCapacityOverride.getInboundNetwork() != null) { + return brokerCapacityOverride.getInboundNetwork(); + } else if (brokerCapacity != null && brokerCapacity.getInboundNetwork() != null) { + return brokerCapacity.getInboundNetwork(); + } else { + return DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND; + } + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java new file mode 100644 index 00000000000..02115e2230d --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java @@ -0,0 +1,43 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.strimzi.operator.cluster.model.StorageUtils; + +/** + * Cruise Control network capacity configuration for broker. + */ +public abstract class NetworkCapacity { + /** + * Default capacity value + */ + protected static final String DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000KiB/s"; + protected String config; + + protected NetworkCapacity(String config) { + this.config = config; + } + + /** + * Parse Strimzi representation of throughput, such as {@code 10000KB/s}, + * into the equivalent number of kibibytes represented as a String. + * + * @param throughput The String representation of the throughput. + * @return The equivalent number of kibibytes. + */ + protected static String getThroughputInKiB(String throughput) { + String size = throughput.substring(0, throughput.indexOf("B")); + return String.valueOf(StorageUtils.convertTo(size, "Ki")); + } + + /** + * Returns capacity value as a JsonObject. + * + * @return The capacity value as a JsonObject. + */ + protected String getJson() { + return config; + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java new file mode 100644 index 00000000000..166d30a1bfc --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/OutboundNetworkCapacity.java @@ -0,0 +1,56 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity; +import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride; + +/** + * Cruise Control outbound network capacity configuration for broker. + */ +public class OutboundNetworkCapacity extends NetworkCapacity { + /** + * Constructor + * + * Given the configured brokerCapacity, broker-specific capacity override, returns the capacity for the resource. + * + * @param brokerCapacity The general brokerCapacity configuration. 
+     * @param brokerCapacityOverride The brokerCapacityOverride for a specific broker.
+     */
+    protected OutboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOverride brokerCapacityOverride) {
+        super(getThroughputInKiB(processResourceCapacity(brokerCapacity, brokerCapacityOverride)));
+    }
+
+    /**
+     * Given the configured brokerCapacity and broker-specific capacity override,
+     * returns the capacity for the resource.
+     *
+     * <p>
+     * The broker-specific capacity override takes top precedence, then the general brokerCapacity configuration,
+     * and finally the resource default.
+     *
+     * For example:
+     * <ol>
+     *   <li>The brokerCapacityOverride for a specific broker.</li>
+     *   <li>The general brokerCapacity configuration.</li>
+     *   <li>The resource default.</li>
+     * </ol>
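+     *
+     * For example, with neither an override nor a general outboundNetwork configured, the default of
+     * {@code 10000KiB/s} is returned, which the constructor converts to an NW_OUT value of {@code "10000.0"}.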
+ * + * @param brokerCapacity The general brokerCapacity configuration. + * @param brokerCapacityOverride The brokerCapacityOverride for specific broker. + * + * @return The capacity of resource represented as a String. + */ + private static String processResourceCapacity(BrokerCapacity brokerCapacity, + BrokerCapacityOverride brokerCapacityOverride) { + if (brokerCapacityOverride != null && brokerCapacityOverride.getOutboundNetwork() != null) { + return brokerCapacityOverride.getOutboundNetwork(); + } else if (brokerCapacity != null && brokerCapacity.getOutboundNetwork() != null) { + return brokerCapacity.getOutboundNetwork(); + } else { + return DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND; + } + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java index 260fa716f2e..672b351e783 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java @@ -49,16 +49,12 @@ import io.strimzi.api.kafka.model.kafka.KafkaResources; import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.kafka.Storage; -import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityBuilder; import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlResources; import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; import io.strimzi.operator.cluster.KafkaVersionTestUtils; import io.strimzi.operator.cluster.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.ResourceUtils; -import io.strimzi.operator.cluster.model.cruisecontrol.BrokerCapacity; -import io.strimzi.operator.cluster.model.cruisecontrol.Capacity; -import io.strimzi.operator.cluster.model.cruisecontrol.CpuCapacity; import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel; import io.strimzi.operator.common.Annotations; import io.strimzi.operator.common.Reconciliation; @@ -71,9 +67,6 @@ import io.strimzi.test.TestUtils; import io.strimzi.test.annotations.ParallelSuite; import io.strimzi.test.annotations.ParallelTest; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; -import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterAll; import java.io.IOException; @@ -103,6 +96,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasProperty; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @SuppressWarnings({ @@ -146,241 +140,62 @@ public class CruiseControlTest { ); private static final Map STORAGE = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()).build()); - @AfterAll public static void cleanUp() { ResourceUtils.cleanUpTemporaryTLSFiles(); } @ParallelTest - public void testBrokerCapacity() { - // Test user defined capacities - String userDefinedCpuCapacity = "2575m"; - JsonObject expectedCpuCapacity = new CpuCapacity(userDefinedCpuCapacity).getJson(); - - io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity userDefinedBrokerCapacity = new io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity(); - 
-        userDefinedBrokerCapacity.setCpu(userDefinedCpuCapacity);
-        userDefinedBrokerCapacity.setInboundNetwork("50000KB/s");
-        userDefinedBrokerCapacity.setOutboundNetwork("50000KB/s");
-
-        Kafka kafka = new KafkaBuilder(KAFKA)
-                .editSpec()
-                    .withNewCruiseControl()
-                        .withBrokerCapacity(userDefinedBrokerCapacity)
-                    .endCruiseControl()
-                .endSpec()
-                .build();
-        Map<String, Storage> storage = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build(), new PersistentClaimStorageBuilder().withId(1).withSize("60Gi").build()).build());
-        Map<String, ResourceRequirements> resources = Map.of("brokers", new ResourceRequirementsBuilder().withRequests(Map.of(Capacity.RESOURCE_TYPE, new Quantity("400m"))).withLimits(Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5"))).build());
-        CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources);
-
-        ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null));
-        JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME));
-        JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY);
-
-        for (Object brokerEntry : brokerEntries) {
-            JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY);
-            Object diskCapacity = brokerCapacity.getValue(Capacity.DISK_KEY);
-            JsonObject cpuCapacity = brokerCapacity.getJsonObject(Capacity.CPU_KEY);
-
-            assertThat(isJBOD(diskCapacity), is(true));
-            assertThat(cpuCapacity, is(expectedCpuCapacity));
-        }
-    }
-
-    @ParallelTest
-    public void testBrokerCapacityOverrides() {
-        // Test capacity overrides
-        String userDefinedCpuCapacity = "2575m";
-        String userDefinedCpuCapacityOverride0 = "1.222";
-        String inboundNetwork = "50000KB/s";
-        String inboundNetworkOverride0 = "25000KB/s";
-        String inboundNetworkOverride1 = "10000KiB/s";
-        String outboundNetworkOverride1 = "15000KB/s";
-
-        int broker0 = 0;
-        int broker1 = 1;
-        int broker2 = 2;
-
-        List<Integer> overrideList0 = List.of(broker0, broker1, broker2, broker0);
-        List<Integer> overrideList1 = List.of(broker1);
-
-        Kafka kafka = new KafkaBuilder(KAFKA)
-                .editSpec()
-                    .withNewCruiseControl()
-                        .withNewBrokerCapacity()
-                            .withCpu(userDefinedCpuCapacity)
-                            .withInboundNetwork(inboundNetwork)
-                            .addNewOverride()
-                                .withBrokers(overrideList0)
-                                .withCpu(userDefinedCpuCapacityOverride0)
-                                .withInboundNetwork(inboundNetworkOverride0)
-                            .endOverride()
-                            .addNewOverride()
-                                .withBrokers(overrideList1)
-                                .withInboundNetwork(inboundNetworkOverride1)
-                                .withOutboundNetwork(outboundNetworkOverride1)
-                            .endOverride()
-                        .endBrokerCapacity()
-                    .endCruiseControl()
-                .endSpec()
-                .build();
-        Map<String, Storage> storage = Map.of("brokers", new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build());
-        Map<String, ResourceRequirements> resources = Map.of("brokers", new ResourceRequirementsBuilder().withRequests(Map.of(Capacity.RESOURCE_TYPE, new Quantity("400m"))).withLimits(Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5"))).build());
-        CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources);
-
-        ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null));
-        JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME));
-        JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY);
-
-        for (Object brokerEntry : brokerEntries) {
-            JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY);
-            Object diskCapacity = brokerCapacity.getValue(Capacity.DISK_KEY);
-
-            assertThat(isJBOD(diskCapacity), is(false));
-        }
-
-        JsonObject brokerEntry0 = brokerEntries.getJsonObject(broker0).getJsonObject(Capacity.CAPACITY_KEY);
-        assertThat(brokerEntry0.getJsonObject(Capacity.CPU_KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson()));
-        assertThat(brokerEntry0.getString(Capacity.INBOUND_NETWORK_KEY), is(Capacity.getThroughputInKiB(inboundNetworkOverride0)));
-        assertThat(brokerEntry0.getString(Capacity.OUTBOUND_NETWORK_KEY), is(BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND));
-
-        // When the same broker id is specified in brokers list of multiple overrides, use the value specified in the first override.
-        JsonObject brokerEntry1 = brokerEntries.getJsonObject(broker1).getJsonObject(Capacity.CAPACITY_KEY);
-        assertThat(brokerEntry1.getJsonObject(Capacity.CPU_KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson()));
-        assertThat(brokerEntry1.getString(Capacity.INBOUND_NETWORK_KEY), is(Capacity.getThroughputInKiB(inboundNetworkOverride0)));
-        assertThat(brokerEntry1.getString(Capacity.OUTBOUND_NETWORK_KEY), is(BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND));
-
-        JsonObject brokerEntry2 = brokerEntries.getJsonObject(broker2).getJsonObject(Capacity.CAPACITY_KEY);
-        assertThat(brokerEntry2.getJsonObject(Capacity.CPU_KEY), is(new CpuCapacity(userDefinedCpuCapacityOverride0).getJson()));
-        assertThat(brokerEntry2.getString(Capacity.INBOUND_NETWORK_KEY), is(Capacity.getThroughputInKiB(inboundNetworkOverride0)));
-    }
-
-    @ParallelTest
-    public void testBrokerCapacityGeneratedCpu() {
-        String userDefinedCpuCapacity = "2575m";
-        String userDefinedCpuCapacityOverride0 = "1.222";
-        String inboundNetwork = "50000KB/s";
-        String inboundNetworkOverride0 = "25000KB/s";
-        String inboundNetworkOverride1 = "10000KiB/s";
-        String outboundNetworkOverride1 = "15000KB/s";
-
-        int broker0 = 0;
-        int broker1 = 1;
-        int broker2 = 2;
-
-        List<Integer> overrideList0 = List.of(broker0, broker1, broker2, broker0);
-        List<Integer> overrideList1 = List.of(broker1);
-
-        Kafka kafka = new KafkaBuilder(KAFKA)
-                .editSpec()
-                    .withNewCruiseControl()
-                        .withNewBrokerCapacity()
-                            .withCpu(userDefinedCpuCapacity)
-                            .withInboundNetwork(inboundNetwork)
-                            .addNewOverride()
-                                .withBrokers(overrideList0)
-                                .withCpu(userDefinedCpuCapacityOverride0)
-                                .withInboundNetwork(inboundNetworkOverride0)
-                            .endOverride()
-                            .addNewOverride()
-                                .withBrokers(overrideList1)
-                                .withInboundNetwork(inboundNetworkOverride1)
-                                .withOutboundNetwork(outboundNetworkOverride1)
-                            .endOverride()
-                        .endBrokerCapacity()
-                    .endCruiseControl()
-                .endSpec()
-                .build();
-        Map<String, Storage> storage = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build(), new PersistentClaimStorageBuilder().withId(1).withSize("60Gi").build()).build());
-        Map<String, ResourceRequirements> resources = Map.of("brokers", new ResourceRequirementsBuilder().withRequests(Map.of(Capacity.RESOURCE_TYPE, new Quantity("500m"))).withLimits(Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5"))).build());
-        CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources);
-
-        ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null));
-        JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME));
-        JsonObject expectedCpuCapacity = new CpuCapacity(userDefinedCpuCapacityOverride0).getJson();
-        JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY);
-        for (Object brokerEntry : brokerEntries) {
-            JsonObject brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY);
-            JsonObject cpuCapacity = brokerCapacity.getJsonObject(Capacity.CPU_KEY);
-            assertThat(cpuCapacity, is(expectedCpuCapacity));
-        }
-    }
-
-    @ParallelTest
-    public void testBrokerCapacitiesWithPools() {
-        Set<NodeRef> nodes = Set.of(
-                new NodeRef("my-cluster-pool1-0", 0, "pool1", false, true),
-                new NodeRef("my-cluster-pool1-1", 1, "pool1", false, true),
-                new NodeRef("my-cluster-pool1-2", 2, "pool1", false, true),
-                new NodeRef("my-cluster-pool2-10", 10, "pool2", false, true),
-                new NodeRef("my-cluster-pool2-11", 11, "pool2", false, true),
-                new NodeRef("my-cluster-pool2-12", 12, "pool2", false, true)
-        );
-
-        Map<String, Storage> storage = new HashMap<>();
-        storage.put("pool1", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()).build());
-        storage.put("pool2", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(1).withSize("1Ti").build()).build());
-
-        Map<String, ResourceRequirements> resources = new HashMap<>();
-        resources.put("pool1", new ResourceRequirementsBuilder().withLimits(Map.of("cpu", new Quantity("4"), "memory", new Quantity("16Gi"))).build());
-        resources.put("pool2", new ResourceRequirementsBuilder().withLimits(Map.of("cpu", new Quantity("5"), "memory", new Quantity("20Gi"))).build());
-
-        // Test the capacity
-        CruiseControl cc = createCruiseControl(KAFKA, nodes, storage, resources);
+    public void testGenerateCapacityConfig() {
+        CruiseControl cc = createCruiseControl(KAFKA, NODES, STORAGE, Map.of());
         ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null));
-        JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME));
-        JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY);
-
-        assertThat(brokerEntries.size(), is(6));
-
-        // Broker 0
-        JsonObject brokerEntry = brokerEntries.getJsonObject(0);
-        assertThat(brokerEntry.getInteger("brokerId"), is(0));
-        JsonObject brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY);
-        assertThat(brokerCpuCapacity.getString("num.cores"), is("4.0"));
-        JsonObject brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY);
-        assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-0/kafka-log0"), is("102400.0"));
-
-        // Broker 1
-        brokerEntry = brokerEntries.getJsonObject(1);
-        assertThat(brokerEntry.getInteger("brokerId"), is(1));
-        brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY);
-        assertThat(brokerCpuCapacity.getString("num.cores"), is("4.0"));
-        brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY);
-        assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-0/kafka-log1"), is("102400.0"));
-
-        // Broker 2
-        brokerEntry = brokerEntries.getJsonObject(2);
-        assertThat(brokerEntry.getInteger("brokerId"), is(2));
-        brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY);
-        assertThat(brokerCpuCapacity.getString("num.cores"), is("4.0"));
-        brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY);
-        assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-0/kafka-log2"), is("102400.0"));
-
-        // Broker 10
-        brokerEntry = brokerEntries.getJsonObject(3);
-        assertThat(brokerEntry.getInteger("brokerId"), is(10));
-        brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY);
-        assertThat(brokerCpuCapacity.getString("num.cores"), is("5.0"));
-        brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY);
-        assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-1/kafka-log10"), is("1048576.0"));
-
-        // Broker 11
-        brokerEntry = brokerEntries.getJsonObject(4);
-        assertThat(brokerEntry.getInteger("brokerId"), is(11));
-        brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY);
-        assertThat(brokerCpuCapacity.getString("num.cores"), is("5.0"));
-        brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY);
-        assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-1/kafka-log11"), is("1048576.0"));
-
-        // Broker 12
-        brokerEntry = brokerEntries.getJsonObject(5);
-        assertThat(brokerEntry.getInteger("brokerId"), is(12));
-        brokerCpuCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY);
-        assertThat(brokerCpuCapacity.getString("num.cores"), is("5.0"));
-        brokerDiskCapacity = brokerEntry.getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.DISK_KEY);
-        assertThat(brokerDiskCapacity.getString("/var/lib/kafka/data-1/kafka-log12"), is("1048576.0"));
+        String actualCapacityConfig = configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME);
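+        // With no brokerCapacity section and no Kafka CPU requests/limits configured, the generated
+        // entries below fall back to the defaults: 1.0 CPU core and 10000 KiB/s inbound/outbound
+        // network throughput; the disk capacity is taken from the configured volume size in MiB.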
+        String expectedCapacityConfig = """
+            {
+              "brokerCapacities" : [ {
+                "brokerId" : 0,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-0/kafka-log0" : "102400.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "1.0"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 0"
+              }, {
+                "brokerId" : 1,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-0/kafka-log1" : "102400.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "1.0"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 1"
+              }, {
+                "brokerId" : 2,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-0/kafka-log2" : "102400.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "1.0"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 2"
+              } ]
+            }""";
+
+        assertThat(actualCapacityConfig, notNullValue());
+        assertEquals(expectedCapacityConfig, actualCapacityConfig);
     }
 
     @ParallelTest
@@ -744,81 +559,6 @@ public void testResources() {
         assertThat(ccContainer.getResources().getRequests(), is(requests));
     }
 
-    @ParallelTest
-    public void testCpuCapacityGeneration() {
-        String brokerCpuCapacity = "6.0";
-        String brokerCpuCapacityOverride = "2.0";
-        String resourceLimitCpu = "3.0";
-        String resourceRequestCpu = "4.0";
-
-        Map<String, Storage> storage = Map.of("brokers", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()).build());
-
-        /* In this test case, when override is set for a broker, it takes precedence over the general broker capacity setting for that
-           specific broker, but the general broker capacity takes precedence over the Kafka resource request */
-        Map<String, ResourceRequirements> resources = Map.of("brokers", new ResourceRequirementsBuilder().addToLimits("cpu", new Quantity(resourceLimitCpu)).addToRequests("cpu", new Quantity(resourceRequestCpu)).build());
-
-        io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity brokerCapacityOne = new BrokerCapacityBuilder()
-                .withCpu(brokerCpuCapacity)
-                .withOverrides()
-                    .addNewOverride()
-                        .addToBrokers(0)
-                        .withCpu(brokerCpuCapacityOverride)
-                    .endOverride()
-                .build();
-
-        verifyBrokerCapacity(storage, resources, brokerCapacityOne, brokerCpuCapacityOverride, brokerCpuCapacity, brokerCpuCapacity);
-
-        // In this test case, when override is set for a broker, it takes precedence over the Kafka resource request
-        io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity brokerCapacityTwo = new BrokerCapacityBuilder()
-                .withOverrides()
-                    .addNewOverride()
-                        .addToBrokers(0)
-                        .withCpu(brokerCpuCapacityOverride)
-                    .endOverride()
-                .build();
-
-        verifyBrokerCapacity(storage, resources, brokerCapacityTwo, brokerCpuCapacityOverride, resourceRequestCpu, resourceRequestCpu);
-
-        /* In this test case, when neither the override nor the CPU resource request are configured but the CPU
-           resource limit for CPU is set for the Kafka brokers; therefore, resource limit will be used as the CPU capacity */
-        resources = Map.of("brokers", new ResourceRequirementsBuilder().addToLimits("cpu", new Quantity(resourceLimitCpu)).build());
-
-        io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity brokerCapacityThree = new BrokerCapacityBuilder().build();
-
-        verifyBrokerCapacity(storage, resources, brokerCapacityThree, resourceLimitCpu, resourceLimitCpu, resourceLimitCpu);
-
-        /* In this test case, when neither the override nor the Kafka resource requests or limits for CPU are configured,
-           the CPU capacity will be set to DEFAULT_CPU_CORE_CAPACITY */
-        resources = Map.of("brokers", new ResourceRequirementsBuilder().build());
-
-        verifyBrokerCapacity(storage, resources, brokerCapacityThree, BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY, BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY, BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY);
-    }
-
-    private void verifyBrokerCapacity(Map<String, Storage> storage,
-                                      Map<String, ResourceRequirements> resources,
-                                      io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity brokerCapacity,
-                                      String brokerOneCpuValue,
-                                      String brokerTwoCpuValue,
-                                      String brokerThreeCpuValue) {
-        Kafka kafka = new KafkaBuilder(KAFKA)
-                .editSpec()
-                    .withNewCruiseControl()
-                        .withBrokerCapacity(brokerCapacity)
-                    .endCruiseControl()
-                .endSpec()
-                .build();
-
-        CruiseControl cc = createCruiseControl(kafka, NODES, storage, resources);
-
-        ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null));
-        JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME));
-        JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY);
-
-        assertThat(brokerEntries.getJsonObject(0).getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY).getString("num.cores"), is(Matchers.equalTo(brokerOneCpuValue)));
-        assertThat(brokerEntries.getJsonObject(1).getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY).getString("num.cores"), is(Matchers.equalTo(brokerTwoCpuValue)));
-        assertThat(brokerEntries.getJsonObject(2).getJsonObject(Capacity.CAPACITY_KEY).getJsonObject(Capacity.CPU_KEY).getString("num.cores"), is(Matchers.equalTo(brokerThreeCpuValue)));
-    }
-
     @ParallelTest
     public void testApiSecurity() {
         // Test with security enabled
@@ -1236,7 +976,4 @@ private List<EnvVar> getExpectedEnvVars() {
         return expected;
     }
 
-    private static boolean isJBOD(Object diskCapacity) {
-        return diskCapacity instanceof JsonObject;
-    }
 }
diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfigurationTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfigurationTest.java
new file mode 100644
index 00000000000..392d116e597
--- /dev/null
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfigurationTest.java
@@ -0,0 +1,505 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.operator.cluster.model.cruisecontrol;
+
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
+import io.strimzi.api.kafka.model.kafka.JbodStorageBuilder;
+import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder;
+import io.strimzi.api.kafka.model.kafka.Storage;
+import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity;
+import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityBuilder;
+import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec;
+import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpecBuilder;
+import io.strimzi.operator.cluster.model.NodeRef;
+import io.strimzi.operator.common.Reconciliation;
+import io.strimzi.test.annotations.ParallelTest;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CapacityConfigurationTest {
+    private static final Map<String, ResourceRequirements> RESOURCES = createResources("400m", "0.5");
+    private static final Set<NodeRef> NODES = Set.of(
+            new NodeRef("my-cluster-brokers-0", 0, "brokers", false, true),
+            new NodeRef("my-cluster-brokers-1", 1, "brokers", false, true),
+            new NodeRef("my-cluster-brokers-2", 2, "brokers", false, true)
+    );
+
+    @ParallelTest
+    public void testBrokerCapacity() {
+        CruiseControlSpec spec = new CruiseControlSpecBuilder()
+                .withNewBrokerCapacity()
+                    .withCpu("2575m")
+                    .withInboundNetwork("50000KB/s")
+                    .withOutboundNetwork("50000KB/s")
+                .endBrokerCapacity()
+                .build();
+
+        Map<String, Storage> storage = Map.of(
+                "brokers",
+                new JbodStorageBuilder()
+                        .withVolumes(
+                                new PersistentClaimStorageBuilder()
+                                        .withId(0)
+                                        .withSize("50Gi")
+                                        .build(),
+                                new PersistentClaimStorageBuilder()
+                                        .withId(1)
+                                        .withSize("60Gi")
+                                        .build()
+                        )
+                        .build()
+        );
+
+        CapacityConfiguration config = new CapacityConfiguration(Reconciliation.DUMMY_RECONCILIATION, spec, NODES, storage, RESOURCES);
+        String actualCapacityConfig = config.toJson();
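+        // Throughput configured in KB/s (decimal) is converted to KiB/s (binary) in the generated
+        // config: 50000 KB/s = 50,000,000 B/s / 1024 = 48828.125 KiB/s. Disk sizes are reported
+        // in MiB: 50Gi = 51200 MiB and 60Gi = 61440 MiB.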
.withCpu("2575m") + .withInboundNetwork("50000KB/s") + .addNewOverride() + .withBrokers(List.of(0, 1, 2, 0)) + .withCpu("1.222") + .withInboundNetwork("25000KB/s") + .endOverride() + .addNewOverride() + .withBrokers(List.of(1)) + .withInboundNetwork("10000KiB/s") + .withOutboundNetwork("15000KB/s") + .endOverride() + .endBrokerCapacity() + .build(); + + Map storage = Map.of( + "brokers", + new PersistentClaimStorageBuilder() + .withId(0) + .withSize("50Gi") + .build() + ); + + CapacityConfiguration config = new CapacityConfiguration(Reconciliation.DUMMY_RECONCILIATION, spec, NODES, storage, RESOURCES); + String actualCapacityConfig = config.toJson(); + String expectedCapacityConfig = """ + { + "brokerCapacities" : [ { + "brokerId" : 0, + "capacity" : { + "DISK" : "51200.0", + "CPU" : { + "num.cores" : "1.222" + }, + "NW_IN" : "24414.0625", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 0" + }, { + "brokerId" : 1, + "capacity" : { + "DISK" : "51200.0", + "CPU" : { + "num.cores" : "1.222" + }, + "NW_IN" : "24414.0625", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 1" + }, { + "brokerId" : 2, + "capacity" : { + "DISK" : "51200.0", + "CPU" : { + "num.cores" : "1.222" + }, + "NW_IN" : "24414.0625", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 2" + } ] + }"""; + + assertEquals(expectedCapacityConfig, actualCapacityConfig); + } + + @ParallelTest + public void testBrokerCapacityGeneratedCpu() { + CruiseControlSpec spec = new CruiseControlSpecBuilder() + .withNewBrokerCapacity() + .withCpu("2575m") + .withInboundNetwork("50000KB/s") + .addNewOverride() + .withBrokers(List.of(0, 1, 2, 0)) + .withCpu("1.222") + .withInboundNetwork("25000KB/s") + .endOverride() + .addNewOverride() + .withBrokers(List.of(1)) + .withInboundNetwork("10000KiB/s") + .withOutboundNetwork("15000KB/s") + .endOverride() + .endBrokerCapacity() + .build(); + + Map storage = Map.of( + "brokers", + new JbodStorageBuilder() + .withVolumes( + new PersistentClaimStorageBuilder().withId(0).withSize("50Gi").build(), + new PersistentClaimStorageBuilder().withId(1).withSize("60Gi").build() + ) + .build() + ); + + CapacityConfiguration config = new CapacityConfiguration(Reconciliation.DUMMY_RECONCILIATION, spec, NODES, storage, RESOURCES); + String actualCapacityConfig = config.toJson(); + String expectedCapacityConfig = """ + { + "brokerCapacities" : [ { + "brokerId" : 0, + "capacity" : { + "DISK" : { + "/var/lib/kafka/data-0/kafka-log0" : "51200.0", + "/var/lib/kafka/data-1/kafka-log0" : "61440.0" + }, + "CPU" : { + "num.cores" : "1.222" + }, + "NW_IN" : "24414.0625", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 0" + }, { + "brokerId" : 1, + "capacity" : { + "DISK" : { + "/var/lib/kafka/data-1/kafka-log1" : "61440.0", + "/var/lib/kafka/data-0/kafka-log1" : "51200.0" + }, + "CPU" : { + "num.cores" : "1.222" + }, + "NW_IN" : "24414.0625", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 1" + }, { + "brokerId" : 2, + "capacity" : { + "DISK" : { + "/var/lib/kafka/data-0/kafka-log2" : "51200.0", + "/var/lib/kafka/data-1/kafka-log2" : "61440.0" + }, + "CPU" : { + "num.cores" : "1.222" + }, + "NW_IN" : "24414.0625", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 2" + } ] + }"""; + + assertEquals(expectedCapacityConfig, actualCapacityConfig); + } + + @ParallelTest + public void testBrokerCapacitiesWithPools() { + Set nodes = Set.of( + new NodeRef("my-cluster-pool1-0", 0, "pool1", false, true), + new NodeRef("my-cluster-pool1-1", 1, "pool1", false, true), + new 
NodeRef("my-cluster-pool1-2", 2, "pool1", false, true), + new NodeRef("my-cluster-pool2-10", 10, "pool2", false, true), + new NodeRef("my-cluster-pool2-11", 11, "pool2", false, true), + new NodeRef("my-cluster-pool2-12", 12, "pool2", false, true) + ); + + Map storage = new HashMap<>(); + storage.put("pool1", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()).build()); + storage.put("pool2", new JbodStorageBuilder().withVolumes(new PersistentClaimStorageBuilder().withId(1).withSize("1Ti").build()).build()); + + Map resources = new HashMap<>(); + resources.put("pool1", new ResourceRequirementsBuilder().withLimits(Map.of("cpu", new Quantity("4"), "memory", new Quantity("16Gi"))).build()); + resources.put("pool2", new ResourceRequirementsBuilder().withLimits(Map.of("cpu", new Quantity("5"), "memory", new Quantity("20Gi"))).build()); + + CapacityConfiguration config = new CapacityConfiguration(Reconciliation.DUMMY_RECONCILIATION, new CruiseControlSpec(), nodes, storage, resources); + String actualCapacityConfig = config.toJson(); + String expectedCapacityConfig = """ + { + "brokerCapacities" : [ { + "brokerId" : 0, + "capacity" : { + "DISK" : { + "/var/lib/kafka/data-0/kafka-log0" : "102400.0" + }, + "CPU" : { + "num.cores" : "4.0" + }, + "NW_IN" : "10000.0", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 0" + }, { + "brokerId" : 1, + "capacity" : { + "DISK" : { + "/var/lib/kafka/data-0/kafka-log1" : "102400.0" + }, + "CPU" : { + "num.cores" : "4.0" + }, + "NW_IN" : "10000.0", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 1" + }, { + "brokerId" : 2, + "capacity" : { + "DISK" : { + "/var/lib/kafka/data-0/kafka-log2" : "102400.0" + }, + "CPU" : { + "num.cores" : "4.0" + }, + "NW_IN" : "10000.0", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 2" + }, { + "brokerId" : 10, + "capacity" : { + "DISK" : { + "/var/lib/kafka/data-1/kafka-log10" : "1048576.0" + }, + "CPU" : { + "num.cores" : "5.0" + }, + "NW_IN" : "10000.0", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 10" + }, { + "brokerId" : 11, + "capacity" : { + "DISK" : { + "/var/lib/kafka/data-1/kafka-log11" : "1048576.0" + }, + "CPU" : { + "num.cores" : "5.0" + }, + "NW_IN" : "10000.0", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 11" + }, { + "brokerId" : 12, + "capacity" : { + "DISK" : { + "/var/lib/kafka/data-1/kafka-log12" : "1048576.0" + }, + "CPU" : { + "num.cores" : "5.0" + }, + "NW_IN" : "10000.0", + "NW_OUT" : "10000.0" + }, + "doc" : "Capacity for Broker 12" + } ] + }"""; + + assertEquals(expectedCapacityConfig, actualCapacityConfig); + } + + @ParallelTest + public void testCpuCapacityGeneration() { + String brokerCpuCapacity = "6.0"; + String brokerCpuCapacityOverride = "2.0"; + String resourceRequestCpu = "3.0"; + String resourceLimitCpu = "4.0"; + + Map storage = Map.of( + "brokers", + new JbodStorageBuilder() + .withVolumes( + new PersistentClaimStorageBuilder() + .withId(0) + .withSize("100Gi") + .build() + ) + .build() + ); + + /* In this test case, when override is set for a broker, it takes precedence over the general broker capacity setting for that + specific broker, but the general broker capacity takes precedence over the Kafka resource request */ + Map resources = createResources(resourceRequestCpu, resourceLimitCpu); + + BrokerCapacity brokerCapacityOne = new BrokerCapacityBuilder() + .withCpu(brokerCpuCapacity) + .withOverrides() + .addNewOverride() + .addToBrokers(0) + 
+        String expectedCapacityConfig = """
+            {
+              "brokerCapacities" : [ {
+                "brokerId" : 0,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-0/kafka-log0" : "102400.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "4.0"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 0"
+              }, {
+                "brokerId" : 1,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-0/kafka-log1" : "102400.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "4.0"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 1"
+              }, {
+                "brokerId" : 2,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-0/kafka-log2" : "102400.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "4.0"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 2"
+              }, {
+                "brokerId" : 10,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-1/kafka-log10" : "1048576.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "5.0"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 10"
+              }, {
+                "brokerId" : 11,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-1/kafka-log11" : "1048576.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "5.0"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 11"
+              }, {
+                "brokerId" : 12,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-1/kafka-log12" : "1048576.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "5.0"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 12"
+              } ]
+            }""";
+
+        assertEquals(expectedCapacityConfig, actualCapacityConfig);
+    }
+
+    @ParallelTest
+    public void testCpuCapacityGeneration() {
+        String brokerCpuCapacity = "6.0";
+        String brokerCpuCapacityOverride = "2.0";
+        String resourceRequestCpu = "3.0";
+        String resourceLimitCpu = "4.0";
+
+        Map<String, Storage> storage = Map.of(
+                "brokers",
+                new JbodStorageBuilder()
+                        .withVolumes(
+                                new PersistentClaimStorageBuilder()
+                                        .withId(0)
+                                        .withSize("100Gi")
+                                        .build()
+                        )
+                        .build()
+        );
+
+        /* In this test case, when an override is set for a broker, it takes precedence over the general broker
+           capacity setting for that specific broker, and the general broker capacity takes precedence over the
+           Kafka resource request */
+        Map<String, ResourceRequirements> resources = createResources(resourceRequestCpu, resourceLimitCpu);
+
+        BrokerCapacity brokerCapacityOne = new BrokerCapacityBuilder()
+                .withCpu(brokerCpuCapacity)
+                .withOverrides()
+                    .addNewOverride()
+                        .addToBrokers(0)
+                        .withCpu(brokerCpuCapacityOverride)
+                    .endOverride()
+                .build();
+
+        verifyBrokerCapacity(storage, resources, brokerCapacityOne, brokerCpuCapacityOverride, brokerCpuCapacity, brokerCpuCapacity);
+
+        // In this test case, when an override is set for a broker, it takes precedence over the Kafka resource request
+        BrokerCapacity brokerCapacityTwo = new BrokerCapacityBuilder()
+                .withOverrides()
+                    .addNewOverride()
+                        .addToBrokers(0)
+                        .withCpu(brokerCpuCapacityOverride)
+                    .endOverride()
+                .build();
+
+        verifyBrokerCapacity(storage, resources, brokerCapacityTwo, brokerCpuCapacityOverride, resourceRequestCpu, resourceRequestCpu);
+
+        /* In this test case, neither the override nor the CPU resource request is configured, but the CPU
+           resource limit is set for the Kafka brokers; therefore, the resource limit is used as the CPU capacity */
+        resources = Map.of("brokers",
+                new ResourceRequirementsBuilder()
+                        .withLimits(Map.of("cpu", new Quantity(resourceLimitCpu)))
+                        .build());
+
+        BrokerCapacity brokerCapacityThree = new BrokerCapacityBuilder().build();
+
+        verifyBrokerCapacity(storage, resources, brokerCapacityThree, resourceLimitCpu, resourceLimitCpu, resourceLimitCpu);
+
+        /* In this test case, neither the override nor the Kafka resource requests or limits for CPU are configured,
+           so the CPU capacity falls back to DEFAULT_CPU_CORE_CAPACITY */
+        resources = Map.of("brokers", new ResourceRequirementsBuilder().build());
+
+        verifyBrokerCapacity(storage, resources, brokerCapacityThree, CpuCapacity.DEFAULT_CPU_CORE_CAPACITY,
+                CpuCapacity.DEFAULT_CPU_CORE_CAPACITY, CpuCapacity.DEFAULT_CPU_CORE_CAPACITY);
+    }
+
+    private void verifyBrokerCapacity(Map<String, Storage> storage,
+                                      Map<String, ResourceRequirements> resources,
+                                      BrokerCapacity brokerCapacity,
+                                      String brokerZeroCpuValue,
+                                      String brokerOneCpuValue,
+                                      String brokerTwoCpuValue) {
+        CruiseControlSpec spec = new CruiseControlSpecBuilder()
+                .withBrokerCapacity(brokerCapacity)
+                .build();
+
+        CapacityConfiguration config = new CapacityConfiguration(Reconciliation.DUMMY_RECONCILIATION, spec, NODES, storage, resources);
+        String actualCapacityConfig = config.toJson();
+        String expectedCapacityConfig = String.format("""
+            {
+              "brokerCapacities" : [ {
+                "brokerId" : 0,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-0/kafka-log0" : "102400.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "%s"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 0"
+              }, {
+                "brokerId" : 1,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-0/kafka-log1" : "102400.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "%s"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 1"
+              }, {
+                "brokerId" : 2,
+                "capacity" : {
+                  "DISK" : {
+                    "/var/lib/kafka/data-0/kafka-log2" : "102400.0"
+                  },
+                  "CPU" : {
+                    "num.cores" : "%s"
+                  },
+                  "NW_IN" : "10000.0",
+                  "NW_OUT" : "10000.0"
+                },
+                "doc" : "Capacity for Broker 2"
+              } ]
+            }""",
+                brokerZeroCpuValue,
+                brokerOneCpuValue,
+                brokerTwoCpuValue);
+
+        assertEquals(expectedCapacityConfig, actualCapacityConfig);
+    }
+
+    private static Map<String, ResourceRequirements> createResources(String cpuRequest, String cpuLimit) {
+        return Map.of("brokers",
+                new ResourceRequirementsBuilder()
+                        .withRequests(Map.of("cpu", new Quantity(cpuRequest)))
+                        .withLimits(Map.of("cpu", new Quantity(cpuLimit)))
+                        .build());
+    }
+}