diff --git a/changelog.md b/changelog.md index fe84b2d..ad7e680 100644 --- a/changelog.md +++ b/changelog.md @@ -4,7 +4,8 @@ * None. ### New Features -* None. +* Added basic client method `run_ingestor` to run ingestors via EAS's `executeIngestor` graphql mutation. +* Added basic client methods `get_ingestor_run` and `get_ingestor_run_list` to retrieve the records of previous ingestor runs. ### Enhancements * None. diff --git a/src/zepben/eas/client/eas_client.py b/src/zepben/eas/client/eas_client.py index 8eec065..8eb9885 100644 --- a/src/zepben/eas/client/eas_client.py +++ b/src/zepben/eas/client/eas_client.py @@ -11,6 +11,7 @@ from http import HTTPStatus from json import dumps from typing import Optional, List +from dataclasses import asdict import aiohttp from aiohttp import ClientSession @@ -20,6 +21,7 @@ from zepben.eas.client.feeder_load_analysis_input import FeederLoadAnalysisInput from zepben.eas.client.opendss import OpenDssConfig, GetOpenDssModelsFilterInput, GetOpenDssModelsSortCriteriaInput from zepben.eas.client.study import Study +from zepben.eas.client.ingestor import IngestorConfigInput, IngestorRunsFilterInput, IngestorRunsSortCriteriaInput from zepben.eas.client.util import construct_url from zepben.eas.client.work_package import WorkPackageConfig, FixedTime, TimePeriod, ForecastConfig, FeederConfigs @@ -861,6 +863,49 @@ async def async_upload_study(self, study: Study): } } } + if self._verify_certificate: + sslcontext = ssl.create_default_context(cafile=self._ca_filename) + + async with self.session.post( + construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"), + headers=self._get_request_headers(), + json=json, + ssl=sslcontext if self._verify_certificate else False + ) as response: + if response.ok: + return await response.json() + else: + response.raise_for_status() + + def run_ingestor(self, run_config: List[IngestorConfigInput]): + """ + Send request to perform an ingestor run + :param run_config: A list of IngestorConfigInput + :return: The HTTP response received from the Evolve App Server after attempting to run the ingestor + """ + return get_event_loop().run_until_complete( + self.async_run_ingestor(run_config)) + + async def async_run_ingestor(self, run_config: List[IngestorConfigInput]): + """ + Send asynchronous request to perform an ingestor run + :param run_config: A list of IngestorConfigInput + :return: The HTTP response received from the Evolve App Server after attempting to run the ingestor + """ + with warnings.catch_warnings(): + if not self._verify_certificate: + warnings.filterwarnings("ignore", category=InsecureRequestWarning) + json = { + "query": """ + mutation executeIngestor($runConfig: [IngestorConfigInput!]) { + executeIngestor(runConfig: $runConfig) + } + """, + "variables": { + "runConfig": [asdict(x) for x in run_config], + } + } + if self._verify_certificate: sslcontext = ssl.create_default_context(cafile=self._ca_filename) @@ -875,7 +920,131 @@ async def async_upload_study(self, study: Study): else: response.raise_for_status() - def run_hosting_capacity_calibration(self, calibration_name: str, local_calibration_time: datetime, feeders: Optional[List[str]] = None): + def get_ingestor_run(self, ingestor_run_id: int): + """ + Send request to retrieve the record of a particular ingestor run. + :param ingestor_run_id: The ID of the ingestor run to retrieve execution information about. + :return: The HTTP response received from the Evolve App Server including the ingestor run information (if found). + """ + return get_event_loop().run_until_complete( + self.async_get_ingestor_run(ingestor_run_id)) + + async def async_get_ingestor_run(self, ingestor_run_id: int): + """ + Send asynchronous request to retrieve the record of a particular ingestor run. + :param ingestor_run_id: The ID of the ingestor run to retrieve execution information about. + :return: The HTTP response received from the Evolve App Server including the ingestor run information (if found). + """ + with warnings.catch_warnings(): + if not self._verify_certificate: + warnings.filterwarnings("ignore", category=InsecureRequestWarning) + json = { + "query": """ + query getIngestorRun($id: Int!) { + getIngestorRun(id: $id) { + id + containerRuntimeType, + payload, + token, + status, + startedAt, + statusLastUpdatedAt, + completedAt + } + } + """, + "variables": { + "id": ingestor_run_id, + } + } + + if self._verify_certificate: + sslcontext = ssl.create_default_context(cafile=self._ca_filename) + + async with self.session.post( + construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"), + headers=self._get_request_headers(), + json=json, + ssl=sslcontext if self._verify_certificate else False + ) as response: + if response.ok: + return await response.json() + else: + raise response.raise_for_status() + + def get_ingestor_run_list(self, query_filter: Optional[IngestorRunsFilterInput] = None, + query_sort: Optional[IngestorRunsSortCriteriaInput] = None): + """ + Send request to retrieve a list of ingestor run records matching the provided filter parameters. + :param query_filter: An `IngestorRunsFilterInput` object. Only records matching the provided values will be returned. + If not supplied all records will be returned. (Optional) + :param query_sort: An `IngestorRunsSortCriteriaInput` that can control the order of the returned record based on a number of fields. (Optional) + :return: The HTTP response received from the Evolve App Server including all matching ingestor records found. + """ + return get_event_loop().run_until_complete( + self.async_get_ingestor_run_list(query_filter, query_sort)) + + async def async_get_ingestor_run_list(self, query_filter: Optional[IngestorRunsFilterInput] = None, + query_sort: Optional[IngestorRunsSortCriteriaInput] = None): + """ + Send asynchronous request to retrieve a list of ingestor run records matching the provided filter parameters. + :param query_filter: An `IngestorRunsFilterInput` object. Only records matching the provided values will be returned. + If not supplied all records will be returned. (Optional) + :param query_sort: An `IngestorRunsSortCriteriaInput` that can control the order of the returned record based on a number of fields. (Optional) + :return: The HTTP response received from the Evolve App Server including all matching ingestor records found. + """ + + with warnings.catch_warnings(): + if not self._verify_certificate: + warnings.filterwarnings("ignore", category=InsecureRequestWarning) + json = { + "query": """ + query listIngestorRuns($filter: IngestorRunsFilterInput, $sort: IngestorRunsSortCriteriaInput) { + listIngestorRuns(filter: $filter, sort: $sort) { + id + containerRuntimeType, + payload, + token, + status, + startedAt, + statusLastUpdatedAt, + completedAt + } + } + """, + "variables": { + **({"filter": { + "id": query_filter.id, + "status": query_filter.status and [state.name for state in query_filter.status], + "completed": query_filter.completed, + "containerRuntimeType": query_filter.container_runtime_type and [runtime.name for runtime in + query_filter.container_runtime_type] + }} if query_filter else {}), + **({"sort": { + "status": query_sort.status and query_sort.status.name, + "startedAt": query_sort.started_at and query_sort.started_at.name, + "statusLastUpdatedAt": query_sort.status_last_updated_at and query_sort.status_last_updated_at.name, + "completedAt": query_sort.completed_at and query_sort.completed_at.name, + "containerRuntimeType": query_sort.container_runtime_type and query_sort.container_runtime_type.name, + }} if query_sort else {}) + } + } + + if self._verify_certificate: + sslcontext = ssl.create_default_context(cafile=self._ca_filename) + + async with self.session.post( + construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"), + headers=self._get_request_headers(), + json=json, + ssl=sslcontext if self._verify_certificate else False + ) as response: + if response.ok: + return await response.json() + else: + raise response.raise_for_status() + + def run_hosting_capacity_calibration(self, calibration_name: str, local_calibration_time: Optional[str] = None, feeders: Optional[List[str]] = None): """ Send request to run hosting capacity calibration :param calibration_name: A string representation of the calibration name diff --git a/src/zepben/eas/client/ingestor.py b/src/zepben/eas/client/ingestor.py new file mode 100644 index 0000000..d384696 --- /dev/null +++ b/src/zepben/eas/client/ingestor.py @@ -0,0 +1,72 @@ +# Copyright 2025 Zeppelin Bend Pty Ltd +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +from dataclasses import dataclass +from enum import Enum +from typing import Optional, List +from datetime import datetime + +__all__ = [ + "IngestorConfigInput", "IngestorRuntimeKind", "IngestorRunState", "IngestorRun", "IngestorRunsFilterInput", "Order", + "IngestorRunsSortCriteriaInput" +] + + +@dataclass +class IngestorConfigInput: + key: str + value: str + + +class IngestorRuntimeKind(Enum): + AZURE_CONTAINER_APP_JOB = "AZURE_CONTAINER_APP_JOB" + DOCKER = "DOCKER" + ECS = "ECS" + KUBERNETES = "KUBERNETES" + TEMPORAL_KUBERNETES = "TEMPORAL_KUBERNETES" + + +class IngestorRunState(Enum): + INITIALIZED = "INITIALIZED" + QUEUED = "QUEUED" + STARTED = "STARTED" + RUNNING = "RUNNING" + SUCCESS = "SUCCESS" + FAILURE = "FAILURE" + FAILED_TO_START = "FAILED_TO_START" + + +@dataclass +class IngestorRun: + id: int + container_runtime_type: Optional[IngestorRuntimeKind] + payload: str + token: str + status: IngestorRunState + started_at: datetime + status_last_updated_at: Optional[datetime] + completedAt: Optional[datetime] + + +@dataclass +class IngestorRunsFilterInput: + id: Optional[int] = None + status: Optional[List[IngestorRunState]] = None + completed: Optional[bool] = None + container_runtime_type: Optional[List[IngestorRuntimeKind]] = None + + +class Order(Enum): + ASC = "ASC" + DESC = "DESC" + + +@dataclass +class IngestorRunsSortCriteriaInput: + status: Optional[Order] = None + started_at: Optional[Order] = None + status_last_updated_at: Optional[Order] = None + completed_at: Optional[Order] = None + container_runtime_type: Optional[Order] = None diff --git a/test/test_eas_client.py b/test/test_eas_client.py index a66274a..65e96ec 100644 --- a/test/test_eas_client.py +++ b/test/test_eas_client.py @@ -19,6 +19,8 @@ from zepben.eas import EasClient, Study, SolveConfig from zepben.eas import FeederConfig, ForecastConfig, FixedTimeLoadOverride +from zepben.eas.client.ingestor import IngestorConfigInput, IngestorRunsSortCriteriaInput, IngestorRunsFilterInput, \ + IngestorRunState, IngestorRuntimeKind from zepben.eas.client.opendss import OpenDssConfig, GetOpenDssModelsFilterInput, OpenDssModelState, \ GetOpenDssModelsSortCriteriaInput, \ Order @@ -1283,3 +1285,155 @@ def test_get_opendss_model_download_url_valid_certificate_success(ca: trustme.CA res = eas_client.get_opendss_model_download_url(1) httpserver.check_assertions() assert res == "https://example.com/download/1" + + +def run_ingestor_request_handler(request): + actual_body = json.loads(request.data.decode()) + query = " ".join(actual_body['query'].split()) + + assert query == "mutation executeIngestor($runConfig: [IngestorConfigInput!]) { executeIngestor(runConfig: $runConfig) }" + assert actual_body['variables'] == {'runConfig': [{'key': 'random.config', 'value': 'random.value'}, + {'key': 'dataStorePath', 'value': '/some/place/with/data'}]} + + return Response(json.dumps({"executeIngestor": 5}), status=200, content_type="application/json") + + +def test_run_ingestor_no_verify_success(httpserver: HTTPServer): + eas_client = EasClient( + LOCALHOST, + httpserver.port, + verify_certificate=False + ) + + httpserver.expect_oneshot_request("/api/graphql").respond_with_handler( + run_ingestor_request_handler) + res = eas_client.run_ingestor([IngestorConfigInput("random.config", "random.value"), + IngestorConfigInput("dataStorePath", "/some/place/with/data")]) + httpserver.check_assertions() + assert res == {"executeIngestor": 5} + + +def get_ingestor_run_request_handler(request): + actual_body = json.loads(request.data.decode()) + query = " ".join(actual_body['query'].split()) + + assert query == "query getIngestorRun($id: Int!) { getIngestorRun(id: $id) { id containerRuntimeType, payload, token, status, startedAt, statusLastUpdatedAt, completedAt } }" + assert actual_body['variables'] == {"id": 1} + + return Response(json.dumps({"result": "success"}), status=200, content_type="application/json") + + +def test_get_ingestor_run_no_verify_success(httpserver: HTTPServer): + eas_client = EasClient( + LOCALHOST, + httpserver.port, + verify_certificate=False + ) + + httpserver.expect_oneshot_request("/api/graphql").respond_with_handler(get_ingestor_run_request_handler) + res = eas_client.get_ingestor_run(1) + httpserver.check_assertions() + assert res == {"result": "success"} + + +def get_ingestor_run_list_request_empty_handler(request): + actual_body = json.loads(request.data.decode()) + query = " ".join(actual_body['query'].split()) + + get_ingestor_run_list_query = """ + query listIngestorRuns($filter: IngestorRunsFilterInput, $sort: IngestorRunsSortCriteriaInput) { + listIngestorRuns(filter: $filter, sort: $sort) { + id + containerRuntimeType, + payload, + token, + status, + startedAt, + statusLastUpdatedAt, + completedAt + } + } + """ + assert query == " ".join(line.strip() for line in get_ingestor_run_list_query.strip().splitlines()) + assert actual_body['variables'] == {} + + return Response(json.dumps({"result": "success"}), status=200, content_type="application/json") + + +def test_get_ingestor_run_list_empty_filter_no_verify_success(httpserver: HTTPServer): + eas_client = EasClient( + LOCALHOST, + httpserver.port, + verify_certificate=False + ) + + httpserver.expect_oneshot_request("/api/graphql").respond_with_handler(get_ingestor_run_list_request_empty_handler) + res = eas_client.get_ingestor_run_list() + httpserver.check_assertions() + assert res == {"result": "success"} + + +def get_ingestor_run_list_request_complete_handler(request): + actual_body = json.loads(request.data.decode()) + query = " ".join(actual_body['query'].split()) + + get_ingestor_run_list_query = """ + query listIngestorRuns($filter: IngestorRunsFilterInput, $sort: IngestorRunsSortCriteriaInput) { + listIngestorRuns(filter: $filter, sort: $sort) { + id + containerRuntimeType, + payload, + token, + status, + startedAt, + statusLastUpdatedAt, + completedAt + } + } + """ + assert query == " ".join(line.strip() for line in get_ingestor_run_list_query.strip().splitlines()) + assert actual_body['variables'] == { + "filter": { + "id": 4, + "status": ["SUCCESS", "STARTED", "FAILED_TO_START"], + "completed": True, + "containerRuntimeType": ["TEMPORAL_KUBERNETES", "AZURE_CONTAINER_APP_JOB"] + }, + "sort": { + "status": "ASC", + "startedAt": "DESC", + "statusLastUpdatedAt": "ASC", + "completedAt": "DESC", + "containerRuntimeType": "ASC", + } + } + + return Response(json.dumps({"result": "success"}), status=200, content_type="application/json") + + +def test_get_ingestor_run_list_all_filters_no_verify_success(httpserver: HTTPServer): + eas_client = EasClient( + LOCALHOST, + httpserver.port, + verify_certificate=False + ) + + httpserver.expect_oneshot_request("/api/graphql").respond_with_handler(get_ingestor_run_list_request_complete_handler) + res = eas_client.get_ingestor_run_list( + query_filter=IngestorRunsFilterInput( + id=4, + status=[IngestorRunState.SUCCESS, IngestorRunState.STARTED, IngestorRunState.FAILED_TO_START], + completed=True, + container_runtime_type=[IngestorRuntimeKind.TEMPORAL_KUBERNETES, + IngestorRuntimeKind.AZURE_CONTAINER_APP_JOB] + ), + query_sort=IngestorRunsSortCriteriaInput( + status=Order.ASC, + started_at=Order.DESC, + status_last_updated_at=Order.ASC, + completed_at=Order.DESC, + container_runtime_type=Order.ASC + ) + ) + httpserver.check_assertions() + assert res == {"result": "success"}