From a875fdf3a267f48509f9ed2dc7e9f17f1dcc8094 Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Mon, 16 Jun 2025 19:44:22 +0000 Subject: [PATCH 01/11] Update Kueue and Jobset controller default limit value --- src/xpk/commands/cluster.py | 31 +++++- src/xpk/utils/kubectl.py | 215 ++++++++++++++++++++++++++++++++++++ 2 files changed, 245 insertions(+), 1 deletion(-) diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index 83eb2b07..bb1b40e5 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -73,6 +73,11 @@ from ..core.workload import get_workload_list from ..utils.console import get_user_input, xpk_exit, xpk_print from ..utils.file import write_tmp_file +from ..utils.kubectl import ( + apply_kubectl_manifest, + update_jobset_manifest, + update_kueue_manifest, +) from . import cluster_gcluster from .common import set_cluster_command @@ -315,8 +320,32 @@ def cluster_create(args) -> None: install_kueue(args, system, autoprovisioning_config) - install_kjob(args) + # install_kjob(args) + # Update memory limit of Jobset Controller Manager + k8s_api_client = setup_k8s_env(args) + jobset_manifest = update_jobset_manifest(args) + if jobset_manifest is None: + xpk_print( + "Updated jobset manifest is empty, not updating the jobset controller." + ) + xpk_print("Updating Jobset Controller Manager") + return_code = apply_kubectl_manifest(k8s_api_client, [jobset_manifest]) + if return_code != 0: + xpk_print(f'Updating Jobset Controller Manager returned ERROR {return_code}') + return return_code + + kueue_manifest = update_kueue_manifest(args) + if kueue_manifest is None: + xpk_print( + "Updated Kueue manifest is empty, not updating the Kueue controller." + ) + xpk_print("Updating Kueue Controller Manager") + return_code = apply_kubectl_manifest(k8s_api_client, [kueue_manifest]) + if return_code != 0: + xpk_print(f'Updating Kueue Controller Manager returned ERROR {return_code}') + return return_code + if system.accelerator_type == AcceleratorType['GPU']: prepare_gpus(args, system) diff --git a/src/xpk/utils/kubectl.py b/src/xpk/utils/kubectl.py index c48e093e..9638eb65 100644 --- a/src/xpk/utils/kubectl.py +++ b/src/xpk/utils/kubectl.py @@ -13,12 +13,21 @@ See the License for the specific language governing permissions and limitations under the License. """ +import math +import requests +import yaml +import packaging +from packaging.version import Version from kubernetes.client.exceptions import ApiException from kubernetes.dynamic import DynamicClient +from ..core.cluster import JOBSET_VERSION +from ..core.commands import run_command_for_value +from ..core.kueue import get_kueue_version from .console import xpk_print +KUEUE_VERSION = 'v0.10.0' def apply_kubectl_manifest(client, manifest) -> int: xpk_print('Applying manifest') @@ -58,3 +67,209 @@ def apply_kubectl_manifest(client, manifest) -> int: xpk_print(f'Error applying {kind}: {e}') status_code = 1 return status_code + + +def update_jobset_manifest(args): + """Update the jobset manifest to increase the resources for the jobset controller manager. + + Args: + args: user provided arguments for running the command. + + Returns: + The updated jobset manifest. + """ + manifest_url = f"https://github.com/kubernetes-sigs/jobset/releases/download/{JOBSET_VERSION}/manifests.yaml" + manifest_content = None + # Fetch the manifest content + try: + response = requests.get(manifest_url, timeout=10) + response.raise_for_status() # Raise an exception for HTTP errors + manifest_content = response.text + except requests.exceptions.Timeout as e: + xpk_print(f"Error: Request to {manifest_url} after 10 seconds: {e}") + xpk_exit(1) + except requests.exceptions.RequestException as e: + xpk_print(f"Error fetching manifest from {manifest_url}: {e}") + xpk_exit(1) + + if manifest_content is None: + xpk_print("Manifest content not found.") + xpk_exit(1) + + # Load all YAML documents from the manifest + yaml_data_list = list(yaml.safe_load_all(manifest_content)) + # Iterate through the yaml_data to find the Deployment for + # jobset-controller-manager + update_manifest = False + for yaml_data in yaml_data_list: + if ( + yaml_data + and yaml_data.get("apiVersion") == "apps/v1" + and yaml_data.get("kind") == "Deployment" + and yaml_data.get("metadata", {}).get("name") + == "jobset-controller-manager" + ): + # Found the Deployment, now modify the resources + containers = yaml_data["spec"]["template"]["spec"]["containers"] + for container in containers: + if container["name"] == "manager": + # Update resource limits and requests + current_cpu_request = ( + container["resources"].get("requests", {}).get("cpu", "0m") + ) + current_memory_request = ( + container["resources"].get("requests", {}).get("memory", "0Mi") + ) + current_memory_limit = ( + container["resources"].get("limits", {}).get("memory", "0Mi") + ) + + # Define new values for comparison + cmd_total_node_num = ( + 'kubectl get node --no-headers | wc -l' + ) + return_code, out = run_command_for_value( + cmd_total_node_num, 'Count total nodes', args + ) + if return_code != 0: + xpk_exit(1) + new_cpu_request = "500m" + new_memory_request = "128Mi" + # 1.2MiB per VM or 4GiB (whichever is greater). + new_memory_limit = f"{max(math.ceil(int(out) * 1.2), 4096)}Mi" + + if parse_resource_value(current_cpu_request) < parse_resource_value( + new_cpu_request + ): + container["resources"]["requests"]["cpu"] = new_cpu_request + update_manifest = True + if parse_resource_value( + current_memory_request + ) < parse_resource_value(new_memory_request): + container["resources"]["requests"]["memory"] = new_memory_request + update_manifest = True + if parse_resource_value(current_memory_limit) < parse_resource_value( + new_memory_limit + ): + container["resources"]["limits"]["memory"] = new_memory_limit + update_manifest = True + container["resources"]["limits"]["memory"] = new_memory_limit + update_manifest = True + break + if update_manifest: + xpk_print("Jobset controller updation required.") + return yaml_data + xpk_print("Jobset controller no updation required.") + +def update_kueue_manifest(args): + """Update the kueue manifest to increase the resources for the kueue controller manager. + + Args: + args: user provided arguments for running the command. + + Returns: + The updated kueue manifest. + """ + err_code, kueue_version_installed = get_kueue_version(args) + if err_code == 0: + if Version(kueue_version_installed) < Version('v0.9.0') and Version( + KUEUE_VERSION + ) >= Version('v0.9.0'): + xpk_print('Upgrading kueue on cluster from version < 0.9.0.') + upgrade_code = delete_multikueueclusters_definitions(args) + if upgrade_code != 0: + return upgrade_code + upgrade_code = delete_multikueueconfigs_definitions(args) + if upgrade_code != 0: + return upgrade_code + manifest_url = f"https://github.com/kubernetes-sigs/kueue/releases/download/{KUEUE_VERSION}/manifests.yaml" + manifest_content = None + # Fetch the manifest content + try: + response = requests.get(manifest_url, timeout=10) + response.raise_for_status() # Raise an exception for HTTP errors + manifest_content = response.text + except requests.exceptions.Timeout as e: + xpk_print(f"Error: Request to {manifest_url} after 10 seconds: {e}") + xpk_exit(1) + except requests.exceptions.RequestException as e: + xpk_print(f"Error fetching manifest from {manifest_url}: {e}") + xpk_exit(1) + + if manifest_content is None: + xpk_print("Manifest content not found.") + xpk_exit(1) + + # Load all YAML documents from the manifest + yaml_data_list = list(yaml.safe_load_all(manifest_content)) + # Iterate through the yaml_data to find the Deployment for + # kueue-controller-manager + update_manifest = False + for yaml_data in yaml_data_list: + if ( + yaml_data + and yaml_data.get("apiVersion") == "apps/v1" + and yaml_data.get("kind") == "Deployment" + and yaml_data.get("metadata", {}).get("name") + == "kueue-controller-manager" + ): + # Found the Deployment, now modify the resources + containers = yaml_data["spec"]["template"]["spec"]["containers"] + for container in containers: + if container["name"] == "manager": + # Update resource limits and requests + current_cpu_request = ( + container["resources"].get("requests", {}).get("cpu", "0m") + ) + current_memory_request = ( + container["resources"].get("requests", {}).get("memory", "0Mi") + ) + current_memory_limit = ( + container["resources"].get("limits", {}).get("memory", "0Mi") + ) + + # Define new values for comparison + cmd_total_node_num = ( + 'kubectl get node --no-headers | wc -l' + ) + return_code, out = run_command_for_value( + cmd_total_node_num, 'Count total nodes', args + ) + if return_code != 0: + xpk_exit(1) + new_cpu_request = "500m" + new_memory_request = "128Mi" + # 1.2MiB per VM or 4GiB (whichever is greater). + new_memory_limit = f"{max(math.ceil(int(out) * 1.2), 4096)}Mi" + + if parse_resource_value(current_cpu_request) < parse_resource_value( + new_cpu_request + ): + container["resources"]["requests"]["cpu"] = new_cpu_request + update_manifest = True + if parse_resource_value( + current_memory_request + ) < parse_resource_value(new_memory_request): + container["resources"]["requests"]["memory"] = new_memory_request + update_manifest = True + if parse_resource_value(current_memory_limit) < parse_resource_value( + new_memory_limit + ): + container["resources"]["limits"]["memory"] = new_memory_limit + update_manifest = True + container["resources"]["limits"]["memory"] = new_memory_limit + update_manifest = True + break + if update_manifest: + xpk_print("Kueue controller updation required.") + return yaml_data + xpk_print("Kueue controller no updation required.") + +def parse_resource_value(value) -> int: + if value.endswith("m"): + return int(value[:-1]) + if value.endswith("Mi"): + return int(value[:-2]) + if value.endswith("Gi"): + return int(value[:-2]) * 1024 + return int(value) \ No newline at end of file From 52915c95a97c8f639bfc717f35f9230f18a27edd Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Mon, 16 Jun 2025 21:37:42 +0000 Subject: [PATCH 02/11] Update cluster.py --- src/xpk/commands/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index bb1b40e5..a0ddae3f 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -320,7 +320,7 @@ def cluster_create(args) -> None: install_kueue(args, system, autoprovisioning_config) - # install_kjob(args) + install_kjob(args) # Update memory limit of Jobset Controller Manager k8s_api_client = setup_k8s_env(args) From 734a724439bd73e4db6b3737d7b53735471f1357 Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Mon, 23 Jun 2025 15:28:58 +0000 Subject: [PATCH 03/11] Split into get and update manifest --- src/xpk/utils/kubectl.py | 68 +++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 25 deletions(-) diff --git a/src/xpk/utils/kubectl.py b/src/xpk/utils/kubectl.py index 9638eb65..c67d973e 100644 --- a/src/xpk/utils/kubectl.py +++ b/src/xpk/utils/kubectl.py @@ -28,6 +28,8 @@ from .console import xpk_print KUEUE_VERSION = 'v0.10.0' +MEMORY_SIZE_PER_VM = 1.2 +MIN_MEMORY_LIMIT_SIZE = 4096 def apply_kubectl_manifest(client, manifest) -> int: xpk_print('Applying manifest') @@ -68,12 +70,8 @@ def apply_kubectl_manifest(client, manifest) -> int: status_code = 1 return status_code - -def update_jobset_manifest(args): - """Update the jobset manifest to increase the resources for the jobset controller manager. - - Args: - args: user provided arguments for running the command. +def get_jobset_manifest(): + """Get the jobset manifest. Returns: The updated jobset manifest. @@ -85,6 +83,7 @@ def update_jobset_manifest(args): response = requests.get(manifest_url, timeout=10) response.raise_for_status() # Raise an exception for HTTP errors manifest_content = response.text + return manifest_content except requests.exceptions.Timeout as e: xpk_print(f"Error: Request to {manifest_url} after 10 seconds: {e}") xpk_exit(1) @@ -96,6 +95,17 @@ def update_jobset_manifest(args): xpk_print("Manifest content not found.") xpk_exit(1) + +def update_jobset_manifest(args): + """Update the jobset manifest to increase the resources for the jobset controller manager. + + Args: + args: user provided arguments for running the command. + + Returns: + The updated jobset manifest. + """ + manifest_content = get_jobset_manifest() # Load all YAML documents from the manifest yaml_data_list = list(yaml.safe_load_all(manifest_content)) # Iterate through the yaml_data to find the Deployment for @@ -136,7 +146,7 @@ def update_jobset_manifest(args): new_cpu_request = "500m" new_memory_request = "128Mi" # 1.2MiB per VM or 4GiB (whichever is greater). - new_memory_limit = f"{max(math.ceil(int(out) * 1.2), 4096)}Mi" + new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" if parse_resource_value(current_cpu_request) < parse_resource_value( new_cpu_request @@ -161,27 +171,12 @@ def update_jobset_manifest(args): return yaml_data xpk_print("Jobset controller no updation required.") -def update_kueue_manifest(args): - """Update the kueue manifest to increase the resources for the kueue controller manager. - - Args: - args: user provided arguments for running the command. +def get_kueue_manifest(): + """Get the kueue manifest. Returns: The updated kueue manifest. """ - err_code, kueue_version_installed = get_kueue_version(args) - if err_code == 0: - if Version(kueue_version_installed) < Version('v0.9.0') and Version( - KUEUE_VERSION - ) >= Version('v0.9.0'): - xpk_print('Upgrading kueue on cluster from version < 0.9.0.') - upgrade_code = delete_multikueueclusters_definitions(args) - if upgrade_code != 0: - return upgrade_code - upgrade_code = delete_multikueueconfigs_definitions(args) - if upgrade_code != 0: - return upgrade_code manifest_url = f"https://github.com/kubernetes-sigs/kueue/releases/download/{KUEUE_VERSION}/manifests.yaml" manifest_content = None # Fetch the manifest content @@ -189,6 +184,7 @@ def update_kueue_manifest(args): response = requests.get(manifest_url, timeout=10) response.raise_for_status() # Raise an exception for HTTP errors manifest_content = response.text + return manifest_content except requests.exceptions.Timeout as e: xpk_print(f"Error: Request to {manifest_url} after 10 seconds: {e}") xpk_exit(1) @@ -200,6 +196,28 @@ def update_kueue_manifest(args): xpk_print("Manifest content not found.") xpk_exit(1) +def update_kueue_manifest(args): + """Update the kueue manifest to increase the resources for the kueue controller manager. + + Args: + args: user provided arguments for running the command. + + Returns: + The updated kueue manifest. + """ + err_code, kueue_version_installed = get_kueue_version(args) + if err_code == 0: + if Version(kueue_version_installed) < Version('v0.9.0') and Version( + KUEUE_VERSION + ) >= Version('v0.9.0'): + xpk_print('Upgrading kueue on cluster from version < 0.9.0.') + upgrade_code = delete_multikueueclusters_definitions(args) + if upgrade_code != 0: + return upgrade_code + upgrade_code = delete_multikueueconfigs_definitions(args) + if upgrade_code != 0: + return upgrade_code + manifest_content = get_kueue_manifest() # Load all YAML documents from the manifest yaml_data_list = list(yaml.safe_load_all(manifest_content)) # Iterate through the yaml_data to find the Deployment for @@ -240,7 +258,7 @@ def update_kueue_manifest(args): new_cpu_request = "500m" new_memory_request = "128Mi" # 1.2MiB per VM or 4GiB (whichever is greater). - new_memory_limit = f"{max(math.ceil(int(out) * 1.2), 4096)}Mi" + new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" if parse_resource_value(current_cpu_request) < parse_resource_value( new_cpu_request From f1a26d8abe17a3ab78ac81241fa7222b4304fe69 Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Mon, 23 Jun 2025 18:04:46 +0000 Subject: [PATCH 04/11] Remove dup lines --- src/xpk/utils/kubectl.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/xpk/utils/kubectl.py b/src/xpk/utils/kubectl.py index c67d973e..47f5f56a 100644 --- a/src/xpk/utils/kubectl.py +++ b/src/xpk/utils/kubectl.py @@ -163,8 +163,6 @@ def update_jobset_manifest(args): ): container["resources"]["limits"]["memory"] = new_memory_limit update_manifest = True - container["resources"]["limits"]["memory"] = new_memory_limit - update_manifest = True break if update_manifest: xpk_print("Jobset controller updation required.") @@ -275,8 +273,6 @@ def update_kueue_manifest(args): ): container["resources"]["limits"]["memory"] = new_memory_limit update_manifest = True - container["resources"]["limits"]["memory"] = new_memory_limit - update_manifest = True break if update_manifest: xpk_print("Kueue controller updation required.") From 66e263f9581778d4f2c4d8d6f9101a409d4e6dee Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Tue, 24 Jun 2025 15:05:49 +0000 Subject: [PATCH 05/11] Organize code --- src/xpk/commands/cluster.py | 44 +++++++++++++++-------------- src/xpk/utils/kubectl.py | 55 ++++++------------------------------- 2 files changed, 32 insertions(+), 67 deletions(-) diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index a0ddae3f..b128dfca 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -75,8 +75,8 @@ from ..utils.file import write_tmp_file from ..utils.kubectl import ( apply_kubectl_manifest, - update_jobset_manifest, - update_kueue_manifest, + update_jobset_resources_if_necessary, + update_kueue_resources_if_necessary, ) from . import cluster_gcluster from .common import set_cluster_command @@ -324,27 +324,29 @@ def cluster_create(args) -> None: # Update memory limit of Jobset Controller Manager k8s_api_client = setup_k8s_env(args) - jobset_manifest = update_jobset_manifest(args) - if jobset_manifest is None: - xpk_print( + update_jobset_command_code, jobset_manifest = update_jobset_resources_if_necessary(args) + if update_jobset_command_code !=0: + if jobset_manifest is None: + xpk_print( "Updated jobset manifest is empty, not updating the jobset controller." - ) - xpk_print("Updating Jobset Controller Manager") - return_code = apply_kubectl_manifest(k8s_api_client, [jobset_manifest]) - if return_code != 0: - xpk_print(f'Updating Jobset Controller Manager returned ERROR {return_code}') - return return_code + ) + xpk_print("Updating Jobset Controller Manager") + return_code = apply_kubectl_manifest(k8s_api_client, [jobset_manifest]) + if return_code != 0: + xpk_print(f'Updating Jobset Controller Manager returned ERROR {return_code}') + return return_code - kueue_manifest = update_kueue_manifest(args) - if kueue_manifest is None: - xpk_print( - "Updated Kueue manifest is empty, not updating the Kueue controller." - ) - xpk_print("Updating Kueue Controller Manager") - return_code = apply_kubectl_manifest(k8s_api_client, [kueue_manifest]) - if return_code != 0: - xpk_print(f'Updating Kueue Controller Manager returned ERROR {return_code}') - return return_code + update_kueue_command_code, kueue_manifest = update_kueue_resources_if_necessary(args) + if update_kueue_command_code !=0: + if kueue_manifest is None: + xpk_print( + "Updated Kueue manifest is empty, not updating the Kueue controller." + ) + xpk_print("Updating Kueue Controller Manager") + return_code = apply_kubectl_manifest(k8s_api_client, [kueue_manifest]) + if return_code != 0: + xpk_print(f'Updating Kueue Controller Manager returned ERROR {return_code}') + return return_code if system.accelerator_type == AcceleratorType['GPU']: prepare_gpus(args, system) diff --git a/src/xpk/utils/kubectl.py b/src/xpk/utils/kubectl.py index 47f5f56a..01992920 100644 --- a/src/xpk/utils/kubectl.py +++ b/src/xpk/utils/kubectl.py @@ -24,10 +24,9 @@ from ..core.cluster import JOBSET_VERSION from ..core.commands import run_command_for_value -from ..core.kueue import get_kueue_version +from ..core.kueue import get_kueue_version, KUEUE_VERSION from .console import xpk_print -KUEUE_VERSION = 'v0.10.0' MEMORY_SIZE_PER_VM = 1.2 MIN_MEMORY_LIMIT_SIZE = 4096 @@ -96,7 +95,7 @@ def get_jobset_manifest(): xpk_exit(1) -def update_jobset_manifest(args): +def update_jobset_resources_if_necessary(args): """Update the jobset manifest to increase the resources for the jobset controller manager. Args: @@ -123,13 +122,7 @@ def update_jobset_manifest(args): containers = yaml_data["spec"]["template"]["spec"]["containers"] for container in containers: if container["name"] == "manager": - # Update resource limits and requests - current_cpu_request = ( - container["resources"].get("requests", {}).get("cpu", "0m") - ) - current_memory_request = ( - container["resources"].get("requests", {}).get("memory", "0Mi") - ) + # Update memory limit current_memory_limit = ( container["resources"].get("limits", {}).get("memory", "0Mi") ) @@ -143,21 +136,8 @@ def update_jobset_manifest(args): ) if return_code != 0: xpk_exit(1) - new_cpu_request = "500m" - new_memory_request = "128Mi" # 1.2MiB per VM or 4GiB (whichever is greater). new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" - - if parse_resource_value(current_cpu_request) < parse_resource_value( - new_cpu_request - ): - container["resources"]["requests"]["cpu"] = new_cpu_request - update_manifest = True - if parse_resource_value( - current_memory_request - ) < parse_resource_value(new_memory_request): - container["resources"]["requests"]["memory"] = new_memory_request - update_manifest = True if parse_resource_value(current_memory_limit) < parse_resource_value( new_memory_limit ): @@ -166,8 +146,9 @@ def update_jobset_manifest(args): break if update_manifest: xpk_print("Jobset controller updation required.") - return yaml_data + return update_manifest, yaml_data xpk_print("Jobset controller no updation required.") + return update_manifest, yaml_data def get_kueue_manifest(): """Get the kueue manifest. @@ -194,7 +175,7 @@ def get_kueue_manifest(): xpk_print("Manifest content not found.") xpk_exit(1) -def update_kueue_manifest(args): +def update_kueue_resources_if_necessary(args): """Update the kueue manifest to increase the resources for the kueue controller manager. Args: @@ -233,13 +214,7 @@ def update_kueue_manifest(args): containers = yaml_data["spec"]["template"]["spec"]["containers"] for container in containers: if container["name"] == "manager": - # Update resource limits and requests - current_cpu_request = ( - container["resources"].get("requests", {}).get("cpu", "0m") - ) - current_memory_request = ( - container["resources"].get("requests", {}).get("memory", "0Mi") - ) + # Update memory limit current_memory_limit = ( container["resources"].get("limits", {}).get("memory", "0Mi") ) @@ -253,21 +228,8 @@ def update_kueue_manifest(args): ) if return_code != 0: xpk_exit(1) - new_cpu_request = "500m" - new_memory_request = "128Mi" # 1.2MiB per VM or 4GiB (whichever is greater). new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" - - if parse_resource_value(current_cpu_request) < parse_resource_value( - new_cpu_request - ): - container["resources"]["requests"]["cpu"] = new_cpu_request - update_manifest = True - if parse_resource_value( - current_memory_request - ) < parse_resource_value(new_memory_request): - container["resources"]["requests"]["memory"] = new_memory_request - update_manifest = True if parse_resource_value(current_memory_limit) < parse_resource_value( new_memory_limit ): @@ -276,8 +238,9 @@ def update_kueue_manifest(args): break if update_manifest: xpk_print("Kueue controller updation required.") - return yaml_data + return update_manifest, yaml_data xpk_print("Kueue controller no updation required.") + return update_manifest, yaml_data def parse_resource_value(value) -> int: if value.endswith("m"): From 389a62ed37f45c5fee2d73d04564129101d27150 Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Wed, 25 Jun 2025 19:18:29 +0000 Subject: [PATCH 06/11] Redesign the feature --- src/xpk/commands/cluster.py | 41 +++------- src/xpk/core/jobset.py | 146 ++++++++++++++++++++++++++++++++++++ src/xpk/core/kueue.py | 129 ++++++++++++++++++++++++++++++- src/xpk/utils/kubectl.py | 101 ++----------------------- 4 files changed, 290 insertions(+), 127 deletions(-) create mode 100644 src/xpk/core/jobset.py diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index b128dfca..20308df4 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -42,12 +42,14 @@ get_gke_server_config, zone_to_region, ) +from ..core.jobset import update_jobset_resources_if_necessary from ..core.kjob import apply_kjob_crds, prepare_kjob, verify_kjob_installed from ..core.kueue import ( cluster_preheat_yml, install_kueue_crs, install_kueue_on_cluster, wait_for_kueue_available, + update_kueue_resources_if_necessary, ) from ..core.nap import enable_autoprovisioning_on_cluster from ..core.network import ( @@ -73,11 +75,7 @@ from ..core.workload import get_workload_list from ..utils.console import get_user_input, xpk_exit, xpk_print from ..utils.file import write_tmp_file -from ..utils.kubectl import ( - apply_kubectl_manifest, - update_jobset_resources_if_necessary, - update_kueue_resources_if_necessary, -) +from ..utils.kubectl import apply_kubectl_manifest from . import cluster_gcluster from .common import set_cluster_command @@ -313,6 +311,9 @@ def cluster_create(args) -> None: set_jobset_on_cluster_code = set_jobset_on_cluster(args) if set_jobset_on_cluster_code != 0: xpk_exit(set_jobset_on_cluster_code) + update_jobset_resources_code = update_jobset_resources_if_necessary(args) + if update_jobset_resources_code != 0: + xpk_exit(update_jobset_resources_code) set_pathways_job_on_cluster_code = set_pathways_job_on_cluster(args) if set_pathways_job_on_cluster_code != 0: @@ -321,32 +322,6 @@ def cluster_create(args) -> None: install_kueue(args, system, autoprovisioning_config) install_kjob(args) - - # Update memory limit of Jobset Controller Manager - k8s_api_client = setup_k8s_env(args) - update_jobset_command_code, jobset_manifest = update_jobset_resources_if_necessary(args) - if update_jobset_command_code !=0: - if jobset_manifest is None: - xpk_print( - "Updated jobset manifest is empty, not updating the jobset controller." - ) - xpk_print("Updating Jobset Controller Manager") - return_code = apply_kubectl_manifest(k8s_api_client, [jobset_manifest]) - if return_code != 0: - xpk_print(f'Updating Jobset Controller Manager returned ERROR {return_code}') - return return_code - - update_kueue_command_code, kueue_manifest = update_kueue_resources_if_necessary(args) - if update_kueue_command_code !=0: - if kueue_manifest is None: - xpk_print( - "Updated Kueue manifest is empty, not updating the Kueue controller." - ) - xpk_print("Updating Kueue Controller Manager") - return_code = apply_kubectl_manifest(k8s_api_client, [kueue_manifest]) - if return_code != 0: - xpk_print(f'Updating Kueue Controller Manager returned ERROR {return_code}') - return return_code if system.accelerator_type == AcceleratorType['GPU']: prepare_gpus(args, system) @@ -988,6 +963,10 @@ def install_kueue(args, system: SystemCharacteristics, autoprovisioning_config): if enable_kueue_credentials_code != 0: xpk_exit(enable_kueue_credentials_code) + xpk_print('Update Kueue Controller Manager resources') + update_kueue_resources_code = update_kueue_resources_if_necessary(args) + if update_kueue_resources_code != 0: + xpk_exit(update_kueue_resources_code) def prepare_gpus(args, system: SystemCharacteristics): xpk_print('Installing NCCL Plugin for cluster') diff --git a/src/xpk/core/jobset.py b/src/xpk/core/jobset.py new file mode 100644 index 00000000..7c1fce04 --- /dev/null +++ b/src/xpk/core/jobset.py @@ -0,0 +1,146 @@ +import math + +from ..utils.console import xpk_exit, xpk_print +from ..utils.file import write_tmp_file +from ..core.kueue import ( + MEMORY_SIZE_PER_VM, + MIN_MEMORY_LIMIT_SIZE, +) +from .commands import ( + run_command_for_value, + run_command_with_updates_retry, +) + +jobset_controller_manager_yml = """ +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/component: webhook + app.kubernetes.io/created-by: jobset + app.kubernetes.io/instance: webhook-service + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/name: service + app.kubernetes.io/part-of: jobset + name: jobset-webhook-service + namespace: jobset-system +spec: + ports: + - port: 443 + protocol: TCP + targetPort: 9443 + selector: + control-plane: controller-manager +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/component: manager + app.kubernetes.io/created-by: jobset + app.kubernetes.io/instance: controller-manager + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/name: deployment + app.kubernetes.io/part-of: jobset + control-plane: controller-manager + name: jobset-controller-manager + namespace: jobset-system +spec: + replicas: 1 + selector: + matchLabels: + control-plane: controller-manager + template: + metadata: + annotations: + kubectl.kubernetes.io/default-container: manager + labels: + control-plane: controller-manager + spec: + containers: + - args: + - --config=/controller_manager_config.yaml + - --zap-log-level=2 + command: + - /manager + image: registry.k8s.io/jobset/jobset:v0.8.0 + livenessProbe: + httpGet: + path: /healthz + port: 8081 + initialDelaySeconds: 15 + periodSeconds: 20 + name: manager + ports: + - containerPort: 9443 + name: webhook-server + protocol: TCP + readinessProbe: + httpGet: + path: /readyz + port: 8081 + initialDelaySeconds: 5 + periodSeconds: 10 + resources: + limits: + memory: {memory_limit_size} + requests: + cpu: 500m + memory: 128Mi + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + volumeMounts: + - mountPath: /controller_manager_config.yaml + name: manager-config + subPath: controller_manager_config.yaml + - mountPath: /tmp/k8s-webhook-server/serving-certs + name: cert + readOnly: true + securityContext: + runAsNonRoot: true + serviceAccountName: jobset-controller-manager + terminationGracePeriodSeconds: 10 + volumes: + - configMap: + name: jobset-manager-config + name: manager-config + - name: cert + secret: + defaultMode: 420 + secretName: jobset-webhook-server-cert +""" + +def update_jobset_resources_if_necessary(args): + """Update the jobset manifest to increase the resources for the jobset controller manager. + + Args: + args: user provided arguments for running the command. + + Returns: + 0 if successful and 1 otherwise. + """ + # Get total number of nodes + cmd_total_node_num = ( + 'kubectl get node --no-headers | wc -l' + ) + return_code, out = run_command_for_value( + cmd_total_node_num, 'Count total nodes', args + ) + if return_code != 0: + xpk_exit(1) + # 1.2MiB per VM or 4GiB (whichever is greater). + new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" + yml_string = jobset_controller_manager_yml.format( + memory_limit_size=new_memory_limit, + ) + tmp = write_tmp_file(yml_string) + command = f'kubectl apply -f {str(tmp.file.name)}' + + task = 'Updating jobset Controller Manager resources' + return_code = run_command_with_updates_retry(command, task, args) + if return_code != 0: + xpk_print(f'{task} returned ERROR {return_code}') + return return_code \ No newline at end of file diff --git a/src/xpk/core/kueue.py b/src/xpk/core/kueue.py index 4c7158c4..fc0e3e62 100644 --- a/src/xpk/core/kueue.py +++ b/src/xpk/core/kueue.py @@ -16,6 +16,7 @@ from argparse import Namespace +import math import packaging from packaging.version import Version @@ -43,6 +44,8 @@ CLUSTER_QUEUE_NAME = 'cluster-queue' LOCAL_QUEUE_NAME = 'multislice-queue' WAIT_FOR_KUEUE_TIMEOUT = '5m' +MEMORY_SIZE_PER_VM = 1.2 +MIN_MEMORY_LIMIT_SIZE = 4096 packaging.version.VERSION_PATTERN = r'^v\d+\.\d+\.\d+$' @@ -166,6 +169,98 @@ command: [ "sleep", "inf" ] """ +kueue_controller_manager_yml = """ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/component: controller + app.kubernetes.io/name: kueue + control-plane: controller-manager + name: kueue-controller-manager + namespace: kueue-system +spec: + replicas: 1 + selector: + matchLabels: + control-plane: controller-manager + template: + metadata: + annotations: + kubectl.kubernetes.io/default-container: manager + labels: + app.kubernetes.io/component: controller + app.kubernetes.io/name: kueue + control-plane: controller-manager + spec: + containers: + - args: + - --config=/controller_manager_config.yaml + - --zap-log-level=2 + command: + - /manager + image: registry.k8s.io/kueue/kueue:v0.10.0 + imagePullPolicy: Always + livenessProbe: + httpGet: + path: /healthz + port: 8081 + initialDelaySeconds: 15 + periodSeconds: 20 + name: manager + ports: + - containerPort: 8082 + name: visibility + protocol: TCP + - containerPort: 9443 + name: webhook-server + protocol: TCP + readinessProbe: + httpGet: + path: /readyz + port: 8081 + initialDelaySeconds: 5 + periodSeconds: 10 + resources: + limits: + cpu: 500m + memory: {memory_limit_size} + requests: + cpu: 500m + memory: 512Mi + securityContext: + allowPrivilegeEscalation: false + volumeMounts: + - mountPath: /tmp/k8s-webhook-server/serving-certs + name: cert + readOnly: true + - mountPath: /controller_manager_config.yaml + name: manager-config + subPath: controller_manager_config.yaml + - args: + - --secure-listen-address=0.0.0.0:8443 + - --upstream=http://127.0.0.1:8080/ + - --logtostderr=true + - --v=10 + image: registry.k8s.io/kubebuilder/kube-rbac-proxy:v0.16.0 + name: kube-rbac-proxy + ports: + - containerPort: 8443 + name: https + protocol: TCP + securityContext: + runAsNonRoot: true + serviceAccountName: kueue-controller-manager + terminationGracePeriodSeconds: 10 + volumes: + - name: cert + secret: + defaultMode: 420 + secretName: kueue-webhook-server-cert + - configMap: + name: kueue-manager-config + name: manager-config +""" def verify_kueuectl(args: Namespace) -> None: """Verify if kueuectl is installed. @@ -370,7 +465,7 @@ def get_kueue_covered_resources_config( total_chips: total number of chips for the specific resource type. Returns: - A string of Kueue covered resources configuration. + 0 if successful and 1 otherwise. """ config_format = """ - coveredResources: ["{resource_type}"] @@ -386,3 +481,35 @@ def get_kueue_covered_resources_config( total_chips=total_chips, ) return config_string + +def update_kueue_resources_if_necessary(args): + """Update the kueue manifest to increase the resources for the kueue controller manager. + + Args: + args: user provided arguments for running the command. + + Returns: + The updated kueue manifest. + """ + # Get total number of nodes + cmd_total_node_num = ( + 'kubectl get node --no-headers | wc -l' + ) + return_code, out = run_command_for_value( + cmd_total_node_num, 'Count total nodes', args + ) + if return_code != 0: + xpk_exit(1) + # 1.2MiB per VM or 4GiB (whichever is greater). + new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" + yml_string = kueue_controller_manager_yml.format( + memory_limit_size=new_memory_limit, + ) + tmp = write_tmp_file(yml_string) + command = f'kubectl apply -f {str(tmp.file.name)}' + + task = 'Updating Kueue Controller Manager resources' + return_code = run_command_with_updates_retry(command, task, args) + if return_code != 0: + xpk_print(f'{task} returned ERROR {return_code}') + return return_code \ No newline at end of file diff --git a/src/xpk/utils/kubectl.py b/src/xpk/utils/kubectl.py index 01992920..76bb3d49 100644 --- a/src/xpk/utils/kubectl.py +++ b/src/xpk/utils/kubectl.py @@ -24,11 +24,14 @@ from ..core.cluster import JOBSET_VERSION from ..core.commands import run_command_for_value -from ..core.kueue import get_kueue_version, KUEUE_VERSION +from ..core.kueue import ( + get_kueue_version, + KUEUE_VERSION, + MEMORY_SIZE_PER_VM, + MIN_MEMORY_LIMIT_SIZE, +) from .console import xpk_print -MEMORY_SIZE_PER_VM = 1.2 -MIN_MEMORY_LIMIT_SIZE = 4096 def apply_kubectl_manifest(client, manifest) -> int: xpk_print('Applying manifest') @@ -150,98 +153,6 @@ def update_jobset_resources_if_necessary(args): xpk_print("Jobset controller no updation required.") return update_manifest, yaml_data -def get_kueue_manifest(): - """Get the kueue manifest. - - Returns: - The updated kueue manifest. - """ - manifest_url = f"https://github.com/kubernetes-sigs/kueue/releases/download/{KUEUE_VERSION}/manifests.yaml" - manifest_content = None - # Fetch the manifest content - try: - response = requests.get(manifest_url, timeout=10) - response.raise_for_status() # Raise an exception for HTTP errors - manifest_content = response.text - return manifest_content - except requests.exceptions.Timeout as e: - xpk_print(f"Error: Request to {manifest_url} after 10 seconds: {e}") - xpk_exit(1) - except requests.exceptions.RequestException as e: - xpk_print(f"Error fetching manifest from {manifest_url}: {e}") - xpk_exit(1) - - if manifest_content is None: - xpk_print("Manifest content not found.") - xpk_exit(1) - -def update_kueue_resources_if_necessary(args): - """Update the kueue manifest to increase the resources for the kueue controller manager. - - Args: - args: user provided arguments for running the command. - - Returns: - The updated kueue manifest. - """ - err_code, kueue_version_installed = get_kueue_version(args) - if err_code == 0: - if Version(kueue_version_installed) < Version('v0.9.0') and Version( - KUEUE_VERSION - ) >= Version('v0.9.0'): - xpk_print('Upgrading kueue on cluster from version < 0.9.0.') - upgrade_code = delete_multikueueclusters_definitions(args) - if upgrade_code != 0: - return upgrade_code - upgrade_code = delete_multikueueconfigs_definitions(args) - if upgrade_code != 0: - return upgrade_code - manifest_content = get_kueue_manifest() - # Load all YAML documents from the manifest - yaml_data_list = list(yaml.safe_load_all(manifest_content)) - # Iterate through the yaml_data to find the Deployment for - # kueue-controller-manager - update_manifest = False - for yaml_data in yaml_data_list: - if ( - yaml_data - and yaml_data.get("apiVersion") == "apps/v1" - and yaml_data.get("kind") == "Deployment" - and yaml_data.get("metadata", {}).get("name") - == "kueue-controller-manager" - ): - # Found the Deployment, now modify the resources - containers = yaml_data["spec"]["template"]["spec"]["containers"] - for container in containers: - if container["name"] == "manager": - # Update memory limit - current_memory_limit = ( - container["resources"].get("limits", {}).get("memory", "0Mi") - ) - - # Define new values for comparison - cmd_total_node_num = ( - 'kubectl get node --no-headers | wc -l' - ) - return_code, out = run_command_for_value( - cmd_total_node_num, 'Count total nodes', args - ) - if return_code != 0: - xpk_exit(1) - # 1.2MiB per VM or 4GiB (whichever is greater). - new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" - if parse_resource_value(current_memory_limit) < parse_resource_value( - new_memory_limit - ): - container["resources"]["limits"]["memory"] = new_memory_limit - update_manifest = True - break - if update_manifest: - xpk_print("Kueue controller updation required.") - return update_manifest, yaml_data - xpk_print("Kueue controller no updation required.") - return update_manifest, yaml_data - def parse_resource_value(value) -> int: if value.endswith("m"): return int(value[:-1]) From d15cd932f4b6668c4b19b9e49c1d6759f48b7db0 Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Wed, 25 Jun 2025 19:23:23 +0000 Subject: [PATCH 07/11] Clean up code --- src/xpk/commands/cluster.py | 2 - src/xpk/utils/kubectl.py | 103 ------------------------------------ 2 files changed, 105 deletions(-) diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index 20308df4..b4c3870d 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -75,7 +75,6 @@ from ..core.workload import get_workload_list from ..utils.console import get_user_input, xpk_exit, xpk_print from ..utils.file import write_tmp_file -from ..utils.kubectl import apply_kubectl_manifest from . import cluster_gcluster from .common import set_cluster_command @@ -173,7 +172,6 @@ def cluster_adapt(args) -> None: install_kueue(args, system, autoprovisioning_config) install_kjob(args) - if system.accelerator_type == AcceleratorType['GPU']: prepare_gpus(args, system) diff --git a/src/xpk/utils/kubectl.py b/src/xpk/utils/kubectl.py index 76bb3d49..c48e093e 100644 --- a/src/xpk/utils/kubectl.py +++ b/src/xpk/utils/kubectl.py @@ -13,23 +13,10 @@ See the License for the specific language governing permissions and limitations under the License. """ -import math -import requests -import yaml -import packaging -from packaging.version import Version from kubernetes.client.exceptions import ApiException from kubernetes.dynamic import DynamicClient -from ..core.cluster import JOBSET_VERSION -from ..core.commands import run_command_for_value -from ..core.kueue import ( - get_kueue_version, - KUEUE_VERSION, - MEMORY_SIZE_PER_VM, - MIN_MEMORY_LIMIT_SIZE, -) from .console import xpk_print @@ -71,93 +58,3 @@ def apply_kubectl_manifest(client, manifest) -> int: xpk_print(f'Error applying {kind}: {e}') status_code = 1 return status_code - -def get_jobset_manifest(): - """Get the jobset manifest. - - Returns: - The updated jobset manifest. - """ - manifest_url = f"https://github.com/kubernetes-sigs/jobset/releases/download/{JOBSET_VERSION}/manifests.yaml" - manifest_content = None - # Fetch the manifest content - try: - response = requests.get(manifest_url, timeout=10) - response.raise_for_status() # Raise an exception for HTTP errors - manifest_content = response.text - return manifest_content - except requests.exceptions.Timeout as e: - xpk_print(f"Error: Request to {manifest_url} after 10 seconds: {e}") - xpk_exit(1) - except requests.exceptions.RequestException as e: - xpk_print(f"Error fetching manifest from {manifest_url}: {e}") - xpk_exit(1) - - if manifest_content is None: - xpk_print("Manifest content not found.") - xpk_exit(1) - - -def update_jobset_resources_if_necessary(args): - """Update the jobset manifest to increase the resources for the jobset controller manager. - - Args: - args: user provided arguments for running the command. - - Returns: - The updated jobset manifest. - """ - manifest_content = get_jobset_manifest() - # Load all YAML documents from the manifest - yaml_data_list = list(yaml.safe_load_all(manifest_content)) - # Iterate through the yaml_data to find the Deployment for - # jobset-controller-manager - update_manifest = False - for yaml_data in yaml_data_list: - if ( - yaml_data - and yaml_data.get("apiVersion") == "apps/v1" - and yaml_data.get("kind") == "Deployment" - and yaml_data.get("metadata", {}).get("name") - == "jobset-controller-manager" - ): - # Found the Deployment, now modify the resources - containers = yaml_data["spec"]["template"]["spec"]["containers"] - for container in containers: - if container["name"] == "manager": - # Update memory limit - current_memory_limit = ( - container["resources"].get("limits", {}).get("memory", "0Mi") - ) - - # Define new values for comparison - cmd_total_node_num = ( - 'kubectl get node --no-headers | wc -l' - ) - return_code, out = run_command_for_value( - cmd_total_node_num, 'Count total nodes', args - ) - if return_code != 0: - xpk_exit(1) - # 1.2MiB per VM or 4GiB (whichever is greater). - new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" - if parse_resource_value(current_memory_limit) < parse_resource_value( - new_memory_limit - ): - container["resources"]["limits"]["memory"] = new_memory_limit - update_manifest = True - break - if update_manifest: - xpk_print("Jobset controller updation required.") - return update_manifest, yaml_data - xpk_print("Jobset controller no updation required.") - return update_manifest, yaml_data - -def parse_resource_value(value) -> int: - if value.endswith("m"): - return int(value[:-1]) - if value.endswith("Mi"): - return int(value[:-2]) - if value.endswith("Gi"): - return int(value[:-2]) * 1024 - return int(value) \ No newline at end of file From bab8a813398b4651554503ad6be8b32b75f5db65 Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Wed, 25 Jun 2025 19:25:22 +0000 Subject: [PATCH 08/11] Correct wrong description --- src/xpk/core/kueue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/xpk/core/kueue.py b/src/xpk/core/kueue.py index fc0e3e62..165c1b36 100644 --- a/src/xpk/core/kueue.py +++ b/src/xpk/core/kueue.py @@ -465,7 +465,7 @@ def get_kueue_covered_resources_config( total_chips: total number of chips for the specific resource type. Returns: - 0 if successful and 1 otherwise. + A string of Kueue covered resources configuration. """ config_format = """ - coveredResources: ["{resource_type}"] @@ -489,7 +489,7 @@ def update_kueue_resources_if_necessary(args): args: user provided arguments for running the command. Returns: - The updated kueue manifest. + 0 if successful and 1 otherwise. """ # Get total number of nodes cmd_total_node_num = ( From c4719a96de9802e7f6ca3cc78a0fa510ecd54ae5 Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Thu, 26 Jun 2025 02:09:49 +0000 Subject: [PATCH 09/11] Remove unnecessary section of yaml --- src/xpk/core/jobset.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/src/xpk/core/jobset.py b/src/xpk/core/jobset.py index 7c1fce04..ad9955e8 100644 --- a/src/xpk/core/jobset.py +++ b/src/xpk/core/jobset.py @@ -12,26 +12,6 @@ ) jobset_controller_manager_yml = """ -apiVersion: v1 -kind: Service -metadata: - labels: - app.kubernetes.io/component: webhook - app.kubernetes.io/created-by: jobset - app.kubernetes.io/instance: webhook-service - app.kubernetes.io/managed-by: kustomize - app.kubernetes.io/name: service - app.kubernetes.io/part-of: jobset - name: jobset-webhook-service - namespace: jobset-system -spec: - ports: - - port: 443 - protocol: TCP - targetPort: 9443 - selector: - control-plane: controller-manager ---- apiVersion: apps/v1 kind: Deployment metadata: From 7b8e64e0d215e6d018085ac0ef7fca414b21d158 Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Tue, 8 Jul 2025 01:31:58 +0000 Subject: [PATCH 10/11] Resolve lint issue --- src/xpk/commands/cluster.py | 2 +- src/xpk/core/jobset.py | 20 ++++++++++++++++++-- src/xpk/core/kueue.py | 4 ++-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index b4c3870d..d248684e 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -320,7 +320,7 @@ def cluster_create(args) -> None: install_kueue(args, system, autoprovisioning_config) install_kjob(args) - + if system.accelerator_type == AcceleratorType['GPU']: prepare_gpus(args, system) diff --git a/src/xpk/core/jobset.py b/src/xpk/core/jobset.py index ad9955e8..a2135754 100644 --- a/src/xpk/core/jobset.py +++ b/src/xpk/core/jobset.py @@ -1,3 +1,19 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + import math from ..utils.console import xpk_exit, xpk_print @@ -112,7 +128,7 @@ def update_jobset_resources_if_necessary(args): if return_code != 0: xpk_exit(1) # 1.2MiB per VM or 4GiB (whichever is greater). - new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" + new_memory_limit = f'{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi' yml_string = jobset_controller_manager_yml.format( memory_limit_size=new_memory_limit, ) @@ -123,4 +139,4 @@ def update_jobset_resources_if_necessary(args): return_code = run_command_with_updates_retry(command, task, args) if return_code != 0: xpk_print(f'{task} returned ERROR {return_code}') - return return_code \ No newline at end of file + return return_code diff --git a/src/xpk/core/kueue.py b/src/xpk/core/kueue.py index 165c1b36..142810f2 100644 --- a/src/xpk/core/kueue.py +++ b/src/xpk/core/kueue.py @@ -501,7 +501,7 @@ def update_kueue_resources_if_necessary(args): if return_code != 0: xpk_exit(1) # 1.2MiB per VM or 4GiB (whichever is greater). - new_memory_limit = f"{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi" + new_memory_limit = f'{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi' yml_string = kueue_controller_manager_yml.format( memory_limit_size=new_memory_limit, ) @@ -512,4 +512,4 @@ def update_kueue_resources_if_necessary(args): return_code = run_command_with_updates_retry(command, task, args) if return_code != 0: xpk_print(f'{task} returned ERROR {return_code}') - return return_code \ No newline at end of file + return return_code From 65c129cfeb983da6b2587cdeb4d91f4ef7f6563e Mon Sep 17 00:00:00 2001 From: Zheng Chen Date: Tue, 8 Jul 2025 16:38:19 +0000 Subject: [PATCH 11/11] Reformat the change --- src/xpk/commands/cluster.py | 1 + src/xpk/core/jobset.py | 9 +++++---- src/xpk/core/kueue.py | 10 ++++++---- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index d248684e..334441be 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -966,6 +966,7 @@ def install_kueue(args, system: SystemCharacteristics, autoprovisioning_config): if update_kueue_resources_code != 0: xpk_exit(update_kueue_resources_code) + def prepare_gpus(args, system: SystemCharacteristics): xpk_print('Installing NCCL Plugin for cluster') install_nccl_code = install_nccl_on_cluster(args, system) diff --git a/src/xpk/core/jobset.py b/src/xpk/core/jobset.py index a2135754..3b53c6a5 100644 --- a/src/xpk/core/jobset.py +++ b/src/xpk/core/jobset.py @@ -109,6 +109,7 @@ secretName: jobset-webhook-server-cert """ + def update_jobset_resources_if_necessary(args): """Update the jobset manifest to increase the resources for the jobset controller manager. @@ -119,16 +120,16 @@ def update_jobset_resources_if_necessary(args): 0 if successful and 1 otherwise. """ # Get total number of nodes - cmd_total_node_num = ( - 'kubectl get node --no-headers | wc -l' - ) + cmd_total_node_num = 'kubectl get node --no-headers | wc -l' return_code, out = run_command_for_value( cmd_total_node_num, 'Count total nodes', args ) if return_code != 0: xpk_exit(1) # 1.2MiB per VM or 4GiB (whichever is greater). - new_memory_limit = f'{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi' + new_memory_limit = ( + f'{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi' + ) yml_string = jobset_controller_manager_yml.format( memory_limit_size=new_memory_limit, ) diff --git a/src/xpk/core/kueue.py b/src/xpk/core/kueue.py index 142810f2..0b225878 100644 --- a/src/xpk/core/kueue.py +++ b/src/xpk/core/kueue.py @@ -262,6 +262,7 @@ name: manager-config """ + def verify_kueuectl(args: Namespace) -> None: """Verify if kueuectl is installed. Args: @@ -482,6 +483,7 @@ def get_kueue_covered_resources_config( ) return config_string + def update_kueue_resources_if_necessary(args): """Update the kueue manifest to increase the resources for the kueue controller manager. @@ -492,16 +494,16 @@ def update_kueue_resources_if_necessary(args): 0 if successful and 1 otherwise. """ # Get total number of nodes - cmd_total_node_num = ( - 'kubectl get node --no-headers | wc -l' - ) + cmd_total_node_num = 'kubectl get node --no-headers | wc -l' return_code, out = run_command_for_value( cmd_total_node_num, 'Count total nodes', args ) if return_code != 0: xpk_exit(1) # 1.2MiB per VM or 4GiB (whichever is greater). - new_memory_limit = f'{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi' + new_memory_limit = ( + f'{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi' + ) yml_string = kueue_controller_manager_yml.format( memory_limit_size=new_memory_limit, )