Skip to content

Commit d21ade2

Browse files
authored
python(feat): Plumb bytes through sift-py and sift_client. (#304)
1 parent aa16167 commit d21ade2

File tree

11 files changed

+59
-15
lines changed

11 files changed

+59
-15
lines changed

python/examples/ingestion_with_python_config/simulator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from sift_py.ingestion.channel import (
99
bit_field_value,
10+
bytes_value,
1011
double_value,
1112
enum_value,
1213
int32_value,
@@ -100,6 +101,10 @@ def run(self):
100101
random.choice(self.sample_bit_field_values)
101102
),
102103
},
104+
{
105+
"channel_name": "raw_bin",
106+
"value": bytes_value(str(timestamp).encode("utf-8")),
107+
},
103108
],
104109
}
105110
)

python/examples/ingestion_with_python_config/telemetry_config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ def nostromos_lv_426() -> TelemetryConfig:
5555
ChannelBitFieldElement(name="heater", index=7, bit_count=1),
5656
],
5757
)
58+
raw_binary_channel = ChannelConfig(
59+
name="raw_bin",
60+
data_type=ChannelDataType.BYTES,
61+
description="Example of binary encoded data (binary string encoding of time in seconds)",
62+
)
5863

5964
return TelemetryConfig(
6065
asset_name="NostromoLV426",
@@ -66,6 +71,7 @@ def nostromos_lv_426() -> TelemetryConfig:
6671
voltage_channel,
6772
vehicle_state_channel,
6873
gpio_channel,
74+
raw_binary_channel,
6975
],
7076
),
7177
FlowConfig(

python/examples/ingestion_with_yaml_config/simulator.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from sift_py.ingestion.channel import (
99
bit_field_value,
10+
bytes_value,
1011
double_value,
1112
enum_value,
1213
int32_value,
@@ -39,7 +40,7 @@ def __init__(self, ingestion_service: IngestionService):
3940
sample_bit_field_values = ["00001001", "00100011", "00001101", "11000001"]
4041
self.sample_bit_field_values = [bytes([int(byte, 2)]) for byte in sample_bit_field_values]
4142

42-
sample_logs = Path().joinpath("sample_data").joinpath("sample_logs.txt")
43+
sample_logs = Path(__file__).parent.joinpath("sample_data").joinpath("sample_logs.txt")
4344

4445
with open(sample_logs, "r") as file:
4546
self.sample_logs = file.readlines()
@@ -100,6 +101,10 @@ def run(self):
100101
random.choice(self.sample_bit_field_values)
101102
),
102103
},
104+
{
105+
"channel_name": "raw_bin",
106+
"value": bytes_value(str(timestamp).encode("utf-8")),
107+
},
103108
],
104109
}
105110
)

python/examples/ingestion_with_yaml_config/telemetry_config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33

44
from sift_py.ingestion.service import TelemetryConfig
55

6-
TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
6+
TELEMETRY_CONFIGS_DIR = Path(__file__).parent.joinpath("telemetry_configs")
77

88

99
def nostromos_lv_426() -> TelemetryConfig:
10-
telemetry_config_name = os.getenv("TELEMETRY_CONFIG")
10+
telemetry_config_name = os.getenv("TELEMETRY_CONFIG", "nostromo_lv_426.yml")
1111

1212
if telemetry_config_name is None:
1313
raise Exception("Missing 'TELEMETRY_CONFIG' environment variable.")

python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
asset_name: NostromoLV426
2-
ingestion_client_key: nostromo_lv_426
32

43
channels:
54
log_channel: &log_channel
@@ -51,13 +50,19 @@ channels:
5150
index: 7
5251
bit_count: 1
5352

53+
raw_binary_channel: &raw_binary_channel
54+
name: raw_bin
55+
data_type: bytes
56+
description: Example of binary encoded data (binary string encoding of time in seconds)
57+
5458
flows:
5559
- name: readings
5660
channels:
5761
- <<: *velocity_channel
5862
- <<: *voltage_channel
5963
- <<: *vehicle_state_channel
6064
- <<: *gpio_channel
65+
- <<: *raw_binary_channel
6166

6267
- name: voltage
6368
channels:

python/lib/sift_client/_internal/low_level_wrappers/data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ async def get_channel_data(
240240
end_time: datetime | None = None,
241241
limit: int | None = None,
242242
ignore_cache: bool = False,
243-
):
243+
) -> dict[str, pd.DataFrame]:
244244
"""
245245
Get the data for a channel during a run.
246246
"""

