Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/xpk/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
from ..core.workload import get_workload_list
from ..utils.console import get_user_input, xpk_exit, xpk_print
from ..utils.file import write_tmp_file
from ..utils.kubectl import (
apply_kubectl_manifest,
update_jobset_manifest,
update_kueue_manifest,
)
from . import cluster_gcluster
from .common import set_cluster_command

Expand Down Expand Up @@ -317,6 +322,30 @@ def cluster_create(args) -> None:

install_kjob(args)

# Update memory limit of Jobset Controller Manager
k8s_api_client = setup_k8s_env(args)
jobset_manifest = update_jobset_manifest(args)
if jobset_manifest is None:
xpk_print(
"Updated jobset manifest is empty, not updating the jobset controller."
)
xpk_print("Updating Jobset Controller Manager")
return_code = apply_kubectl_manifest(k8s_api_client, [jobset_manifest])
if return_code != 0:
xpk_print(f'Updating Jobset Controller Manager returned ERROR {return_code}')
return return_code

kueue_manifest = update_kueue_manifest(args)
if kueue_manifest is None:
xpk_print(
"Updated Kueue manifest is empty, not updating the Kueue controller."
)
xpk_print("Updating Kueue Controller Manager")
return_code = apply_kubectl_manifest(k8s_api_client, [kueue_manifest])
if return_code != 0:
xpk_print(f'Updating Kueue Controller Manager returned ERROR {return_code}')
return return_code

if system.accelerator_type == AcceleratorType['GPU']:
prepare_gpus(args, system)

Expand Down
215 changes: 215 additions & 0 deletions src/xpk/utils/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import math
import requests
import yaml
import packaging
from packaging.version import Version

from kubernetes.client.exceptions import ApiException
from kubernetes.dynamic import DynamicClient

from ..core.cluster import JOBSET_VERSION
from ..core.commands import run_command_for_value
from ..core.kueue import get_kueue_version
from .console import xpk_print

KUEUE_VERSION = 'v0.10.0'

def apply_kubectl_manifest(client, manifest) -> int:
xpk_print('Applying manifest')
Expand Down Expand Up @@ -58,3 +67,209 @@ def apply_kubectl_manifest(client, manifest) -> int:
xpk_print(f'Error applying {kind}: {e}')
status_code = 1
return status_code


def update_jobset_manifest(args):
"""Update the jobset manifest to increase the resources for the jobset controller manager.

Args:
args: user provided arguments for running the command.

Returns:
The updated jobset manifest.
"""
manifest_url = f"https://github.com/kubernetes-sigs/jobset/releases/download/{JOBSET_VERSION}/manifests.yaml"
manifest_content = None
# Fetch the manifest content
try:
response = requests.get(manifest_url, timeout=10)
response.raise_for_status() # Raise an exception for HTTP errors
manifest_content = response.text
except requests.exceptions.Timeout as e:
xpk_print(f"Error: Request to {manifest_url} after 10 seconds: {e}")
xpk_exit(1)
except requests.exceptions.RequestException as e:
xpk_print(f"Error fetching manifest from {manifest_url}: {e}")
xpk_exit(1)

if manifest_content is None:
xpk_print("Manifest content not found.")
xpk_exit(1)

# Load all YAML documents from the manifest
yaml_data_list = list(yaml.safe_load_all(manifest_content))
# Iterate through the yaml_data to find the Deployment for
# jobset-controller-manager
update_manifest = False
for yaml_data in yaml_data_list:
if (
yaml_data
and yaml_data.get("apiVersion") == "apps/v1"
and yaml_data.get("kind") == "Deployment"
and yaml_data.get("metadata", {}).get("name")
== "jobset-controller-manager"
):
# Found the Deployment, now modify the resources
containers = yaml_data["spec"]["template"]["spec"]["containers"]
for container in containers:
if container["name"] == "manager":
# Update resource limits and requests
current_cpu_request = (
container["resources"].get("requests", {}).get("cpu", "0m")
)
current_memory_request = (
container["resources"].get("requests", {}).get("memory", "0Mi")
)
current_memory_limit = (
container["resources"].get("limits", {}).get("memory", "0Mi")
)

# Define new values for comparison
cmd_total_node_num = (
'kubectl get node --no-headers | wc -l'
)
return_code, out = run_command_for_value(
cmd_total_node_num, 'Count total nodes', args
)
if return_code != 0:
xpk_exit(1)
new_cpu_request = "500m"
new_memory_request = "128Mi"
# 1.2MiB per VM or 4GiB (whichever is greater).
new_memory_limit = f"{max(math.ceil(int(out) * 1.2), 4096)}Mi"

