From 2706ad1073cb0ad2ce4e6e43a24d3993b306ab3c Mon Sep 17 00:00:00 2001 From: ddalvi Date: Tue, 30 Sep 2025 15:40:42 -0400 Subject: [PATCH 1/4] Revert "feat(api): Add SemaphoreKey and MutexName fields to proto" This reverts commit 28e5ba9d30edb30e649e20cb312e50ae386da1f8. Signed-off-by: ddalvi --- .../go/pipelinespec/pipeline_spec.pb.go | 33 ++++--------------- api/v2alpha1/pipeline_spec.proto | 10 ++---- 2 files changed, 8 insertions(+), 35 deletions(-) diff --git a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go index d809e54f8fe..c81f0d64c21 100644 --- a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go +++ b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go @@ -2746,16 +2746,12 @@ func (x *KubernetesWorkspaceConfig) GetPvcSpecPatch() *structpb.Struct { // Spec for pipeline-level config options. See PipelineConfig DSL class. type PipelineConfig struct { state protoimpl.MessageState `protogen:"open.v1"` - // Name of the semaphore key to control pipeline concurrency - SemaphoreKey string `protobuf:"bytes,1,opt,name=semaphore_key,json=semaphoreKey,proto3" json:"semaphore_key,omitempty"` - // Name of the mutex to ensure mutual exclusion - MutexName string `protobuf:"bytes,2,opt,name=mutex_name,json=mutexName,proto3" json:"mutex_name,omitempty"` // Time to live configuration after the pipeline run is completed for // ephemeral resources created by the pipeline run. - ResourceTtl int32 `protobuf:"varint,3,opt,name=resource_ttl,json=resourceTtl,proto3" json:"resource_ttl,omitempty"` + ResourceTtl int32 `protobuf:"varint,1,opt,name=resource_ttl,json=resourceTtl,proto3" json:"resource_ttl,omitempty"` // Configuration for a shared storage workspace that persists for the duration of the pipeline run. // The workspace can be configured with size and Kubernetes-specific settings to override default PVC configurations. - Workspace *WorkspaceConfig `protobuf:"bytes,4,opt,name=workspace,proto3,oneof" json:"workspace,omitempty"` + Workspace *WorkspaceConfig `protobuf:"bytes,2,opt,name=workspace,proto3,oneof" json:"workspace,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2790,20 +2786,6 @@ func (*PipelineConfig) Descriptor() ([]byte, []int) { return file_pipeline_spec_proto_rawDescGZIP(), []int{34} } -func (x *PipelineConfig) GetSemaphoreKey() string { - if x != nil { - return x.SemaphoreKey - } - return "" -} - -func (x *PipelineConfig) GetMutexName() string { - if x != nil { - return x.MutexName - } - return "" -} - func (x *PipelineConfig) GetResourceTtl() int32 { if x != nil { return x.ResourceTtl @@ -6105,13 +6087,10 @@ const file_pipeline_spec_proto_rawDesc = "" + "\v_kubernetes\"r\n" + "\x19KubernetesWorkspaceConfig\x12B\n" + "\x0epvc_spec_patch\x18\x01 \x01(\v2\x17.google.protobuf.StructH\x00R\fpvcSpecPatch\x88\x01\x01B\x11\n" + - "\x0f_pvc_spec_patch\"\xc7\x01\n" + - "\x0ePipelineConfig\x12#\n" + - "\rsemaphore_key\x18\x01 \x01(\tR\fsemaphoreKey\x12\x1d\n" + - "\n" + - "mutex_name\x18\x02 \x01(\tR\tmutexName\x12!\n" + - "\fresource_ttl\x18\x03 \x01(\x05R\vresourceTtl\x12@\n" + - "\tworkspace\x18\x04 \x01(\v2\x1d.ml_pipelines.WorkspaceConfigH\x00R\tworkspace\x88\x01\x01B\f\n" + + "\x0f_pvc_spec_patch\"\x83\x01\n" + + "\x0ePipelineConfig\x12!\n" + + "\fresource_ttl\x18\x01 \x01(\x05R\vresourceTtl\x12@\n" + + "\tworkspace\x18\x02 \x01(\v2\x1d.ml_pipelines.WorkspaceConfigH\x00R\tworkspace\x88\x01\x01B\f\n" + "\n" + "_workspaceB Date: Tue, 30 Sep 2025 15:41:07 -0400 Subject: [PATCH 2/4] Revert "feat(sdk) Add SemaphoreKey and MutexName fields to DSL" This reverts commit e997d426e1472549d5c7851d7e69dfe65fa20d79. Signed-off-by: ddalvi --- sdk/python/kfp/compiler/compiler_test.py | 61 ------------------- .../kfp/compiler/pipeline_spec_builder.py | 18 ++---- sdk/python/kfp/dsl/pipeline_config.py | 54 +--------------- 3 files changed, 7 insertions(+), 126 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 95a04b85f6f..abfe4698d1c 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -4336,67 +4336,6 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=output_yaml) -class TestPipelineSemaphoreMutex(unittest.TestCase): - - def test_pipeline_with_semaphore(self): - """Test that pipeline config correctly sets the semaphore key.""" - config = PipelineConfig() - config.semaphore_key = 'semaphore' - - @dsl.pipeline(pipeline_config=config) - def my_pipeline(): - task = comp() - - with tempfile.TemporaryDirectory() as tempdir: - output_yaml = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - pipeline_func=my_pipeline, package_path=output_yaml) - - with open(output_yaml, 'r') as f: - pipeline_docs = list(yaml.safe_load_all(f)) - - platform_spec = None - for doc in pipeline_docs: - if 'platforms' in doc: - platform_spec = doc - break - - self.assertIsNotNone(platform_spec, - 'No platforms section found in compiled output') - kubernetes_spec = platform_spec['platforms']['kubernetes'][ - 'pipelineConfig'] - self.assertEqual(kubernetes_spec['semaphoreKey'], 'semaphore') - - def test_pipeline_with_mutex(self): - """Test that pipeline config correctly sets the mutex name.""" - config = PipelineConfig() - config.mutex_name = 'mutex' - - @dsl.pipeline(pipeline_config=config) - def my_pipeline(): - task = comp() - - with tempfile.TemporaryDirectory() as tempdir: - output_yaml = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - pipeline_func=my_pipeline, package_path=output_yaml) - - with open(output_yaml, 'r') as f: - pipeline_docs = list(yaml.safe_load_all(f)) - - platform_spec = None - for doc in pipeline_docs: - if 'platforms' in doc: - platform_spec = doc - break - - self.assertIsNotNone(platform_spec, - 'No platforms section found in compiled output') - kubernetes_spec = platform_spec['platforms']['kubernetes'][ - 'pipelineConfig'] - self.assertEqual(kubernetes_spec['mutexName'], 'mutex') - - class ExtractInputOutputDescription(unittest.TestCase): def test_no_descriptions(self): diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 0ef90e4719d..b01a7fc135f 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -2242,20 +2242,14 @@ def _write_kubernetes_manifest_to_file( def _merge_pipeline_config(pipelineConfig: pipeline_config.PipelineConfig, platformSpec: pipeline_spec_pb2.PlatformSpec): - config_dict = {} - workspace = pipelineConfig.workspace - if workspace is not None: - config_dict['workspace'] = workspace.get_workspace() - - if pipelineConfig.semaphore_key is not None: - config_dict['semaphoreKey'] = pipelineConfig.semaphore_key - if pipelineConfig.mutex_name is not None: - config_dict['mutexName'] = pipelineConfig.mutex_name + if workspace is None: + return platformSpec - if config_dict: - json_format.ParseDict({'pipelineConfig': config_dict}, - platformSpec.platforms['kubernetes']) + json_format.ParseDict( + {'pipelineConfig': { + 'workspace': workspace.get_workspace(), + }}, platformSpec.platforms['kubernetes']) return platformSpec diff --git a/sdk/python/kfp/dsl/pipeline_config.py b/sdk/python/kfp/dsl/pipeline_config.py index 72308ee16bd..42505bf0d25 100644 --- a/sdk/python/kfp/dsl/pipeline_config.py +++ b/sdk/python/kfp/dsl/pipeline_config.py @@ -96,57 +96,5 @@ def set_kubernetes_config(self, class PipelineConfig: """PipelineConfig contains pipeline-level config options.""" - def __init__(self, - workspace: Optional[WorkspaceConfig] = None, - semaphore_key: Optional[str] = None, - mutex_name: Optional[str] = None): + def __init__(self, workspace: Optional[WorkspaceConfig] = None): self.workspace = workspace - self._semaphore_key = semaphore_key - self._mutex_name = mutex_name - - @property - def semaphore_key(self) -> Optional[str]: - """Get the semaphore key for controlling pipeline concurrency. - - Returns: - Optional[str]: The semaphore key, or None if not set. - """ - return self._semaphore_key - - @semaphore_key.setter - def semaphore_key(self, value: str): - """Set the semaphore key to control pipeline concurrency. - - Pipelines with the same semaphore key will be limited to a configured maximum - number of concurrent executions. This allows you to control resource usage by - ensuring that only a specific number of pipelines can run simultaneously. - - Note: A pipeline can use both semaphores and mutexes together. The pipeline - will wait until all required locks are available before starting. - - Args: - value (str): The semaphore key name for controlling concurrent executions. - """ - self._semaphore_key = (value and value.strip()) or None - - @property - def mutex_name(self) -> Optional[str]: - """Get the mutex name for exclusive pipeline execution. - - Returns: - Optional[str]: The mutex name, or None if not set. - """ - return self._mutex_name - - @mutex_name.setter - def mutex_name(self, value: str): - """Set the name of the mutex to ensure mutual exclusion. - - Pipelines with the same mutex name will only run one at a time. This ensures - exclusive access to shared resources and prevents conflicts when multiple - pipelines would otherwise compete for the same resources. - - Args: - value (str): Name of the mutex for exclusive pipeline execution. - """ - self._mutex_name = (value and value.strip()) or None From 8f0fecab9c4f6e972ec218d0e46d6da3f348b5f7 Mon Sep 17 00:00:00 2001 From: ddalvi Date: Tue, 30 Sep 2025 16:39:30 -0400 Subject: [PATCH 3/4] fix: Remove semaphore/mutex fields from test proto objects Resolves CI failure where test code was still referencing removed SemaphoreKey and MutexName fields in PipelineConfig struct literal. Signed-off-by: ddalvi --- backend/test/proto_tests/objects.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/backend/test/proto_tests/objects.go b/backend/test/proto_tests/objects.go index 1bce695c684..1cb2dec0966 100644 --- a/backend/test/proto_tests/objects.go +++ b/backend/test/proto_tests/objects.go @@ -306,9 +306,7 @@ var platformSpec = &specPB.PlatformSpec{ }, }, PipelineConfig: &specPB.PipelineConfig{ - SemaphoreKey: "test-key", - MutexName: "test-mutex", - ResourceTtl: 24, + ResourceTtl: 24, }, }, }, From 1fdec8e217eb509564c47794daba808f24972a06 Mon Sep 17 00:00:00 2001 From: ddalvi Date: Tue, 30 Sep 2025 17:04:29 -0400 Subject: [PATCH 4/4] fix: Regenerate proto test fixtures after semaphore/mutex field removal Updates platform_spec test data to match the new protobuf schema without SemaphoreKey and MutexName fields. Resolves TestPlatformSpec failure. Signed-off-by: ddalvi --- .../generated-1791485/platform_spec.json | 28 +++++++++---------- .../generated-1791485/platform_spec.pb | 8 ++---- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/backend/test/proto_tests/testdata/generated-1791485/platform_spec.json b/backend/test/proto_tests/testdata/generated-1791485/platform_spec.json index a7840b1a173..5d83f192340 100644 --- a/backend/test/proto_tests/testdata/generated-1791485/platform_spec.json +++ b/backend/test/proto_tests/testdata/generated-1791485/platform_spec.json @@ -1,24 +1,22 @@ { - "platforms": { - "kubernetes": { - "deployment_spec": { - "executors": { - "root-executor": { - "container": { - "image": "test-image" + "platforms": { + "kubernetes": { + "deployment_spec": { + "executors": { + "root-executor": { + "container": { + "image": "test-image" } } } }, - "platform": "kubernetes", - "config": { - "project": "test-project" + "platform": "kubernetes", + "config": { + "project": "test-project" }, - "pipelineConfig": { - "semaphore_key": "test-key", - "mutex_name": "test-mutex", - "resource_ttl": 24 + "pipelineConfig": { + "resource_ttl": 24 } } } -} +} \ No newline at end of file diff --git a/backend/test/proto_tests/testdata/generated-1791485/platform_spec.pb b/backend/test/proto_tests/testdata/generated-1791485/platform_spec.pb index ce75cf60837..c07587f3194 100644 --- a/backend/test/proto_tests/testdata/generated-1791485/platform_spec.pb +++ b/backend/test/proto_tests/testdata/generated-1791485/platform_spec.pb @@ -1,7 +1,7 @@ - +x -kubernetes +kubernetesj ; 9 root-executor( @@ -12,6 +12,4 @@ kubernetes test-image kubernetes  -project test-project" -test-key -test-mutex \ No newline at end of file +project test-project" \ No newline at end of file