python/lib/sift_client/resources/channels.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from datetime import datetime
55
from typing import TYPE_CHECKING, Dict, List
66

7-
import numpy as np
7+
import pandas as pd
88
import pyarrow as pa
99

1010
from sift_client._internal.low_level_wrappers.channels import ChannelsLowLevelClient
@@ -176,7 +176,7 @@ async def get_data(
176176
start_time: datetime | None = None,
177177
end_time: datetime | None = None,
178178
limit: int | None = None,
179-
) -> Dict[str, np.ndarray]:
179+
) -> Dict[str, pd.DataFrame]:
180180
"""
181181
Get data for one or more channels.
182182

python/lib/sift_client/resources/sync_stubs/__init__.pyi

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import re
66
from datetime import datetime
77
from typing import Any, Dict, List
88

9-
import numpy as np
9+
import pandas as pd
1010
import pyarrow as pa
1111

1212
from sift_client.client import SiftClient
@@ -428,7 +428,7 @@ class ChannelsAPI:
428428
start_time: datetime | None = None,
429429
end_time: datetime | None = None,
430430
limit: int | None = None,
431-
) -> Dict[str, np.ndarray]:
431+
) -> Dict[str, pd.DataFrame]:
432432
"""
433433
Get data for one or more channels.
434434

python/lib/sift_client/types/channel.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from sift.data.v2.data_pb2 import (
1515
BitFieldValues,
1616
BoolValues,
17+
BytesValues,
1718
DoubleValues,
1819
EnumValues,
1920
FloatValues,
@@ -45,6 +46,7 @@ class ChannelDataType(Enum):
4546
INT_64 = channel_pb.CHANNEL_DATA_TYPE_INT_64
4647
UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32
4748
UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64
49+
BYTES = channel_pb.CHANNEL_DATA_TYPE_BYTES
4850

4951
def __str__(self) -> str:
5052
ret = self.name.lower()
@@ -78,7 +80,7 @@ def from_str(raw: str) -> Optional["ChannelDataType"]:
7880
if item.__str__() == val:
7981
return item
8082
raise Exception(
81-
"Unreachable. ChannelTypeUrls and ChannelDataType enum names are out of sync."
83+
f"{raw} type not found. ChannelTypeUrls and ChannelDataType enum names are out of sync."
8284
)
8385
else:
8486
try:
@@ -111,6 +113,8 @@ def proto_data_class(data_type: ChannelDataType) -> Any:
111113
return Uint32Values
112114
elif data_type == ChannelDataType.UINT_64:
113115
return Uint64Values
116+
elif data_type == ChannelDataType.BYTES:
117+
return BytesValues
114118
else:
115119
raise ValueError(f"Unknown data type: {data_type}")
116120

@@ -138,6 +142,8 @@ def hash_str(self, api_format: bool = False) -> str:
138142
return "CHANNEL_DATA_TYPE_UINT_32" if api_format else ChannelDataType.UINT_32.__str__()
139143
elif self == ChannelDataType.UINT_64:
140144
return "CHANNEL_DATA_TYPE_UINT_64" if api_format else ChannelDataType.UINT_64.__str__()
145+
elif self == ChannelDataType.BYTES:
146+
return "CHANNEL_DATA_TYPE_BYTES" if api_format else ChannelDataType.BYTES.__str__()
141147
else:
142148
raise Exception("Unreachable.")
143149

@@ -249,7 +255,7 @@ def data(
249255
limit: The maximum number of data points to return.
250256
251257
Returns:
252-
A ChannelTimeSeries object.
258+
A dict of channel name to pandas DataFrame or Arrow Table object.
253259
"""
254260
if as_arrow:
255261
data = self.client.channels.get_data_as_arrow(

python/lib/sift_py/ingestion/channel.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def __init__(
7171
self.identifier = self.fqn()
7272

7373
def value_from(
74-
self, value: Optional[Union[int, float, bool, str]]
74+
self, value: Optional[Union[int, float, bool, str, bytes]]
7575
) -> Optional[IngestWithConfigDataChannelValue]:
7676
"""
7777
Like `try_value_from` except will return `None` there is a failure to produce a channel value due to a type mismatch.
@@ -82,7 +82,7 @@ def value_from(
8282
return None
8383

8484
def try_value_from(
85-
self, value: Optional[Union[int, float, bool, str]]
85+
self, value: Optional[Union[int, float, bool, str, bytes]]
8686
) -> IngestWithConfigDataChannelValue:
8787
"""
8888
Generate a channel value for this particular channel configuration. This will raise an exception
@@ -112,7 +112,8 @@ def try_value_from(
112112
return enum_value(int(value))
113113
elif isinstance(value, str) and self.data_type == ChannelDataType.STRING:
114114
return string_value(value)
115-
115+
elif isinstance(value, bytes) and self.data_type == ChannelDataType.BYTES:
116+
return bytes_value(value)
116117
raise ValueError(f"Failed to cast value of type {type(value)} to {self.data_type}")
117118

118119
def as_pb(self, klass: Type[ChannelConfigPb]) -> ChannelConfigPb:
@@ -209,6 +210,7 @@ class ChannelDataTypeStrRep(Enum):
209210
INT_64 = "int64"
210211
UINT_32 = "uint32"
211212
UINT_64 = "uint64"
213+
BYTES = "bytes"
212214

213215
@staticmethod
214216
def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]:
@@ -224,6 +226,7 @@ def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]:
224226
"CHANNEL_DATA_TYPE_INT_64": ChannelDataTypeStrRep.INT_64,
225227
"CHANNEL_DATA_TYPE_UINT_32": ChannelDataTypeStrRep.UINT_32,
226228
"CHANNEL_DATA_TYPE_UINT_64": ChannelDataTypeStrRep.UINT_64,
229+
"CHANNEL_DATA_TYPE_BYTES": ChannelDataTypeStrRep.BYTES,
227230
}[val]
228231
except KeyError:
229232
return None
@@ -244,6 +247,7 @@ class ChannelDataType(Enum):
244247
INT_64 = channel_pb.CHANNEL_DATA_TYPE_INT_64
245248
UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32
246249
UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64
250+
BYTES = channel_pb.CHANNEL_DATA_TYPE_BYTES
247251

248252
@classmethod
249253
def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType":
@@ -267,6 +271,8 @@ def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType"
267271
return cls.UINT_32
268272
elif val == cls.UINT_64.value:
269273
return cls.UINT_64
274+
elif val == cls.BYTES.value:
275+
return cls.BYTES
270276
else:
271277
raise ValueError(f"Unknown channel data type '{val}'.")
272278

@@ -302,6 +308,8 @@ def from_str(cls, raw: str) -> Optional["ChannelDataType"]:
302308
return cls.UINT_32
303309
elif val == ChannelDataTypeStrRep.UINT_64:
304310
return cls.UINT_64
311+
elif val == ChannelDataTypeStrRep.BYTES:
312+
return cls.BYTES
305313
else:
306314
raise Exception("Unreachable")
307315

@@ -334,6 +342,8 @@ def as_human_str(self, api_format: bool = False) -> str:
334342
return (
335343
"CHANNEL_DATA_TYPE_UINT_64" if api_format else ChannelDataTypeStrRep.UINT_64.value
336344
)
345+
elif self == ChannelDataType.BYTES:
346+
return "CHANNEL_DATA_TYPE_BYTES" if api_format else ChannelDataTypeStrRep.BYTES.value
337347
else:
338348
raise Exception("Unreachable.")
339349

@@ -421,6 +431,10 @@ def empty_value() -> IngestWithConfigDataChannelValue:
421431
return IngestWithConfigDataChannelValue(empty=Empty())
422432

423433

434+
def bytes_value(val: bytes) -> IngestWithConfigDataChannelValue:
435+
return IngestWithConfigDataChannelValue(bytes=val)
436+
437+
424438
def is_data_type(val: IngestWithConfigDataChannelValue, target_type: ChannelDataType) -> bool:
425439
if target_type == ChannelDataType.DOUBLE:
426440
return val.HasField("double")
@@ -442,3 +456,6 @@ def is_data_type(val: IngestWithConfigDataChannelValue, target_type: ChannelData
442456
return val.HasField("uint32")
443457
elif target_type == ChannelDataType.UINT_64:
444458
return val.HasField("uint64")
459+
elif target_type == ChannelDataType.BYTES:
460+
return val.HasField("bytes")
461+
raise ValueError(f"Unknown channel data type '{target_type}'.")

0 commit comments

Comments
 (0)