Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
* None.

### New Features
* None.
* Added basic client method `run_ingestor` to run ingestors via EAS's `executeIngestor` graphql mutation.
* Added basic client methods `get_ingestor_run` and `get_ingestor_run_list` to retrieve the records of previous ingestor runs.

### Enhancements
* None.
Expand Down
171 changes: 170 additions & 1 deletion src/zepben/eas/client/eas_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from http import HTTPStatus
from json import dumps
from typing import Optional, List
from dataclasses import asdict

import aiohttp
from aiohttp import ClientSession
Expand All @@ -20,6 +21,7 @@
from zepben.eas.client.feeder_load_analysis_input import FeederLoadAnalysisInput
from zepben.eas.client.opendss import OpenDssConfig, GetOpenDssModelsFilterInput, GetOpenDssModelsSortCriteriaInput
from zepben.eas.client.study import Study
from zepben.eas.client.ingestor import IngestorConfigInput, IngestorRunsFilterInput, IngestorRunsSortCriteriaInput
from zepben.eas.client.util import construct_url
from zepben.eas.client.work_package import WorkPackageConfig, FixedTime, TimePeriod, ForecastConfig, FeederConfigs

Expand Down Expand Up @@ -861,6 +863,49 @@ async def async_upload_study(self, study: Study):
}
}
}
if self._verify_certificate:
sslcontext = ssl.create_default_context(cafile=self._ca_filename)

async with self.session.post(
construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"),
headers=self._get_request_headers(),
json=json,
ssl=sslcontext if self._verify_certificate else False
) as response:
if response.ok:
return await response.json()
else:
response.raise_for_status()

def run_ingestor(self, run_config: List[IngestorConfigInput]):
"""
Send request to perform an ingestor run
:param run_config: A list of IngestorConfigInput
:return: The HTTP response received from the Evolve App Server after attempting to run the ingestor
"""
return get_event_loop().run_until_complete(
self.async_run_ingestor(run_config))

async def async_run_ingestor(self, run_config: List[IngestorConfigInput]):
"""
Send asynchronous request to perform an ingestor run
:param run_config: A list of IngestorConfigInput
:return: The HTTP response received from the Evolve App Server after attempting to run the ingestor
"""
with warnings.catch_warnings():
if not self._verify_certificate:
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
json = {
"query": """
mutation executeIngestor($runConfig: [IngestorConfigInput!]) {
executeIngestor(runConfig: $runConfig)
}
""",
"variables": {
"runConfig": [asdict(x) for x in run_config],
}
}

if self._verify_certificate:
sslcontext = ssl.create_default_context(cafile=self._ca_filename)

Expand All @@ -875,7 +920,131 @@ async def async_upload_study(self, study: Study):
else:
response.raise_for_status()

def run_hosting_capacity_calibration(self, calibration_name: str, local_calibration_time: datetime, feeders: Optional[List[str]] = None):
def get_ingestor_run(self, ingestor_run_id: int):
"""
Send request to retrieve the record of a particular ingestor run.
:param ingestor_run_id: The ID of the ingestor run to retrieve execution information about.
:return: The HTTP response received from the Evolve App Server including the ingestor run information (if found).
"""
return get_event_loop().run_until_complete(
self.async_get_ingestor_run(ingestor_run_id))

async def async_get_ingestor_run(self, ingestor_run_id: int):
"""
Send asynchronous request to retrieve the record of a particular ingestor run.
:param ingestor_run_id: The ID of the ingestor run to retrieve execution information about.
:return: The HTTP response received from the Evolve App Server including the ingestor run information (if found).
"""
with warnings.catch_warnings():
if not self._verify_certificate:
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
json = {
"query": """
query getIngestorRun($id: Int!) {
getIngestorRun(id: $id) {
id
containerRuntimeType,
payload,
token,
status,
startedAt,
statusLastUpdatedAt,
completedAt
}
}
""",
"variables": {
"id": ingestor_run_id,
}
}

if self._verify_certificate:
sslcontext = ssl.create_default_context(cafile=self._ca_filename)

async with self.session.post(
construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"),
headers=self._get_request_headers(),
json=json,
ssl=sslcontext if self._verify_certificate else False
) as response:
if response.ok:
return await response.json()
else:
raise response.raise_for_status()

def get_ingestor_run_list(self, query_filter: Optional[IngestorRunsFilterInput] = None,
query_sort: Optional[IngestorRunsSortCriteriaInput] = None):
"""
Send request to retrieve a list of ingestor run records matching the provided filter parameters.
:param query_filter: An `IngestorRunsFilterInput` object. Only records matching the provided values will be returned.
If not supplied all records will be returned. (Optional)
:param query_sort: An `IngestorRunsSortCriteriaInput` that can control the order of the returned record based on a number of fields. (Optional)
:return: The HTTP response received from the Evolve App Server including all matching ingestor records found.
"""
return get_event_loop().run_until_complete(
self.async_get_ingestor_run_list(query_filter, query_sort))