if parse_resource_value(current_cpu_request) < parse_resource_value(
new_cpu_request
):
container["resources"]["requests"]["cpu"] = new_cpu_request
update_manifest = True
if parse_resource_value(
current_memory_request
) < parse_resource_value(new_memory_request):
container["resources"]["requests"]["memory"] = new_memory_request
update_manifest = True
if parse_resource_value(current_memory_limit) < parse_resource_value(
new_memory_limit
):
container["resources"]["limits"]["memory"] = new_memory_limit
update_manifest = True
container["resources"]["limits"]["memory"] = new_memory_limit
update_manifest = True
break
if update_manifest:
xpk_print("Jobset controller updation required.")
return yaml_data
xpk_print("Jobset controller no updation required.")

def update_kueue_manifest(args):
"""Update the kueue manifest to increase the resources for the kueue controller manager.

Args:
args: user provided arguments for running the command.

Returns:
The updated kueue manifest.
"""
err_code, kueue_version_installed = get_kueue_version(args)
if err_code == 0:
if Version(kueue_version_installed) < Version('v0.9.0') and Version(
KUEUE_VERSION
) >= Version('v0.9.0'):
xpk_print('Upgrading kueue on cluster from version < 0.9.0.')
upgrade_code = delete_multikueueclusters_definitions(args)
if upgrade_code != 0:
return upgrade_code
upgrade_code = delete_multikueueconfigs_definitions(args)
if upgrade_code != 0:
return upgrade_code
manifest_url = f"https://github.com/kubernetes-sigs/kueue/releases/download/{KUEUE_VERSION}/manifests.yaml"
manifest_content = None
# Fetch the manifest content
try:
response = requests.get(manifest_url, timeout=10)
response.raise_for_status() # Raise an exception for HTTP errors
manifest_content = response.text
except requests.exceptions.Timeout as e:
xpk_print(f"Error: Request to {manifest_url} after 10 seconds: {e}")
xpk_exit(1)
except requests.exceptions.RequestException as e:
xpk_print(f"Error fetching manifest from {manifest_url}: {e}")
xpk_exit(1)

if manifest_content is None:
xpk_print("Manifest content not found.")
xpk_exit(1)

# Load all YAML documents from the manifest
yaml_data_list = list(yaml.safe_load_all(manifest_content))
# Iterate through the yaml_data to find the Deployment for
# kueue-controller-manager
update_manifest = False
for yaml_data in yaml_data_list:
if (
yaml_data
and yaml_data.get("apiVersion") == "apps/v1"
and yaml_data.get("kind") == "Deployment"
and yaml_data.get("metadata", {}).get("name")
== "kueue-controller-manager"
):
# Found the Deployment, now modify the resources
containers = yaml_data["spec"]["template"]["spec"]["containers"]
for container in containers:
if container["name"] == "manager":
# Update resource limits and requests
current_cpu_request = (
container["resources"].get("requests", {}).get("cpu", "0m")
)
current_memory_request = (
container["resources"].get("requests", {}).get("memory", "0Mi")
)
current_memory_limit = (
container["resources"].get("limits", {}).get("memory", "0Mi")
)

# Define new values for comparison
cmd_total_node_num = (
'kubectl get node --no-headers | wc -l'
)
return_code, out = run_command_for_value(
cmd_total_node_num, 'Count total nodes', args
)
if return_code != 0:
xpk_exit(1)
new_cpu_request = "500m"
new_memory_request = "128Mi"
# 1.2MiB per VM or 4GiB (whichever is greater).
new_memory_limit = f"{max(math.ceil(int(out) * 1.2), 4096)}Mi"

if parse_resource_value(current_cpu_request) < parse_resource_value(
new_cpu_request
):
container["resources"]["requests"]["cpu"] = new_cpu_request
update_manifest = True
if parse_resource_value(
current_memory_request
) < parse_resource_value(new_memory_request):
container["resources"]["requests"]["memory"] = new_memory_request
update_manifest = True
if parse_resource_value(current_memory_limit) < parse_resource_value(
new_memory_limit
):
container["resources"]["limits"]["memory"] = new_memory_limit
update_manifest = True
container["resources"]["limits"]["memory"] = new_memory_limit
update_manifest = True
break
if update_manifest:
xpk_print("Kueue controller updation required.")
return yaml_data
xpk_print("Kueue controller no updation required.")

def parse_resource_value(value) -> int:
if value.endswith("m"):
return int(value[:-1])
if value.endswith("Mi"):
return int(value[:-2])
if value.endswith("Gi"):
return int(value[:-2]) * 1024
return int(value)
Loading