Skip to content

Commit 5ab37d1

Browse files
committed
Addressing feedback - js
Signed-off-by: Kyle Liberti <[email protected]>
1 parent 91694e1 commit 5ab37d1

File tree

11 files changed

+208
-303
lines changed

11 files changed

+208
-303
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup
111111
private boolean authEnabled;
112112
private HashLoginServiceApiCredentials apiCredentials;
113113
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method
114-
protected CapacityConfiguration capacityConfiguration;
114+
private CapacityConfiguration capacityConfiguration;
115115
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method
116116
private MetricsModel metrics;
117117
private LoggingModel logging;

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CapacityConfiguration.java

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ public class CapacityConfiguration {
154154
private static final String BROKER_ID_KEY = "brokerId";
155155
private static final String DOC_KEY = "doc";
156156

157-
private final Reconciliation reconciliation;
158157
private final TreeMap<Integer, CapacityEntry> capacityEntries;
159158

160159
/**
@@ -173,13 +172,13 @@ public CapacityConfiguration(
173172
Map<String, Storage> kafkaStorage,
174173
Map<String, ResourceRequirements> kafkaBrokerResources
175174
) {
176-
this.reconciliation = reconciliation;
177-
this.capacityEntries = new TreeMap<>();
178-
179-
generateCapacityEntries(spec.getCruiseControl(), kafkaBrokerNodes, kafkaStorage, kafkaBrokerResources);
175+
this.capacityEntries = generateCapacityEntries(reconciliation, spec.getCruiseControl(), kafkaBrokerNodes,
176+
kafkaStorage, kafkaBrokerResources);
180177
}
181178

182-
private Map<Integer, BrokerCapacityOverride> processBrokerCapacityOverrides(Set<NodeRef> kafkaBrokerNodes, BrokerCapacity brokerCapacity) {
179+
private static Map<Integer, BrokerCapacityOverride> processBrokerCapacityOverrides(Reconciliation reconciliation,
180+
Set<NodeRef> kafkaBrokerNodes,
181+
BrokerCapacity brokerCapacity) {
183182
Map<Integer, BrokerCapacityOverride> overrideMap = new HashMap<>();
184183
List<BrokerCapacityOverride> overrides = null;
185184
if (brokerCapacity != null) {
@@ -206,12 +205,15 @@ private Map<Integer, BrokerCapacityOverride> processBrokerCapacityOverrides(Set<
206205
return overrideMap;
207206
}
208207

209-
private void generateCapacityEntries(CruiseControlSpec spec,
208+
private static TreeMap<Integer, CapacityEntry> generateCapacityEntries(Reconciliation reconciliation,
209+
CruiseControlSpec spec,
210210
Set<NodeRef> kafkaBrokerNodes,
211211
Map<String, Storage> kafkaStorage,
212212
Map<String, ResourceRequirements> kafkaBrokerResources) {
213+
TreeMap<Integer, CapacityEntry> capacityEntries = new TreeMap<>();
213214
BrokerCapacity generalBrokerCapacity = spec.getBrokerCapacity();
214-
Map<Integer, BrokerCapacityOverride> brokerCapacityOverrideMap = processBrokerCapacityOverrides(kafkaBrokerNodes, generalBrokerCapacity);
215+
Map<Integer, BrokerCapacityOverride> brokerCapacityOverrideMap = processBrokerCapacityOverrides(reconciliation,
216+
kafkaBrokerNodes, generalBrokerCapacity);
215217

216218
for (NodeRef node : kafkaBrokerNodes) {
217219
BrokerCapacityOverride brokerCapacityOverride = brokerCapacityOverrideMap.get(node.nodeId());
@@ -221,38 +223,24 @@ private void generateCapacityEntries(CruiseControlSpec spec,
221223
InboundNetworkCapacity inboundNetwork = new InboundNetworkCapacity(generalBrokerCapacity, brokerCapacityOverride);
222224
OutboundNetworkCapacity outboundNetwork = new OutboundNetworkCapacity(generalBrokerCapacity, brokerCapacityOverride);
223225

224-
CapacityEntry capacityEntry = new CapacityEntry(node.nodeId(), cpu, disk, inboundNetwork, outboundNetwork);
226+
CapacityEntry capacityEntry = new CapacityEntry(node.nodeId(), disk, cpu, inboundNetwork, outboundNetwork);
225227
capacityEntries.put(node.nodeId(), capacityEntry);
226228
}
229+
return capacityEntries;
227230
}
228231

229232
/**
230233
* Represents a Cruise Control capacity entry configuration for a Kafka broker.
231234
*
232235
* @param id The broker ID.
233-
* @param capacity The capacity map for the broker capacity entry.
234-
* @param doc A human-readable description of this capacity entry.
235236
*/
236-
public record CapacityEntry(
237+
private record CapacityEntry(
237238
int id,
238-
Map<String, Object> capacity,
239-
String doc
240-
) {
241-
private CapacityEntry(int id, CpuCapacity cpu, DiskCapacity disk, InboundNetworkCapacity inboundNetwork,
242-
OutboundNetworkCapacity outboundNetwork) {
243-
this(id, buildCapacityMap(cpu, disk, inboundNetwork, outboundNetwork), "Capacity for Broker " + id);
244-
}
245-
246-
private static Map<String, Object> buildCapacityMap(CpuCapacity cpu, DiskCapacity disk, InboundNetworkCapacity inboundNetwork,
247-
OutboundNetworkCapacity outboundNetwork) {
248-
Map<String, Object> map = new HashMap<>();
249-
map.put(DiskCapacity.KEY, disk.getJson());
250-
map.put(CpuCapacity.KEY, cpu.getJson());
251-
map.put(InboundNetworkCapacity.KEY, inboundNetwork.toString());
252-
map.put(OutboundNetworkCapacity.KEY, outboundNetwork.toString());
253-
return map;
254-
}
255-
}
239+
DiskCapacity disk,
240+
CpuCapacity cpu,
241+
InboundNetworkCapacity inboundNetwork,
242+
OutboundNetworkCapacity outboundNetwork
243+
) { }
256244

257245
/**
258246
* Generate a capacity configuration for cluster.
@@ -263,24 +251,20 @@ private static Map<String, Object> buildCapacityMap(CpuCapacity cpu, DiskCapacit
263251
public String toString() {
264252
JsonArray capacityList = new JsonArray();
265253
for (CapacityEntry capacityEntry : capacityEntries.values()) {
266-
JsonObject capacityJson = new JsonObject();
267-
capacityEntry.capacity.forEach(capacityJson::put);
254+
JsonObject capacityJson = new JsonObject()
255+
.put(DiskCapacity.KEY, capacityEntry.disk.getJson())
256+
.put(CpuCapacity.KEY, capacityEntry.cpu.getJson())
257+
.put(InboundNetworkCapacity.KEY, capacityEntry.inboundNetwork.toString())
258+
.put(OutboundNetworkCapacity.KEY, capacityEntry.outboundNetwork.toString());
268259

269260
JsonObject capacityEntryJson = new JsonObject()
270261
.put(BROKER_ID_KEY, capacityEntry.id)
271262
.put(CAPACITY_KEY, capacityJson)
272-
.put(DOC_KEY, capacityEntry.doc());
263+
.put(DOC_KEY, "Capacity for Broker " + capacityEntry.id);
273264

274265
capacityList.add(capacityEntryJson);
275266
}
276267

277268
return new JsonObject().put(CAPACITIES_KEY, capacityList).encodePrettily();
278269
}
279-
280-
/**
281-
* @return Capacity entries
282-
*/
283-
public TreeMap<Integer, CapacityEntry> getCapacityEntries() {
284-
return capacityEntries;
285-
}
286270
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,23 @@
44
*/
55
package io.strimzi.operator.cluster.model.cruisecontrol;
66

7+
import io.fabric8.kubernetes.api.model.Quantity;
78
import io.fabric8.kubernetes.api.model.ResourceRequirements;
89
import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity;
910
import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride;
1011
import io.strimzi.operator.cluster.model.Quantities;
1112
import io.vertx.core.json.JsonObject;
1213

13-
import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.CPU;
14+
import java.util.Map;
1415

1516
/**
1617
* Cruise Control CPU capacity configuration for broker.
1718
*/
1819
public class CpuCapacity {
19-
protected static final String DEFAULT_CPU_CORE_CAPACITY = "1.0";
20+
/**
21+
* Default capacity value
22+
*/
23+
public static final String DEFAULT_CPU_CORE_CAPACITY = "1.0";
2024
/**
2125
* Key used to identify resource in broker entry in Cruise Control capacity configuration.
2226
*/
@@ -48,7 +52,7 @@ public CpuCapacity(String cores) {
4852
public CpuCapacity(BrokerCapacity brokerCapacity,
4953
BrokerCapacityOverride brokerCapacityOverride,
5054
ResourceRequirements resourceRequirements) {
51-
this(CPU.processResourceCapacity(brokerCapacity, brokerCapacityOverride, resourceRequirements));
55+
this(processResourceCapacity(brokerCapacity, brokerCapacityOverride, resourceRequirements));
5256
}
5357

5458
private static String milliCpuToCpu(int milliCPU) {
@@ -68,4 +72,89 @@ public JsonObject getJson() {
6872
public String toString() {
6973
return config.toString();
7074
}
75+
76+
/**
77+
* Given the configured brokerCapacity, broker-specific capacity override, and broker resource requirements,
78+
* returns the capacity for the resource.
79+
*
80+
* <p>
81+
* The broker-specific capacity override takes top precedence, then general brokerCapacity configuration,
82+
* and then the Kafka resource requests, then the Kafka resource limits, then resource
83+
* default.
84+
*
85+
* For example:
86+
* <ul>
87+
* <li> (1) The brokerCapacityOverride for a specific broker.
88+
* <li> (2) The general brokerCapacity configuration.
89+
* <li> (3) Kafka resource requests
90+
* <li> (4) Kafka resource limits
91+
* <li> (5) The resource default.
92+
* </ul>
93+
*
94+
* @param brokerCapacity The general brokerCapacity configuration.
95+
* @param brokerCapacityOverride The brokerCapacityOverride for specific broker.
96+
* @param resourceRequirements The Kafka resource requests and limits (for all brokers).
97+
*
98+
* @return The capacity of resource represented as a String.
99+
*/
100+
public static String processResourceCapacity(BrokerCapacity brokerCapacity,
101+
BrokerCapacityOverride brokerCapacityOverride,
102+
ResourceRequirements resourceRequirements) {
103+
if (brokerCapacityOverride != null && brokerCapacityOverride.getCpu() != null) {
104+
return brokerCapacityOverride.getCpu();
105+
}
106+
107+
if (brokerCapacity != null && brokerCapacity.getCpu() != null) {
108+
return brokerCapacity.getCpu();
109+
}
110+
111+
112+
String cpuBasedOnRequirements = getCpuBasedOnRequirements(resourceRequirements);
113+
if (cpuBasedOnRequirements != null) {
114+
return cpuBasedOnRequirements;
115+
}
116+
117+
return DEFAULT_CPU_CORE_CAPACITY;
118+
}
119+
120+
/**
121+
* Enum representing resource requirement types used in the resource requirements section of Strimzi custom resources.
122+
*/
123+
private enum ResourceRequirementsType {
124+
REQUEST,
125+
LIMIT;
126+
}
127+
128+
private static Quantity getCpuQuantity(ResourceRequirements resources,
129+
ResourceRequirementsType requirementType) {
130+
if (resources == null) {
131+
return null;
132+
}
133+
134+
Map<String, Quantity> resourceRequirement = switch (requirementType) {
135+
case REQUEST -> resources.getRequests();
136+
case LIMIT -> resources.getLimits();
137+
};
138+
139+
return resourceRequirement != null ? resourceRequirement.get("cpu") : null;
140+
}
141+
142+
/**
143+
* Derives the CPU capacity from the resource requirements section of Strimzi custom resource.
144+
*
145+
* @param resourceRequirements The Strimzi custom resource requirements containing CPU requests and/or limits.
146+
* @return The CPU capacity as a {@link String}, or {@code null} if no CPU values are defined.
147+
*/
148+
private static String getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) {
149+
Quantity request = getCpuQuantity(resourceRequirements, ResourceRequirementsType.REQUEST);
150+
Quantity limit = getCpuQuantity(resourceRequirements, ResourceRequirementsType.LIMIT);
151+
152+
if (request != null) {
153+
return request.toString();
154+
} else if (limit != null) {
155+
return limit.toString();
156+
} else {
157+
return null;
158+
}
159+
}
71160
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/DiskCapacity.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public DiskCapacity(Storage storage, int brokerId) {
5757
*/
5858
private static String getSizeInMiB(String size) {
5959
if (size == null) {
60-
return ResourceCapacityType.DISK.getDefaultResourceCapacity();
60+
return DEFAULT_DISK_CAPACITY_IN_MIB;
6161
}
6262
return String.valueOf(StorageUtils.convertTo(size, "Mi"));
6363
}
@@ -112,7 +112,12 @@ private static Map<String, String> generateNonJbodDiskConfig(Storage storage) {
112112
return Map.of(SINGLE_DISK, size);
113113
}
114114

115-
protected Object getJson() {
115+
/**
116+
* Returns capacity value as a JsonObject.
117+
*
118+
* @return The capacity value as a JsonObject.
119+
*/
120+
/* test */ Object getJson() {
116121
if (config.size() == 1 && config.containsKey(SINGLE_DISK)) {
117122
return config.get(SINGLE_DISK);
118123
} else {
@@ -126,6 +131,6 @@ protected Object getJson() {
126131

127132
@Override
128133
public String toString() {
129-
return this.getJson().toString();
134+
return getJson().toString();
130135
}
131136
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/InboundNetworkCapacity.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,9 @@
44
*/
55
package io.strimzi.operator.cluster.model.cruisecontrol;
66

7-
87
import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacity;
98
import io.strimzi.api.kafka.model.kafka.cruisecontrol.BrokerCapacityOverride;
109

11-
import static io.strimzi.operator.cluster.model.cruisecontrol.ResourceCapacityType.INBOUND_NETWORK;
12-
1310
/**
1411
* Cruise Control inbound network capacity configuration for broker.
1512
*/
@@ -28,6 +25,39 @@ public class InboundNetworkCapacity extends NetworkCapacity {
2825
* @param brokerCapacityOverride The brokerCapacityOverride for specific broker.
2926
*/
3027
public InboundNetworkCapacity(BrokerCapacity brokerCapacity, BrokerCapacityOverride brokerCapacityOverride) {
31-
super(getThroughputInKiB(INBOUND_NETWORK.processResourceCapacity(brokerCapacity, brokerCapacityOverride, null)));
28+
super(getThroughputInKiB(processResourceCapacity(brokerCapacity, brokerCapacityOverride)));
29+
}
30+
31+
/**
32+
* Given the configured brokerCapacity, broker-specific capacity override, and broker resource requirements,
33+
* returns the capacity for the resource.
34+
*
35+
* <p>
36+
* The broker-specific capacity override takes top precedence, then general brokerCapacity configuration,
37+
* then resource default.
38+
*
39+
* For example:
40+
* <ul>
41+
* <li> (1) The brokerCapacityOverride for a specific broker.
42+
* <li> (2) The general brokerCapacity configuration.
43+
* <li> (3) The resource default.
44+
* </ul>
45+
*
46+
* @param brokerCapacity The general brokerCapacity configuration.
47+
* @param brokerCapacityOverride The brokerCapacityOverride for specific broker.
48+
*
49+
* @return The capacity of resource represented as a String.
50+
*/
51+
private static String processResourceCapacity(BrokerCapacity brokerCapacity,
52+
BrokerCapacityOverride brokerCapacityOverride) {
53+
if (brokerCapacityOverride != null && brokerCapacityOverride.getInboundNetwork() != null) {
54+
return brokerCapacityOverride.getInboundNetwork();
55+
}
56+
57+
if (brokerCapacity != null && brokerCapacity.getInboundNetwork() != null) {
58+
return brokerCapacity.getInboundNetwork();
59+
}
60+
61+
return DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND;
3262
}
3363
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/NetworkCapacity.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
* Cruise Control network capacity configuration for broker.
1111
*/
1212
public class NetworkCapacity {
13-
protected static final String DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000KiB/s";
13+
/**
14+
* Default capacity value
15+
*/
16+
public static final String DEFAULT_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000KiB/s";
1417
protected String config;
1518

1619
protected NetworkCapacity(String config) {

0 commit comments

Comments
 (0)