Skip to content

Commit 23b7804

Browse files
committed
Add CustomPath field to RuntimeArtifact proto.
Signed-off-by: agoins <[email protected]>
1 parent c866ff3 commit 23b7804

File tree

8 files changed

+214
-21
lines changed

8 files changed

+214
-21
lines changed

api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Lines changed: 17 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2alpha1/pipeline_spec.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -967,6 +967,9 @@ message RuntimeArtifact {
967967

968968
// Properties of the Artifact.
969969
google.protobuf.Struct metadata = 6;
970+
971+
// Custom path for output artifact.
972+
optional string custom_path = 7;
970973
}
971974

972975
// Message that represents a list of artifacts.

backend/src/v2/component/launcher_v2.go

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ func executeV2(
368368
return nil, nil, err
369369
}
370370

371+
// Ensure executorOutput contains the custom path.
371372
executorOutput, err := execute(
372373
ctx,
373374
executorInput,
@@ -545,21 +546,32 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
545546
mergeRuntimeArtifacts(list.Artifacts[0], outputArtifact)
546547
}
547548

548-
// Upload artifacts from local path to remote storages.
549-
localDir, err := LocalPathForURI(outputArtifact.Uri)
550-
if err != nil {
551-
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
552-
} else if !strings.HasPrefix(outputArtifact.Uri, "oci://") {
553-
blobKey, err := opts.bucketConfig.KeyFromURI(outputArtifact.Uri)
549+
var dir string
550+
var copyDir string
551+
var err error
552+
// If the artifact's CustomPath is set, upload to remote storage from that path. Otherwise, upload from the local path derived from outputArtifact.Uri.
553+
if *outputArtifact.CustomPath != "" {
554+
dir = *outputArtifact.CustomPath
555+
copyDir = *outputArtifact.CustomPath
556+
} else {
557+
dir = outputArtifact.Uri
558+
copyDir, err = LocalPathForURI(dir)
559+
if err != nil {
560+
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, dir)
561+
}
562+
}
563+
564+
if !strings.HasPrefix(dir, "oci://") {
565+
blobKey, err := opts.bucketConfig.KeyFromURI(dir)
554566
if err != nil {
555567
return nil, fmt.Errorf("failed to upload output artifact %q: %w", name, err)
556568
}
557-
if err := objectstore.UploadBlob(ctx, opts.bucket, localDir, blobKey); err != nil {
569+
if err := objectstore.UploadBlob(ctx, opts.bucket, copyDir, blobKey); err != nil {
558570
// We allow components to not produce output files
559571
if errors.Is(err, os.ErrNotExist) {
560-
glog.Warningf("Local filepath %q does not exist", localDir)
572+
glog.Warningf("Local filepath %q does not exist", copyDir)
561573
} else {
562-
return nil, fmt.Errorf("failed to upload output artifact %q to remote storage URI %q: %w", name, outputArtifact.Uri, err)
574+
return nil, fmt.Errorf("failed to upload output artifact %q to remote storage URI %q: %w", name, dir, err)
563575
}
564576
}
565577
}
@@ -898,7 +910,7 @@ func getExecutorOutputFile(path string) (*pipelinespec.ExecutorOutput, error) {
898910

899911
func LocalPathForURI(uri string) (string, error) {
900912
if strings.HasPrefix(uri, "gs://") {
901-
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
913+
return "/home/agoins/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
902914
}
903915
if strings.HasPrefix(uri, "minio://") {
904916
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
@@ -926,14 +938,16 @@ func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
926938
}
927939

928940
for _, outputArtifact := range artifactList.Artifacts {
941+
// If custom path is set, do not create local directory for output artifact.
942+
if outputArtifact.CustomPath == nil {
943+
localPath, err := LocalPathForURI(outputArtifact.Uri)
944+
if err != nil {
945+
return fmt.Errorf("failed to generate local storage path for output artifact %q: %w", name, err)
946+
}
929947

930-
localPath, err := LocalPathForURI(outputArtifact.Uri)
931-
if err != nil {
932-
return fmt.Errorf("failed to generate local storage path for output artifact %q: %w", name, err)
933-
}
934-
935-
if err := os.MkdirAll(filepath.Dir(localPath), 0755); err != nil {
936-
return fmt.Errorf("unable to create directory %q for output artifact %q: %w", filepath.Dir(localPath), name, err)
948+
if err := os.MkdirAll(filepath.Dir(localPath), 0755); err != nil {
949+
return fmt.Errorf("unable to create directory %q for output artifact %q: %w", filepath.Dir(localPath), name, err)
950+
}
937951
}
938952
}
939953
}

backend/src/v2/component/launcher_v2_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,100 @@ var addNumbersComponent = &pipelinespec.ComponentSpec{
5050
},
5151
}
5252

53+
// Tests that launcher executes the user component when its output artifacts have CustomPath set.
54+
func Test_executeV2_Artifacts(t *testing.T) {
55+
uri := "gs:///tmp/custom/artifact.txt"
56+
custom := "mem://test-bucket/pipeline-root/temp.txt"
57+
tests := []struct {
58+
name string
59+
executorInput *pipelinespec.ExecutorInput
60+
executorArgs []string
61+
wantErr bool
62+
}{
63+
{
64+
"custom path",
65+
&pipelinespec.ExecutorInput{
66+
Inputs: &pipelinespec.ExecutorInput_Inputs{
67+
ParameterValues: map[string]*structpb.Value{"a": structpb.NewNumberValue(1), "b": structpb.NewNumberValue(2)},
68+
},
69+
Outputs: &pipelinespec.ExecutorInput_Outputs{
70+
Artifacts: map[string]*pipelinespec.ArtifactList{
71+
"dataset": {
72+
Artifacts: []*pipelinespec.RuntimeArtifact{
73+
{
74+
Uri: uri,
75+
CustomPath: &custom,
76+
Type: &pipelinespec.ArtifactTypeSchema{
77+
Kind: &pipelinespec.ArtifactTypeSchema_InstanceSchema{InstanceSchema: "title: kfp.Model\ntype: object\nproperties:\n framework:\n type: string\n framework_version:\n type: string\n"},
78+
},
79+
},
80+
},
81+
},
82+
},
83+
},
84+
},
85+
[]string{"-c", "echo \"{{$.inputs.parameters['content']}}\" > {{$.outputs.artifacts['data'].path}}"},
86+
false,
87+
},
88+
{
89+
"default path",
90+
&pipelinespec.ExecutorInput{
91+
Inputs: &pipelinespec.ExecutorInput_Inputs{
92+
ParameterValues: map[string]*structpb.Value{"b": structpb.NewNumberValue(2)},
93+
},
94+
Outputs: &pipelinespec.ExecutorInput_Outputs{
95+
Artifacts: map[string]*pipelinespec.ArtifactList{
96+
"dataset": {
97+
Artifacts: []*pipelinespec.RuntimeArtifact{
98+
{
99+
Uri: uri,
100+
CustomPath: &custom,
101+
Type: &pipelinespec.ArtifactTypeSchema{
102+
Kind: &pipelinespec.ArtifactTypeSchema_InstanceSchema{InstanceSchema: "title: kfp.Model\ntype: object\nproperties:\n framework:\n type: string\n framework_version:\n type: string\n"},
103+
},
104+
},
105+
},
106+
},
107+
},
108+
},
109+
},
110+
[]string{"-c", "echo 'ok' > {{$.outputs.artifacts['artifact'].path}}"},
111+
false,
112+
},
113+
}
114+
115+
for _, test := range tests {
116+
t.Run(test.name, func(t *testing.T) {
117+
fakeKubernetesClientset := &fake.Clientset{}
118+
fakeMetadataClient := metadata.NewFakeClient()
119+
bucket, err := blob.OpenBucket(context.Background(), "mem://test-bucket")
120+
assert.Nil(t, err)
121+
bucketConfig, err := objectstore.ParseBucketConfig("mem://test-bucket/pipeline-root/", nil)
122+
assert.Nil(t, err)
123+
_, _, err = executeV2(
124+
context.Background(),
125+
test.executorInput,
126+
addNumbersComponent,
127+
"sh",
128+
test.executorArgs,
129+
bucket,
130+
bucketConfig,
131+
fakeMetadataClient,
132+
"namespace",
133+
fakeKubernetesClientset,
134+
"false",
135+
)
136+
//
137+
//if test.wantErr {
138+
// assert.NotNil(t, err)
139+
//} else {
140+
// assert.Nil(t, err)
141+
//}
142+
assert.NotEmpty(t, bucket)
143+
})
144+
}
145+
}
146+
53147
// Tests that launcher correctly executes the user component and successfully writes output parameters to file.
54148
func Test_executeV2_Parameters(t *testing.T) {
55149
tests := []struct {

sdk/python/kfp/dsl/executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ def write_executor_output(self,
287287
'name': artifact.name,
288288
'uri': artifact.uri,
289289
'metadata': artifact.metadata,
290+
'custom_path': artifact.custom_path
290291
}
291292
artifacts_list = {'artifacts': [runtime_artifact]}
292293

sdk/python/kfp/dsl/types/artifact_types.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def __init__(self,
8282
self.uri = uri or ''
8383
self.name = name or ''
8484
self.metadata = metadata or {}
85+
self.custom_path = ''
8586

8687
@property
8788
def path(self) -> str:
@@ -92,6 +93,8 @@ def path(self, path: str) -> None:
9293
self._set_path(path)
9394

9495
def _get_path(self) -> Optional[str]:
96+
if self.custom_path is not '':
97+
return self.custom_path
9598
if self.uri.startswith(RemotePrefix.GCS.value):
9699
return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value
97100
):]
@@ -111,6 +114,14 @@ def _get_path(self) -> Optional[str]:
111114
def _set_path(self, path: str) -> None:
112115
self.uri = convert_local_path_to_remote_path(path)
113116

117+
@property
118+
def custom_path(self) -> str:
119+
return self._custom_path
120+
121+
@custom_path.setter
122+
def custom_path(self, value):
123+
self._custom_path = value
124+
114125

115126
def convert_local_path_to_remote_path(path: str) -> str:
116127
if path.startswith(_GCS_LOCAL_MOUNT_PREFIX):
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from kfp import dsl
2+
from kfp.dsl import Output
3+
from kfp.dsl.types.artifact_types import Artifact
4+
5+
6+
@dsl.component
7+
def create_list() -> list:
8+
return [1, 2, 3, 4]
9+
10+
11+
@dsl.component
12+
def append_to_list(digit: int, input_list: Output[Artifact]) -> list:
13+
input_list.append(digit)
14+
return input_list
15+
16+
17+
@dsl.component
18+
def validate_custom_path(exp_path: str, input_list: Output[Artifact]) -> bool:
19+
# TODO: use != here; `is not` compares object identity, not string equality.
20+
if input_list.path is not exp_path:
21+
raise ValueError(
22+
f"File uri is {input_list.path} but should be {exp_path}.")
23+
24+
25+
@dsl.pipeline
26+
def pipeline_with_custom_path_artifact():
27+
task1 = create_list()
28+
task1.output.set_custom_path('/etc/test/file/path')
29+
task2 = validate_custom_path(
30+
path='/etc/test/file/path', input_list=task1.output)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from sdk.python.kfp.dsl import Output
2+
from sdk.python.kfp.dsl.types.artifact_types import Artifact
3+
from sdk.python.kfp import dsl
4+
5+
@dsl.component
6+
def create_list() -> list:
7+
return [1, 2, 3, 4]
8+
9+
@dsl.component
10+
def append_to_list(digit: int, input_list: Output[Artifact]) -> list:
11+
input_list.append(digit)
12+
return input_list
13+
14+
@dsl.component
15+
def validate_custom_path(exp_path: str, input_list: Output[Artifact]) -> bool:
16+
# TODO: use != here; `is not` compares object identity, not string equality.
17+
if input_list.path is not exp_path:
18+
raise ValueError(f"File uri is {input_list.path} but should be {exp_path}.")
19+
20+
def component(artifact: Output[Artifact]) -> bool:
21+
return True
22+
23+
@dsl.pipeline
24+
def pipeline_with_custom_path_artifact():
25+
task1 = create_list()
26+
task1.output.set_custom_path('/etc/test/file/path')
27+
# task2 = validate_custom_path(path='/etc/test/file/path', input_list=task1.output)

0 commit comments

Comments
 (0)