Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/v2alpha1/pipeline_spec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,9 @@ message RuntimeArtifact {

// Properties of the Artifact.
google.protobuf.Struct metadata = 6;

// Custom path for output artifact.
optional string custom_path = 7;
}

// Message that represents a list of artifacts.
Expand Down
11 changes: 11 additions & 0 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,17 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec

// Upload artifacts from local path to remote storages.
localDir, err := LocalPathForURI(outputArtifact.Uri)

//todo: this is where the copying occurs.
// first we want to see if a custom path is set, so that we can upload from the custom (pvc?) path
// OR is the URI the remote path?
// actually, just modify the LocalPathForURI method to return a localDir in the pvc (i think)
// if a custom path is set, does that mean the pvc path exists locally?
if outputArtifact.CustomPath != nil {
// todo: therefore we will be uploading here from pvc to remote storages.
localDir = *outputArtifact.CustomPath
}

if err != nil {
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
} else if !strings.HasPrefix(outputArtifact.Uri, "oci://") {
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/kfp/dsl/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def assign_input_and_output_artifacts(self) -> None:
self.func,
)
self.output_artifacts[name] = output_artifact
#todo: I think this is where we are creating artifact files in local container storage.
makedirs_recursively(output_artifact.path)

def make_artifact(
Expand Down Expand Up @@ -303,6 +304,7 @@ def write_executor_output(self,
write_file = cluster_spec['task']['type'] in CHIEF_NODE_LABELS

if write_file:
#todo. this method is called by execute(), and here is where it is dumping the artifact output locally.
makedirs_recursively(self.executor_output_path)
with open(self.executor_output_path, 'w') as f:
f.write(json.dumps(self.excutor_output))
Expand Down
49 changes: 37 additions & 12 deletions sdk/python/kfp/dsl/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ def test_func(output_artifact_two: Output[Metrics]):
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'new-uri',
'custom_path':
'',
'metadata': {
'key_1': 'value_1',
'key_2': 2,
Expand Down Expand Up @@ -613,7 +615,8 @@ def test_artifact_output1(self):
"type": {
"schemaTitle": "system.Artifact"
},
"uri": "gs://some-bucket/output"
"uri": "gs://some-bucket/output",
'custom_path':""
}
]
}
Expand Down Expand Up @@ -702,7 +705,9 @@ def test_func(first: str, second: str) -> Artifact:
'name':
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://some-bucket/output'
'gs://some-bucket/output',
'custom_path':
''
}]
}
},
Expand Down Expand Up @@ -815,7 +820,9 @@ def func_returning_plain_tuple() -> NamedTuple('Outputs', [
'name':
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://some-bucket/output_dataset'
'gs://some-bucket/output_dataset',
'custom_path':
''
}]
}
},
Expand Down Expand Up @@ -1054,6 +1061,8 @@ def test_func(
'',
'uri':
'gs://mlpipeline/v2/artifacts/my-test-pipeline-beta/b2b0cdee-b15c-48ff-b8bc-a394ae46c854/train/model',
'custom_path':
'',
'metadata': {
'accuracy': 0.9
}
Expand Down Expand Up @@ -1288,6 +1297,8 @@ def test_func() -> Artifact:
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://manually_specified_bucket/foo',
'custom_path':
'',
'metadata': {
'data': 123
}
Expand Down Expand Up @@ -1338,7 +1349,9 @@ def test_func() -> Artifact:
'name':
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://another_bucket/my_artifact',
'gs://another_bucket/my_artifact',''
'custom_path':
'',
'metadata': {
'data': 123
}
Expand Down Expand Up @@ -1408,6 +1421,8 @@ def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset):
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://another_bucket/artifact',
'custom_path':
'',
'metadata': {
'data': 123
}
Expand All @@ -1419,6 +1434,8 @@ def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset):
'projects/123/locations/us-central1/metadataStores/default/artifacts/321',
'uri':
'gs://another_bucket/dataset',
'custom_path':
'',
'metadata': {}
}]
}
Expand Down Expand Up @@ -1621,7 +1638,8 @@ class TestDictToArtifact(parameterized.TestCase):
'type': {
'schemaTitle': 'system.Artifact'
},
'uri': 'gs://some-bucket/input_artifact_one'
'uri': 'gs://some-bucket/input_artifact_one',
'custom_path': '',
},
'artifact_cls': artifact_types.Artifact,
'expected_type': artifact_types.Artifact,
Expand All @@ -1633,7 +1651,8 @@ class TestDictToArtifact(parameterized.TestCase):
'type': {
'schemaTitle': 'system.Model'
},
'uri': 'gs://some-bucket/input_artifact_one'
'uri': 'gs://some-bucket/input_artifact_one',
'custom_path': '',
},
'artifact_cls': artifact_types.Model,
'expected_type': artifact_types.Model,
Expand All @@ -1645,7 +1664,8 @@ class TestDictToArtifact(parameterized.TestCase):
'type': {
'schemaTitle': 'system.Dataset'
},
'uri': 'gs://some-bucket/input_artifact_one'
'uri': 'gs://some-bucket/input_artifact_one',
'custom_path': '',
},
'artifact_cls': artifact_types.Dataset,
'expected_type': artifact_types.Dataset,
Expand All @@ -1657,7 +1677,8 @@ class TestDictToArtifact(parameterized.TestCase):
'type': {
'schemaTitle': 'system.Metrics'
},
'uri': 'gs://some-bucket/input_artifact_one'
'uri': 'gs://some-bucket/input_artifact_one',
'custom_path': '',
},
'artifact_cls': artifact_types.Metrics,
'expected_type': artifact_types.Metrics,
Expand All @@ -1669,7 +1690,8 @@ class TestDictToArtifact(parameterized.TestCase):
'type': {
'schemaTitle': 'system.ClassificationMetrics'
},
'uri': 'gs://some-bucket/input_artifact_one'
'uri': 'gs://some-bucket/input_artifact_one',
'custom_path': '',
},
'artifact_cls': artifact_types.ClassificationMetrics,
'expected_type': artifact_types.ClassificationMetrics,
Expand All @@ -1681,7 +1703,8 @@ class TestDictToArtifact(parameterized.TestCase):
'type': {
'schemaTitle': 'system.SlicedClassificationMetrics'
},
'uri': 'gs://some-bucket/input_artifact_one'
'uri': 'gs://some-bucket/input_artifact_one',
'custom_path': '',
},
'artifact_cls': artifact_types.SlicedClassificationMetrics,
'expected_type': artifact_types.SlicedClassificationMetrics,
Expand All @@ -1693,7 +1716,8 @@ class TestDictToArtifact(parameterized.TestCase):
'type': {
'schemaTitle': 'system.HTML'
},
'uri': 'gs://some-bucket/input_artifact_one'
'uri': 'gs://some-bucket/input_artifact_one',
'custom_path': '',
},
'artifact_cls': None,
'expected_type': artifact_types.HTML,
Expand All @@ -1705,7 +1729,8 @@ class TestDictToArtifact(parameterized.TestCase):
'type': {
'schemaTitle': 'system.Markdown'
},
'uri': 'gs://some-bucket/input_artifact_one'
'uri': 'gs://some-bucket/input_artifact_one',
'custom_path': '',
},
'artifact_cls': None,
'expected_type': artifact_types.Markdown,
Expand Down
25 changes: 24 additions & 1 deletion sdk/python/kfp/dsl/types/artifact_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,21 @@ def __init__(self,
self.uri = uri or ''
self.name = name or ''
self.metadata = metadata or {}
self.custom_path = None

#todo: this is designed to return custom path if set, and otherwise return local container storage path.s
@property
def path(self) -> str:
return self._get_path()

@path.setter
def path(self, path: str) -> None:
self._set_path(path)
self._set_custom_path(path)

#todo: here the path property returns the custom path if set --> otherwise, returns the regular path/
def _get_path(self) -> Optional[str]:
if self.custom_path is not None:
return self.custom_path
if self.uri.startswith(RemotePrefix.GCS.value):
return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value
):]
Expand All @@ -108,9 +113,27 @@ def _get_path(self) -> Optional[str]:
# uri == path for local execution
return self.uri

