Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions python/destinations/influxdb_1/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# InfluxDB v1

[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/influxdb_1) demonstrates how to
consume data from a Kafka topic in Quix and persist the data to an InfluxDB v3 database using the InfluxDB v1 write API.

To learn more about how it functions, [check out the underlying
Quix Streams `InfluxDB1Source`](https://quix.io/docs/quix-streams/connectors/sinks/influxdb1-sink.html).

## How to run

Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the `Connectors` tab to use this connector.

Clicking `Set up connector` allows you to enter your connection details and runtime parameters.

Then either:
* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.

* or click `Customise connector` to inspect or alter the code before deployment.

## Environment Variables

The connector uses the following environment variables:

### Required
- **input**: Quix input topic
- **INFLUXDB_HOST**: Host address for the InfluxDB instance.
- **INFLUXDB_PORT**: Port for the InfluxDB instance.
- **INFLUXDB_USERNAME**: Username for the InfluxDB instance.
- **INFLUXDB_PASSWORD**: Password for the InfluxDB instance.

### Optional
- **INFLUXDB_DATABASE**: Database name in InfluxDB where data should be stored.
Default: `quix`
- **INFLUXDB_TAG_KEYS**: A comma-separated list of column names (based on message value) to be used as tags when writing data to InfluxDB.
Can optionally replace with a callable in the template directly.
- **INFLUXDB_FIELD_KEYS**: A comma-separated list of column names (based on message value) to be used as fields when writing data to InfluxDB.
Can optionally replace with a callable in the template directly.
- **INFLUXDB_MEASUREMENT_NAME**: The InfluxDB measurement to write data to.
Can optionally replace with a callable in the template directly.
Default: `default`
- **TIMESTAMP_COLUMN**: This is the column in your data that represents the timestamp in nanoseconds.
Defaults to use the message timestamp received from the broker if not supplied.
Can optionally replace with a callable in the template directly.
- **BUFFER_SIZE**: Number of records to buffer before writing to TDengine.
Default: `50`
- **BUFFER_TIMEOUT**: Maximum time (in seconds) to buffer records before writing to TDengine.
Default: `1`

## Requirements / Prerequisites

You will need to have an InfluxDB 3.0 instance available and an API authentication token.

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open Source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation.
28 changes: 28 additions & 0 deletions python/destinations/influxdb_1/dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8 \
PYTHONPATH="/app"

# Build argument for setting the main app path
ARG MAINAPPPATH=.

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]
Binary file added python/destinations/influxdb_1/icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
122 changes: 122 additions & 0 deletions python/destinations/influxdb_1/library.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
{
"libraryItemId": "influxdb-1-destination",
"name": "InfluxDB 1.0 Sink",
"language": "Python",
"tags": {
"Pipeline Stage": ["Destination"],
"Type": ["Connectors"],
"Category": ["Time series DB"]
},
"shortDescription": "Consume data from a Kafka topic in Quix and persist the data to an InfluxDB 3.0 database.",
"DefaultFile": "main.py",
"EntryPoint": "dockerfile",
"RunEntryPoint": "main.py",
"IconFile": "icon.png",
"Variables": [
{
"Name": "input",
"Type": "EnvironmentVariable",
"InputType": "InputTopic",
"Description": "This is the input topic",
"DefaultValue": "input-data",
"Required": true
},
{
"Name": "INFLUXDB_HOST",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Host address for the InfluxDB instance (formatted as https://<host>).",
"Required": true
},
{
"Name": "INFLUXDB_PORT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Port for the InfluxDB instance.",
"Required": true
},
{
"Name": "INFLUXDB_USERNAME",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Username for the InfluxDB instance.",
"Required": true
},
{
"Name": "INFLUXDB_PASSWORD",
"Type": "EnvironmentVariable",
"InputType": "Secret",
"Description": "Password for the InfluxDB instance.",
"Required": true
},
{
"Name": "INFLUXDB_MEASUREMENT_NAME",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The InfluxDB measurement to write data to. Can optionally replace with a callable in the template directly.",
"DefaultValue": "default",
"Required": false
},
{
"Name": "INFLUXDB_DATABASE",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Database name in InfluxDB where data should be stored.",
"DefaultValue": "quix",
"Required": true
},
{
"Name": "INFLUXDB_TAG_KEYS",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The tags to include when writing the measurement data. Example: Tag1,Tag2. Can optionally replace with a callable in the template directly.",
"Required": false
},
{
"Name": "INFLUXDB_FIELD_KEYS",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The fields to include when writing the measurement data. Example: `Field1,Field2`. Can optionally replace with a callable in the template directly. ",
"Required": false
},
{
"Name": "CONSUMER_GROUP_NAME",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The name of the consumer group to use when consuming from Kafka",
"DefaultValue": "influxdb1-sink",
"Required": true
},
{
"Name": "TIMESTAMP_COLUMN",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The column containing the timestamp column. NOTE: Must be nanoseconds. Can optionally replace with a callable in the template directly.",
"Required": false
},
{
"Name": "BUFFER_SIZE",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The number of records that sink holds before flush data to the InfluxDb",
"DefaultValue": "1000",
"Required": false
},
{
"Name": "BUFFER_TIMEOUT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The number of seconds that sink holds before flush data to the InfluxDb",
"DefaultValue": "1",
"Required": false
}
],
"DeploySettings": {
"DeploymentType": "Service",
"CpuMillicores": 200,
"MemoryInMb": 500,
"Replicas": 1,
"PublicAccess": false,
"ValidateConnection": true
}
}
60 changes: 60 additions & 0 deletions python/destinations/influxdb_1/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# import Utility modules
import os

from typing import Optional

# import vendor-specific modules
from quixstreams import Application
from quixstreams.sinks.core.influxdb1 import (
InfluxDB1Sink,
FieldsSetter,
MeasurementSetter,
TagsSetter,
TimeSetter,
)

# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()


def _as_iterable(env_var) -> list[str]:
return keys.split(",") if (keys := os.environ.get(env_var)) else []


# Potential Callables - can manually edit these to instead use your own callables.
# --Required--
measurement_name: MeasurementSetter = os.getenv("INFLUXDB_MEASUREMENT_NAME", "default")
# --Optional--
tag_keys: TagsSetter = _as_iterable("INFLUXDB_TAG_KEYS")
field_keys: FieldsSetter = _as_iterable("INFLUXDB_FIELD_KEYS")
time_setter: Optional[TimeSetter] = col if (col := os.environ.get("TIMESTAMP_COLUMN")) else None


influxdb_v1_sink = InfluxDB1Sink(
host=os.environ["INFLUXDB_HOST"],
port=int(os.environ["INFLUXDB_PORT"]),
username=os.environ["INFLUXDB_USERNAME"],
password=os.environ["INFLUXDB_PASSWORD"],
tags_keys=tag_keys,
fields_keys=field_keys,
time_setter=time_setter,
database=os.getenv("INFLUXDB_DATABASE", "quix"),
measurement=measurement_name,
)


app = Application(
consumer_group=os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-writer"),
auto_offset_reset="earliest",
commit_every=int(os.environ.get("BUFFER_SIZE", "1000")),
commit_interval=float(os.environ.get("BUFFER_DELAY", "1")),
)
input_topic = app.topic(os.environ["input"])

sdf = app.dataframe(input_topic)
sdf.sink(influxdb_v1_sink)


if __name__ == "__main__":
app.run()
2 changes: 2 additions & 0 deletions python/destinations/influxdb_1/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
quixstreams[influxdb3]==3.19.0
python-dotenv