Skip to content

Commit 4134f52

Browse files
Merge pull request #33670 from alex-hunt-materialize/configurable_balancerd_pods
support configuring replicas for balancerd and console in MZ CR
2 parents a86406c + 362d02a commit 4134f52

File tree

4 files changed

+93
-51
lines changed

4 files changed

+93
-51
lines changed

src/cloud-resources/src/crd/materialize.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ pub mod v1alpha1 {
9999
pub balancerd_resource_requirements: Option<ResourceRequirements>,
100100
// Resource requirements for the console pod
101101
pub console_resource_requirements: Option<ResourceRequirements>,
102+
// Number of balancerd pods to create
103+
pub balancerd_replicas: Option<i32>,
104+
// Number of console pods to create
105+
pub console_replicas: Option<i32>,
102106

103107
// Name of the kubernetes service account to use.
104108
// If not set, we will create one with the same name as this Materialize object.
@@ -268,6 +272,14 @@ pub mod v1alpha1 {
268272
self.name_prefixed("balancerd-external-tls")
269273
}
270274

275+
pub fn balancerd_replicas(&self) -> i32 {
276+
self.spec.balancerd_replicas.unwrap_or(2)
277+
}
278+
279+
pub fn console_replicas(&self) -> i32 {
280+
self.spec.console_replicas.unwrap_or(2)
281+
}
282+
271283
pub fn console_configmap_name(&self) -> String {
272284
self.name_prefixed("console")
273285
}

src/orchestratord/src/controller/materialize/balancer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ fn create_balancerd_deployment_object(
324324
};
325325

326326
let deployment_spec = DeploymentSpec {
327-
replicas: Some(1),
327+
replicas: Some(mz.balancerd_replicas()),
328328
selector: LabelSelector {
329329
match_labels: Some(pod_template_labels.clone()),
330330
..Default::default()

src/orchestratord/src/controller/materialize/console.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ ssl_certificate_key /nginx/tls/tls.key;",
399399
};
400400

401401
let deployment_spec = DeploymentSpec {
402-
replicas: Some(2),
402+
replicas: Some(mz.console_replicas()),
403403
selector: LabelSelector {
404404
match_labels: Some(pod_template_labels.clone()),
405405
..Default::default()

test/orchestratord/mzcompose.py

Lines changed: 79 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -56,57 +56,48 @@ def get_image(image: str, tag: str | None) -> str:
5656
return f'{image.rsplit(":", 1)[0]}:{tag}'
5757

5858

59-
def get_orchestratord_data() -> dict[str, Any]:
59+
def get_pod_data(
60+
labels: dict[str, str], namespace="materialize-environment"
61+
) -> dict[str, Any]:
6062
return json.loads(
6163
spawn.capture(
6264
[
6365
"kubectl",
6466
"get",
6567
"pod",
6668
"-l",
67-
"app.kubernetes.io/instance=operator",
69+
",".join(f"{key}={value}" for key, value in labels.items()),
6870
"-n",
69-
"materialize",
71+
namespace,
7072
"-o",
7173
"json",
7274
]
7375
)
7476
)
7577

7678

79+
def get_orchestratord_data() -> dict[str, Any]:
80+
return get_pod_data(
81+
labels={"app.kubernetes.io/instance": "operator"},
82+
namespace="materialize",
83+
)
84+
85+
7786
def get_balancerd_data() -> dict[str, Any]:
78-
return json.loads(
79-
spawn.capture(
80-
[
81-
"kubectl",
82-
"get",
83-
"pod",
84-
"-l",
85-
"app=balancerd",
86-
"-n",
87-
"materialize-environment",
88-
"-o",
89-
"json",
90-
]
91-
)
87+
return get_pod_data(
88+
labels={"materialize.cloud/app": "balancerd"},
89+
)
90+
91+
92+
def get_console_data() -> dict[str, Any]:
93+
return get_pod_data(
94+
labels={"materialize.cloud/app": "console"},
9295
)
9396

9497

9598
def get_environmentd_data() -> dict[str, Any]:
96-
return json.loads(
97-
spawn.capture(
98-
[
99-
"kubectl",
100-
"get",
101-
"pod",
102-
"-l",
103-
"app=environmentd",
104-
"-n",
105-
"materialize-environment",
106-
"-o",
107-
"json",
108-
]
109-
)
99+
return get_pod_data(
100+
labels={"materialize.cloud/app": "environmentd"},
110101
)
111102

112103

@@ -528,6 +519,63 @@ def validate(self, mods: dict[type[Modification], Any]) -> None:
528519
), f"Expected no {expected} in environmentd args, but found it: {args}"
529520

530521

522+
class BalancerdReplicas(Modification):
523+
@classmethod
524+
def values(cls) -> list[Any]:
525+
return [None, 1, 2]
526+
527+
@classmethod
528+
def default(cls) -> Any:
529+
return None
530+
531+
def modify(self, definition: dict[str, Any]) -> None:
532+
if self.value is not None:
533+
definition["materialize"]["spec"]["balancerdReplicas"] = self.value
534+
535+
def validate(self, mods: dict[type[Modification], Any]) -> None:
536+
if not mods[BalancerdEnabled]:
537+
return
538+
539+
def check_replicas():
540+
balancerd = get_balancerd_data()
541+
num_pods = len(balancerd["items"])
542+
expected = self.value if self.value is not None else 2
543+
assert (
544+
num_pods == expected
545+
), f"Expected {expected} balancerd pods, but found {num_pods}"
546+
547+
retry(check_replicas, 120)
548+
549+
550+
class ConsoleReplicas(Modification):
551+
@classmethod
552+
def values(cls) -> list[Any]:
553+
return [None, 1, 2]
554+
555+
@classmethod
556+
def default(cls) -> Any:
557+
return None
558+
559+
def modify(self, definition: dict[str, Any]) -> None:
560+
if self.value is not None:
561+
definition["materialize"]["spec"]["consoleReplicas"] = self.value
562+
563+
def validate(self, mods: dict[type[Modification], Any]) -> None:
564+
if not mods[ConsoleEnabled]:
565+
return
566+
567+
def check_replicas():
568+
console = get_console_data()
569+
num_pods = len(console["items"])
570+
expected = self.value if self.value is not None else 2
571+
assert (
572+
num_pods == expected
573+
), f"Expected {expected} console pods, but found {num_pods}"
574+
575+
# console doesn't get launched until last
576+
retry(check_replicas, 120)
577+
578+
531579
def validate_cluster_replica_size(
532580
size: dict[str, Any], swap_enabled: bool, storage_class_name_set: bool
533581
):
@@ -571,24 +619,6 @@ def validate_container_resources(
571619
assert resources["requests"]["memory"] == resources["limits"]["memory"]
572620

573621

574-
def get_pod_data(labels: dict[str, str]) -> dict[str, Any]:
575-
return json.loads(
576-
spawn.capture(
577-
[
578-
"kubectl",
579-
"get",
580-
"pod",
581-
"-l",
582-
",".join(f"{key}={value}" for key, value in labels.items()),
583-
"-n",
584-
"materialize-environment",
585-
"-o",
586-
"json",
587-
]
588-
)
589-
)
590-
591-
592622
class SwapEnabledGlobal(Modification):
593623
@classmethod
594624
def values(cls) -> list[Any]:

0 commit comments

Comments
 (0)