async def async_get_ingestor_run_list(self, query_filter: Optional[IngestorRunsFilterInput] = None,
query_sort: Optional[IngestorRunsSortCriteriaInput] = None):
"""
Send asynchronous request to retrieve a list of ingestor run records matching the provided filter parameters.
:param query_filter: An `IngestorRunsFilterInput` object. Only records matching the provided values will be returned.
If not supplied all records will be returned. (Optional)
:param query_sort: An `IngestorRunsSortCriteriaInput` that can control the order of the returned record based on a number of fields. (Optional)
:return: The HTTP response received from the Evolve App Server including all matching ingestor records found.
"""

with warnings.catch_warnings():
if not self._verify_certificate:
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
json = {
"query": """
query listIngestorRuns($filter: IngestorRunsFilterInput, $sort: IngestorRunsSortCriteriaInput) {
listIngestorRuns(filter: $filter, sort: $sort) {
id
containerRuntimeType,
payload,
token,
status,
startedAt,
statusLastUpdatedAt,
completedAt
}
}
""",
"variables": {
**({"filter": {
"id": query_filter.id,
"status": query_filter.status and [state.name for state in query_filter.status],
"completed": query_filter.completed,
"containerRuntimeType": query_filter.container_runtime_type and [runtime.name for runtime in
query_filter.container_runtime_type]
}} if query_filter else {}),
**({"sort": {
"status": query_sort.status and query_sort.status.name,
"startedAt": query_sort.started_at and query_sort.started_at.name,
"statusLastUpdatedAt": query_sort.status_last_updated_at and query_sort.status_last_updated_at.name,
"completedAt": query_sort.completed_at and query_sort.completed_at.name,
"containerRuntimeType": query_sort.container_runtime_type and query_sort.container_runtime_type.name,
}} if query_sort else {})
}
}

if self._verify_certificate:
sslcontext = ssl.create_default_context(cafile=self._ca_filename)

async with self.session.post(
construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"),
headers=self._get_request_headers(),
json=json,
ssl=sslcontext if self._verify_certificate else False
) as response:
if response.ok:
return await response.json()
else:
raise response.raise_for_status()

def run_hosting_capacity_calibration(self, calibration_name: str, local_calibration_time: Optional[str] = None, feeders: Optional[List[str]] = None):
"""
Send request to run hosting capacity calibration
:param calibration_name: A string representation of the calibration name
Expand Down
72 changes: 72 additions & 0 deletions src/zepben/eas/client/ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2025 Zeppelin Bend Pty Ltd
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
from dataclasses import dataclass
from enum import Enum
from typing import Optional, List
from datetime import datetime

__all__ = [
"IngestorConfigInput", "IngestorRuntimeKind", "IngestorRunState", "IngestorRun", "IngestorRunsFilterInput", "Order",
"IngestorRunsSortCriteriaInput"
]


@dataclass
class IngestorConfigInput:
key: str
value: str


class IngestorRuntimeKind(Enum):
AZURE_CONTAINER_APP_JOB = "AZURE_CONTAINER_APP_JOB"
DOCKER = "DOCKER"
ECS = "ECS"
KUBERNETES = "KUBERNETES"
TEMPORAL_KUBERNETES = "TEMPORAL_KUBERNETES"


class IngestorRunState(Enum):
INITIALIZED = "INITIALIZED"
QUEUED = "QUEUED"
STARTED = "STARTED"
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
FAILED_TO_START = "FAILED_TO_START"


@dataclass
class IngestorRun:
id: int
container_runtime_type: Optional[IngestorRuntimeKind]
payload: str
token: str
status: IngestorRunState
started_at: datetime
status_last_updated_at: Optional[datetime]
completedAt: Optional[datetime]


@dataclass
class IngestorRunsFilterInput:
id: Optional[int] = None
status: Optional[List[IngestorRunState]] = None
completed: Optional[bool] = None
container_runtime_type: Optional[List[IngestorRuntimeKind]] = None


class Order(Enum):
ASC = "ASC"
DESC = "DESC"


@dataclass
class IngestorRunsSortCriteriaInput:
status: Optional[Order] = None
started_at: Optional[Order] = None
status_last_updated_at: Optional[Order] = None
completed_at: Optional[Order] = None
container_runtime_type: Optional[Order] = None
Loading