@@ -125,6 +125,15 @@ def __init__(
125
125
self .sagemaker_session .boto_session .client ("scheduler" ),
126
126
)
127
127
128
+ @property
129
+ def latest_pipeline_version_id (self ):
130
+ """Retrieves the latest version id of this pipeline"""
131
+ summaries = self .list_pipeline_versions (max_results = 1 )["PipelineVersionSummaries" ]
132
+ if not summaries :
133
+ return None
134
+ else :
135
+ return summaries [0 ].get ("PipelineVersionId" )
136
+
128
137
def create (
129
138
self ,
130
139
role_arn : str = None ,
@@ -166,7 +175,8 @@ def create(
166
175
kwargs ,
167
176
Tags = tags ,
168
177
)
169
- return self .sagemaker_session .sagemaker_client .create_pipeline (** kwargs )
178
+ response = self .sagemaker_session .sagemaker_client .create_pipeline (** kwargs )
179
+ return response
170
180
171
181
def _create_args (
172
182
self , role_arn : str , description : str , parallelism_config : ParallelismConfiguration
@@ -214,15 +224,21 @@ def _create_args(
214
224
)
215
225
return kwargs
216
226
217
- def describe (self ) -> Dict [str , Any ]:
227
+ def describe (self , pipeline_version_id : int = None ) -> Dict [str , Any ]:
218
228
"""Describes a Pipeline in the Workflow service.
219
229
230
+ Args:
231
+ pipeline_version_id (Optional[str]): version ID of the pipeline to describe.
232
+
220
233
Returns:
221
234
Response dict from the service. See `boto3 client documentation
222
235
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/\
223
236
sagemaker.html#SageMaker.Client.describe_pipeline>`_
224
237
"""
225
- return self .sagemaker_session .sagemaker_client .describe_pipeline (PipelineName = self .name )
238
+ kwargs = dict (PipelineName = self .name )
239
+ if pipeline_version_id :
240
+ kwargs ["PipelineVersionId" ] = pipeline_version_id
241
+ return self .sagemaker_session .sagemaker_client .describe_pipeline (** kwargs )
226
242
227
243
def update (
228
244
self ,
@@ -257,7 +273,8 @@ def update(
257
273
return self .sagemaker_session .sagemaker_client .update_pipeline (self , description )
258
274
259
275
kwargs = self ._create_args (role_arn , description , parallelism_config )
260
- return self .sagemaker_session .sagemaker_client .update_pipeline (** kwargs )
276
+ response = self .sagemaker_session .sagemaker_client .update_pipeline (** kwargs )
277
+ return response
261
278
262
279
def upsert (
263
280
self ,
@@ -332,6 +349,7 @@ def start(
332
349
execution_description : str = None ,
333
350
parallelism_config : ParallelismConfiguration = None ,
334
351
selective_execution_config : SelectiveExecutionConfig = None ,
352
+ pipeline_version_id : int = None ,
335
353
):
336
354
"""Starts a Pipeline execution in the Workflow service.
337
355
@@ -345,6 +363,8 @@ def start(
345
363
over the parallelism configuration of the parent pipeline.
346
364
selective_execution_config (Optional[SelectiveExecutionConfig]): The configuration for
347
365
selective step execution.
366
+ pipeline_version_id (Optional[str]): version ID of the pipeline to start the execution from. If not
367
+ specified, uses the latest version ID.
348
368
349
369
Returns:
350
370
A `_PipelineExecution` instance, if successful.
@@ -366,6 +386,7 @@ def start(
366
386
PipelineExecutionDisplayName = execution_display_name ,
367
387
ParallelismConfiguration = parallelism_config ,
368
388
SelectiveExecutionConfig = selective_execution_config ,
389
+ PipelineVersionId = pipeline_version_id ,
369
390
)
370
391
if self .sagemaker_session .local_mode :
371
392
update_args (kwargs , PipelineParameters = parameters )
@@ -461,6 +482,32 @@ def list_executions(
461
482
if key in response
462
483
}
463
484
485
+ def list_pipeline_versions (
486
+ self , sort_order : str = None , max_results : int = None , next_token : str = None
487
+ ) -> str :
488
+ """Lists a pipeline's versions.
489
+
490
+ Args:
491
+ sort_order (str): The sort order for results (Ascending/Descending).
492
+ max_results (int): The maximum number of pipeline executions to return in the response.
493
+ next_token (str): If the result of the previous `ListPipelineExecutions` request was
494
+ truncated, the response includes a `NextToken`. To retrieve the next set of pipeline
495
+ executions, use the token in the next request.
496
+
497
+ Returns:
498
+ List of Pipeline Version Summaries. See
499
+ boto3 client list_pipeline_versions
500
+ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/list_pipeline_versions.html#
501
+ """
502
+ kwargs = dict (PipelineName = self .name )
503
+ update_args (
504
+ kwargs ,
505
+ SortOrder = sort_order ,
506
+ NextToken = next_token ,
507
+ MaxResults = max_results ,
508
+ )
509
+ return self .sagemaker_session .sagemaker_client .list_pipeline_versions (** kwargs )
510
+
464
511
def _get_latest_execution_arn (self ):
465
512
"""Retrieves the latest execution of this pipeline"""
466
513
response = self .list_executions (
@@ -855,7 +902,7 @@ def describe(self):
855
902
sagemaker.html#SageMaker.Client.describe_pipeline_execution>`_.
856
903
"""
857
904
return self .sagemaker_session .sagemaker_client .describe_pipeline_execution (
858
- PipelineExecutionArn = self .arn ,
905
+ PipelineExecutionArn = self .arn
859
906
)
860
907
861
908
def list_steps (self ):
0 commit comments