diff --git a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go index d809e54f8fe..301e3e2a30a 100644 --- a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go +++ b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go @@ -2065,7 +2065,9 @@ type RuntimeArtifact struct { // Deprecated: Marked as deprecated in pipeline_spec.proto. CustomProperties map[string]*Value `protobuf:"bytes,5,rep,name=custom_properties,json=customProperties,proto3" json:"custom_properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // Properties of the Artifact. - Metadata *structpb.Struct `protobuf:"bytes,6,opt,name=metadata,proto3" json:"metadata,omitempty"` + Metadata *structpb.Struct `protobuf:"bytes,6,opt,name=metadata,proto3" json:"metadata,omitempty"` + // Custom path for output artifact. + CustomPath *string `protobuf:"bytes,7,opt,name=custom_path,json=customPath,proto3,oneof" json:"custom_path,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2144,6 +2146,13 @@ func (x *RuntimeArtifact) GetMetadata() *structpb.Struct { return nil } +func (x *RuntimeArtifact) GetCustomPath() string { + if x != nil && x.CustomPath != nil { + return *x.CustomPath + } + return "" +} + // Message that represents a list of artifacts. type ArtifactList struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -5988,7 +5997,7 @@ const file_pipeline_spec_proto_rawDesc = "" + "\tint_value\x18\x01 \x01(\x03H\x00R\bintValue\x12#\n" + "\fdouble_value\x18\x02 \x01(\x01H\x00R\vdoubleValue\x12#\n" + "\fstring_value\x18\x03 \x01(\tH\x00R\vstringValueB\a\n" + - "\x05value\"\x89\x04\n" + + "\x05value\"\xbf\x04\n" + "\x0fRuntimeArtifact\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x124\n" + "\x04type\x18\x02 \x01(\v2 .ml_pipelines.ArtifactTypeSchemaR\x04type\x12\x10\n" + @@ -5997,13 +6006,16 @@ const file_pipeline_spec_proto_rawDesc = "" + "properties\x18\x04 \x03(\v2-.ml_pipelines.RuntimeArtifact.PropertiesEntryB\x02\x18\x01R\n" + "properties\x12d\n" + "\x11custom_properties\x18\x05 \x03(\v23.ml_pipelines.RuntimeArtifact.CustomPropertiesEntryB\x02\x18\x01R\x10customProperties\x123\n" + - "\bmetadata\x18\x06 \x01(\v2\x17.google.protobuf.StructR\bmetadata\x1aR\n" + + "\bmetadata\x18\x06 \x01(\v2\x17.google.protobuf.StructR\bmetadata\x12$\n" + + "\vcustom_path\x18\a \x01(\tH\x00R\n" + + "customPath\x88\x01\x01\x1aR\n" + "\x0fPropertiesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12)\n" + "\x05value\x18\x02 \x01(\v2\x13.ml_pipelines.ValueR\x05value:\x028\x01\x1aX\n" + "\x15CustomPropertiesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12)\n" + - "\x05value\x18\x02 \x01(\v2\x13.ml_pipelines.ValueR\x05value:\x028\x01\"K\n" + + "\x05value\x18\x02 \x01(\v2\x13.ml_pipelines.ValueR\x05value:\x028\x01B\x0e\n" + + "\f_custom_path\"K\n" + "\fArtifactList\x12;\n" + "\tartifacts\x18\x01 \x03(\v2\x1d.ml_pipelines.RuntimeArtifactR\tartifacts\"\xfa\b\n" + "\rExecutorInput\x12:\n" + @@ -6430,6 +6442,7 @@ func file_pipeline_spec_proto_init() { (*Value_DoubleValue)(nil), (*Value_StringValue)(nil), } + file_pipeline_spec_proto_msgTypes[23].OneofWrappers = []any{} file_pipeline_spec_proto_msgTypes[32].OneofWrappers = []any{} file_pipeline_spec_proto_msgTypes[33].OneofWrappers = []any{} file_pipeline_spec_proto_msgTypes[34].OneofWrappers = []any{} diff --git a/api/v2alpha1/pipeline_spec.proto b/api/v2alpha1/pipeline_spec.proto index 11dece99820..5d796d6a812 100644 --- a/api/v2alpha1/pipeline_spec.proto +++ b/api/v2alpha1/pipeline_spec.proto @@ -967,6 +967,9 @@ message RuntimeArtifact { // Properties of the Artifact. google.protobuf.Struct metadata = 6; + + // Custom path for output artifact. + optional string custom_path = 7; } // Message that represents a list of artifacts. diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go index c068b666e25..bab84e51c70 100644 --- a/backend/src/v2/component/launcher_v2.go +++ b/backend/src/v2/component/launcher_v2.go @@ -629,6 +629,17 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec // Upload artifacts from local path to remote storages. localDir, err := LocalPathForURI(outputArtifact.Uri) + + //todo: this is where the copying occurs. + // first we want to see if a custom path is set, so that we can upload from the custom (pvc?) path + // OR is the URI the remote path? + // actually, just modify the LocalPathForURI method to return a localDir in the pvc (i think) + // if a custom path is set, does that mean the pvc path exists locally? + if outputArtifact.CustomPath != nil { + // todo: therefore we will be uploading here from pvc to remote storages. + localDir = *outputArtifact.CustomPath + } + if err != nil { glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri) } else if !strings.HasPrefix(outputArtifact.Uri, "oci://") { diff --git a/sdk/python/kfp/dsl/executor.py b/sdk/python/kfp/dsl/executor.py index ded418cc4ea..151bb3a41bd 100644 --- a/sdk/python/kfp/dsl/executor.py +++ b/sdk/python/kfp/dsl/executor.py @@ -102,6 +102,7 @@ def assign_input_and_output_artifacts(self) -> None: self.func, ) self.output_artifacts[name] = output_artifact + #todo: I think this is where we are creating artifact files in local container storage. makedirs_recursively(output_artifact.path) def make_artifact( @@ -303,6 +304,7 @@ def write_executor_output(self, write_file = cluster_spec['task']['type'] in CHIEF_NODE_LABELS if write_file: + #todo. this method is called by execute(), and here is where it is dumping the artifact output locally. makedirs_recursively(self.executor_output_path) with open(self.executor_output_path, 'w') as f: f.write(json.dumps(self.excutor_output)) diff --git a/sdk/python/kfp/dsl/executor_test.py b/sdk/python/kfp/dsl/executor_test.py index b5082dd9a36..09ffab0e656 100644 --- a/sdk/python/kfp/dsl/executor_test.py +++ b/sdk/python/kfp/dsl/executor_test.py @@ -318,6 +318,8 @@ def test_func(output_artifact_two: Output[Metrics]): 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': 'new-uri', + 'custom_path': + '', 'metadata': { 'key_1': 'value_1', 'key_2': 2, @@ -613,7 +615,8 @@ def test_artifact_output1(self): "type": { "schemaTitle": "system.Artifact" }, - "uri": "gs://some-bucket/output" + "uri": "gs://some-bucket/output", + 'custom_path':"" } ] } @@ -702,7 +705,9 @@ def test_func(first: str, second: str) -> Artifact: 'name': 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': - 'gs://some-bucket/output' + 'gs://some-bucket/output', + 'custom_path': + '' }] } }, @@ -815,7 +820,9 @@ def func_returning_plain_tuple() -> NamedTuple('Outputs', [ 'name': 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': - 'gs://some-bucket/output_dataset' + 'gs://some-bucket/output_dataset', + 'custom_path': + '' }] } }, @@ -1054,6 +1061,8 @@ def test_func( '', 'uri': 'gs://mlpipeline/v2/artifacts/my-test-pipeline-beta/b2b0cdee-b15c-48ff-b8bc-a394ae46c854/train/model', + 'custom_path': + '', 'metadata': { 'accuracy': 0.9 } @@ -1288,6 +1297,8 @@ def test_func() -> Artifact: 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': 'gs://manually_specified_bucket/foo', + 'custom_path': + '', 'metadata': { 'data': 123 } @@ -1338,7 +1349,9 @@ def test_func() -> Artifact: 'name': 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': - 'gs://another_bucket/my_artifact', + 'gs://another_bucket/my_artifact','' + 'custom_path': + '', 'metadata': { 'data': 123 } @@ -1408,6 +1421,8 @@ def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset): 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': 'gs://another_bucket/artifact', + 'custom_path': + '', 'metadata': { 'data': 123 } @@ -1419,6 +1434,8 @@ def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset): 'projects/123/locations/us-central1/metadataStores/default/artifacts/321', 'uri': 'gs://another_bucket/dataset', + 'custom_path': + '', 'metadata': {} }] } @@ -1621,7 +1638,8 @@ class TestDictToArtifact(parameterized.TestCase): 'type': { 'schemaTitle': 'system.Artifact' }, - 'uri': 'gs://some-bucket/input_artifact_one' + 'uri': 'gs://some-bucket/input_artifact_one', + 'custom_path': '', }, 'artifact_cls': artifact_types.Artifact, 'expected_type': artifact_types.Artifact, @@ -1633,7 +1651,8 @@ class TestDictToArtifact(parameterized.TestCase): 'type': { 'schemaTitle': 'system.Model' }, - 'uri': 'gs://some-bucket/input_artifact_one' + 'uri': 'gs://some-bucket/input_artifact_one', + 'custom_path': '', }, 'artifact_cls': artifact_types.Model, 'expected_type': artifact_types.Model, @@ -1645,7 +1664,8 @@ class TestDictToArtifact(parameterized.TestCase): 'type': { 'schemaTitle': 'system.Dataset' }, - 'uri': 'gs://some-bucket/input_artifact_one' + 'uri': 'gs://some-bucket/input_artifact_one', + 'custom_path': '', }, 'artifact_cls': artifact_types.Dataset, 'expected_type': artifact_types.Dataset, @@ -1657,7 +1677,8 @@ class TestDictToArtifact(parameterized.TestCase): 'type': { 'schemaTitle': 'system.Metrics' }, - 'uri': 'gs://some-bucket/input_artifact_one' + 'uri': 'gs://some-bucket/input_artifact_one', + 'custom_path': '', }, 'artifact_cls': artifact_types.Metrics, 'expected_type': artifact_types.Metrics, @@ -1669,7 +1690,8 @@ class TestDictToArtifact(parameterized.TestCase): 'type': { 'schemaTitle': 'system.ClassificationMetrics' }, - 'uri': 'gs://some-bucket/input_artifact_one' + 'uri': 'gs://some-bucket/input_artifact_one', + 'custom_path': '', }, 'artifact_cls': artifact_types.ClassificationMetrics, 'expected_type': artifact_types.ClassificationMetrics, @@ -1681,7 +1703,8 @@ class TestDictToArtifact(parameterized.TestCase): 'type': { 'schemaTitle': 'system.SlicedClassificationMetrics' }, - 'uri': 'gs://some-bucket/input_artifact_one' + 'uri': 'gs://some-bucket/input_artifact_one', + 'custom_path': '', }, 'artifact_cls': artifact_types.SlicedClassificationMetrics, 'expected_type': artifact_types.SlicedClassificationMetrics, @@ -1693,7 +1716,8 @@ class TestDictToArtifact(parameterized.TestCase): 'type': { 'schemaTitle': 'system.HTML' }, - 'uri': 'gs://some-bucket/input_artifact_one' + 'uri': 'gs://some-bucket/input_artifact_one', + 'custom_path': '', }, 'artifact_cls': None, 'expected_type': artifact_types.HTML, @@ -1705,7 +1729,8 @@ class TestDictToArtifact(parameterized.TestCase): 'type': { 'schemaTitle': 'system.Markdown' }, - 'uri': 'gs://some-bucket/input_artifact_one' + 'uri': 'gs://some-bucket/input_artifact_one', + 'custom_path': '', }, 'artifact_cls': None, 'expected_type': artifact_types.Markdown, diff --git a/sdk/python/kfp/dsl/types/artifact_types.py b/sdk/python/kfp/dsl/types/artifact_types.py index fed21b159c1..0c6bbd8445b 100644 --- a/sdk/python/kfp/dsl/types/artifact_types.py +++ b/sdk/python/kfp/dsl/types/artifact_types.py @@ -82,16 +82,21 @@ def __init__(self, self.uri = uri or '' self.name = name or '' self.metadata = metadata or {} + self.custom_path = None + #todo: this is designed to return custom path if set, and otherwise return local container storage path.s @property def path(self) -> str: return self._get_path() @path.setter def path(self, path: str) -> None: - self._set_path(path) + self._set_custom_path(path) + #todo: here the path property returns the custom path if set --> otherwise, returns the regular path/ def _get_path(self) -> Optional[str]: + if self.custom_path is not None: + return self.custom_path if self.uri.startswith(RemotePrefix.GCS.value): return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value ):] @@ -108,9 +113,27 @@ def _get_path(self) -> Optional[str]: # uri == path for local execution return self.uri + @property + def custom_path(self) -> str: + return self._get_custom_path() + + def _get_custom_path(self) -> Optional[str]: + if self.custom_path is not None: + return self.custom_path + return None + + #todo: this is the internal function to set the path. def _set_path(self, path: str) -> None: self.uri = convert_local_path_to_remote_path(path) + #todo: what differentiates this from setting the path regularly? + def _set_custom_path(self, custom_path: str) -> None: + self.custom_path = custom_path + + @custom_path.setter + def custom_path(self, value): + self._custom_path = value + def convert_local_path_to_remote_path(path: str) -> str: if path.startswith(_GCS_LOCAL_MOUNT_PREFIX): diff --git a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_artifact_custom_path.py b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_artifact_custom_path.py new file mode 100644 index 00000000000..7b413971672 --- /dev/null +++ b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_artifact_custom_path.py @@ -0,0 +1,24 @@ +from kfp.dsl import Output +from kfp.dsl.types.artifact_types import Artifact +from kfp.v2 import dsl + + +@dsl.component +def generate_artifact() -> list: + return [1, 2, 3, 4] + +@dsl.component +def append_to_list(digit: int, input_list: Output[Artifact]) -> list: + input_list.append(digit) + return input_list + +@dsl.component +def validate_artifact_custom_path(exp_path: str, input_list: Output[Artifact]) -> bool: + if input_list.path is not exp_path: + raise ValueError(f"File uri is {input_list.path} but should be {exp_path}.") + +@dsl.pipeline +def pipeline_with_custom_path_artifact(): + output_artifact_task = generate_artifact() + output_artifact_task.output.set_custom_path('/etc/test/file/path') + task2 = validate_artifact_custom_path(path='/etc/test/file/path', input_list=output_artifact_task.output) \ No newline at end of file