@property
def custom_path(self) -> str:
return self._get_custom_path()

def _get_custom_path(self) -> Optional[str]:
if self.custom_path is not None:
return self.custom_path
return None

#todo: this is the internal function to set the path.
def _set_path(self, path: str) -> None:
self.uri = convert_local_path_to_remote_path(path)

#todo: what differentiates this from setting the path regularly?
def _set_custom_path(self, custom_path: str) -> None:
self.custom_path = custom_path

@custom_path.setter
def custom_path(self, value):
self._custom_path = value


def convert_local_path_to_remote_path(path: str) -> str:
if path.startswith(_GCS_LOCAL_MOUNT_PREFIX):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from kfp.dsl import Output
from kfp.dsl.types.artifact_types import Artifact
from kfp.v2 import dsl


@dsl.component
def generate_artifact() -> list:
return [1, 2, 3, 4]

@dsl.component
def append_to_list(digit: int, input_list: Output[Artifact]) -> list:
input_list.append(digit)
return input_list

@dsl.component
def validate_artifact_custom_path(exp_path: str, input_list: Output[Artifact]) -> bool:
if input_list.path is not exp_path:
raise ValueError(f"File uri is {input_list.path} but should be {exp_path}.")

@dsl.pipeline
def pipeline_with_custom_path_artifact():
output_artifact_task = generate_artifact()
output_artifact_task.output.set_custom_path('/etc/test/file/path')
task2 = validate_artifact_custom_path(path='/etc/test/file/path', input_list=output_artifact_task.output)
Loading