diff --git a/.gitignore b/.gitignore
index b36301c7c44..516c604448a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,6 +20,7 @@ R/.Rbuildignore
 .DS_Store
 .env
+.venv
 node_modules
 main.js.map
diff --git a/metaflow/plugins/argo/argo_client.py b/metaflow/plugins/argo/argo_client.py
index 692c0d1a9e6..7cd48504682 100644
--- a/metaflow/plugins/argo/argo_client.py
+++ b/metaflow/plugins/argo/argo_client.py
@@ -309,7 +309,9 @@ def trigger_workflow_template(self, name, usertype, username, parameters={}):
                 json.loads(e.body)["message"] if e.body is not None else e.reason
             )
 
-    def schedule_workflow_template(self, name, schedule=None, timezone=None):
+    def schedule_workflow_template(
+        self, name, schedule=None, timezone=None, concurrency_policy=None
+    ):
         # Unfortunately, Kubernetes client does not handle optimistic
         # concurrency control by itself unlike kubectl
         client = self._client.get()
@@ -321,6 +323,7 @@ def schedule_workflow_template(self, name, schedule=None, timezone=None):
                 "suspend": schedule is None,
                 "schedule": schedule,
                 "timezone": timezone,
+                "concurrencyPolicy": concurrency_policy,
                 "failedJobsHistoryLimit": 10000,  # default is unfortunately 1
                 "successfulJobsHistoryLimit": 10000,  # default is unfortunately 3
                 "workflowSpec": {"workflowTemplateRef": {"name": name}},
diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py
index 87b6d9f80f4..29d98b45fb9 100644
--- a/metaflow/plugins/argo/argo_workflows.py
+++ b/metaflow/plugins/argo/argo_workflows.py
@@ -175,7 +175,7 @@ def __init__(
         self.parameters = self._process_parameters()
         self.config_parameters = self._process_config_parameters()
         self.triggers, self.trigger_options = self._process_triggers()
-        self._schedule, self._timezone = self._get_schedule()
+        self._schedule, self._timezone, self._concurrency_policy = self._get_schedule()
 
         self._base_labels = self._base_kubernetes_labels()
         self._base_annotations = self._base_kubernetes_annotations()
@@ -386,14 +386,18 @@ def _get_schedule(self):
         if schedule:
             # Remove the field "Year" if it exists
             schedule = schedule[0]
-            return " ".join(schedule.schedule.split()[:5]), schedule.timezone
-        return None, None
+            return (
+                " ".join(schedule.schedule.split()[:5]),
+                schedule.timezone,
+                schedule.concurrency_policy,
+            )
+        return None, None, None
 
     def schedule(self):
         try:
             argo_client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
             argo_client.schedule_workflow_template(
-                self.name, self._schedule, self._timezone
+                self.name, self._schedule, self._timezone, self._concurrency_policy
             )
             # Register sensor.
             # Metaflow will overwrite any existing sensor.
@@ -735,7 +739,13 @@ def _compile_workflow_template(self):
             # hence configuring it to an empty string
             if self._timezone is None:
                 self._timezone = ""
-            cron_info = {"schedule": self._schedule, "tz": self._timezone}
+            if self._concurrency_policy is None:
+                self._concurrency_policy = ""
+            cron_info = {
+                "schedule": self._schedule,
+                "tz": self._timezone,
+                "concurrency_policy": self._concurrency_policy,
+            }
             annotations.update({"metaflow/cron": json.dumps(cron_info)})
 
         if self.parameters:
diff --git a/metaflow/plugins/aws/step_functions/schedule_decorator.py b/metaflow/plugins/aws/step_functions/schedule_decorator.py
index d5f977538f6..3bb428e5dca 100644
--- a/metaflow/plugins/aws/step_functions/schedule_decorator.py
+++ b/metaflow/plugins/aws/step_functions/schedule_decorator.py
@@ -30,6 +30,7 @@ class ScheduleDecorator(FlowDecorator):
         "daily": True,
         "hourly": False,
         "timezone": None,
+        "concurrency_policy": None,
     }
 
     def flow_init(
@@ -50,3 +51,4 @@ def flow_init(
 
         # Argo Workflows supports the IANA timezone standard, e.g. America/Los_Angeles
         self.timezone = self.attributes["timezone"]
+        self.concurrency_policy = self.attributes["concurrency_policy"]
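For context, a minimal usage sketch of the new attribute as it would surface through @schedule. The flow below is hypothetical; the value is assumed to be passed through verbatim to the CronWorkflow's concurrencyPolicy, which Argo accepts as "Allow", "Forbid", or "Replace".

from metaflow import FlowSpec, schedule, step


# Hypothetical flow: with concurrency_policy="Forbid", Argo Workflows would skip a
# newly scheduled run while a previous run of this flow is still in flight.
@schedule(hourly=True, timezone="America/Los_Angeles", concurrency_policy="Forbid")
class HourlyFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    HourlyFlow()

Deployment would presumably stay the same, e.g. `python hourly_flow.py argo-workflows create`, with the policy landing in the CronWorkflow spec and in the metaflow/cron annotation shown above.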