Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions thingsboard_gateway/extensions/mqtt/custom_mqtt_uplink_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
#
# ------------------------------------------------------------------------------


from time import time
from simplejson import dumps

from thingsboard_gateway.connectors.mqtt.mqtt_uplink_converter import MqttUplinkConverter
from thingsboard_gateway.gateway.entities.converted_data import ConvertedData
from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry
Expand All @@ -39,19 +38,41 @@ def convert(self, topic, body):
device_type="Thermostat") # Device profile name for devices
bytes_to_read = body.replace("0x", "") # Replacing the 0x (if '0x' in body), needs for converting to bytearray
converted_bytes = bytearray.fromhex(bytes_to_read) # Converting incoming data to bytearray
timestamp = int(time() * 1000)
extension_config_key = "extensionConfig" if self.__config.get("extensionConfig") is not None else "extension-config"
value = None
if self.__config.get(extension_config_key) is not None:
for telemetry_key in self.__config[extension_config_key]: # Processing every telemetry key in config for extension
value = 0
for _ in range(self.__config[extension_config_key][telemetry_key]): # reading every value with value length from config
value = value * 256 + converted_bytes.pop(0) # process and remove byte from processing
datapoint_key = TBUtility.convert_key_to_datapoint_key(telemetry_key.replace("Bytes", ""), None, {}, self._log)
TelemetryEntry({datapoint_key: value}) # creating telemetry entry
converted_data.add_to_telemetry(datapoint_key) # adding telemetry entry to telemetry array for sending data to platform
for telemetry_key in self.__config[extension_config_key]:
if not self.__config[extension_config_key][telemetry_key]:
continue
for _ in range(self.__config[extension_config_key][telemetry_key]):
try:
# Read each value with value length from config
value = converted_bytes.pop(0)
except Exception:
continue
# Process and remove byte from processing
if value is not None:
datapoint_key = TBUtility.convert_key_to_datapoint_key(
telemetry_key.replace("Bytes", ""),
None,
{},
self._log,
) # Creating telemetry entry
converted_data.add_to_telemetry(
TelemetryEntry({datapoint_key: value}, ts=timestamp)
) # Adding telemetry entry to telemetry array for sending data to platform
else:
datapoint_key = TBUtility.convert_key_to_datapoint_key("data", None, {}, self._log)
telemetry_entry = TelemetryEntry({datapoint_key: int(body, 0)})
converted_data.add_to_telemetry(telemetry_entry) # if no specific configuration in config file - just send data which received
datapoint_key = TBUtility.convert_key_to_datapoint_key(
"data",
None,
{},
self._log,
)
converted_data.add_to_telemetry(
TelemetryEntry({datapoint_key: int(body, 0)}, ts=timestamp)
) # If no specific configuration in config file - just send received data

return converted_data

except Exception as e:
Expand Down
Loading