
Commit 5a68b3b

Sinks API docs (#439)
Co-authored-by: Tun <[email protected]>
1 parent ad576dd commit 5a68b3b

File tree

9 files changed: +387 -36 lines changed


docs/api-reference/sinks.md

Whitespace-only changes.

docs/build/build.py

Lines changed: 8 additions & 0 deletions

```diff
@@ -107,6 +107,14 @@
             "quixstreams.context",
         ]
     },
+    "sinks.md": {
+        k: None
+        for k in [
+            "quixstreams.sinks.influxdb3",
+            "quixstreams.sinks.csv",
+            "quixstreams.sinks.base.sink",
+        ]
+    },
 }
 
 # Go over all modules and assign them to doc files
```

docs/connectors/sinks/README.md

Lines changed: 86 additions & 0 deletions

# Sinks (beta)

In many stream processing use cases the results need to be written to external destinations to be shared with other subsystems.

Quix Streams provides a sink API to achieve that.

An example using InfluxDB Sink:

```python
from quixstreams import Application
from quixstreams.sinks.influxdb3 import InfluxDB3Sink

app = Application(broker_address="localhost:9092")
topic = app.topic("numbers-topic")

# Initialize InfluxDB3Sink
influx_sink = InfluxDB3Sink(
    token="<influxdb-access-token>",
    host="<influxdb-host>",
    organization_id="<influxdb-org>",
    database="<influxdb-database>",
    measurement="numbers",
    fields_keys=["number"],
    tags_keys=["tag"]
)

sdf = app.dataframe(topic)
# Do some processing here ...
# Sink data to InfluxDB
sdf.sink(influx_sink)
```

## Sinks Are Destinations

When `.sink()` is called on a StreamingDataFrame instance, it marks the end of the processing pipeline, and the StreamingDataFrame can't be changed anymore.

Make sure you call `StreamingDataFrame.sink()` as the last operation.
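
For example, here is a minimal sketch (assuming incoming JSON values with a `number` field, and using the `CSVSink` described below) where all transformations are applied first and the sink is attached as the final step:

```python
from quixstreams import Application
from quixstreams.sinks.csv import CSVSink

app = Application(broker_address="localhost:9092")
sdf = app.dataframe(app.topic("input-topic"))

# Apply all transformations before attaching the sink ...
sdf = sdf.apply(lambda value: {"doubled": value["number"] * 2})

# ... and attach the sink last: the pipeline ends here,
# and `sdf` should not be modified after this call.
sdf.sink(CSVSink(path="output.csv"))
```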

## Supported Sinks

Currently, Quix Streams provides these sinks out of the box:

- [CSV Sink](csv-sink.md) - a simple CSV sink that writes data to a single CSV file.
- [InfluxDB 3 Sink](influxdb3-sink.md) - a sink to write data to InfluxDB 3.

It's also possible to implement your own custom sinks.
Please see the [Creating a Custom Sink](custom-sinks.md) page to learn how to do that.

## Performance Considerations

Since `BatchingSink` implementations accumulate data in memory, they will increase memory usage.

If the batches become large enough, they can also put additional load on the destination and decrease the overall throughput.

To adjust the number of messages that are batched and written in one go, you may provide a `commit_every` parameter to the `Application`.
It limits the amount of data processed and written to sinks during a single checkpoint.
Note that it limits only the number of incoming messages, not the number of records being written to sinks.

**Example:**

```python
from quixstreams import Application
from quixstreams.sinks.influxdb3 import InfluxDB3Sink

# Commit the checkpoints after processing 1000 messages or after a 5 second interval has elapsed (whichever is sooner).
app = Application(
    broker_address="localhost:9092",
    commit_interval=5.0,
    commit_every=1000,
)
topic = app.topic('numbers-topic')
sdf = app.dataframe(topic)

# Create an InfluxDB sink that batches data between checkpoints.
influx_sink = InfluxDB3Sink(
    token="<influxdb-access-token>",
    host="<influxdb-host>",
    organization_id="<influxdb-org>",
    database="<influxdb-database>",
    measurement="numbers",
    fields_keys=["number"],
    tags_keys=["tag"]
)

# The sink will write to InfluxDB across all assigned partitions.
sdf.sink(influx_sink)
```

docs/connectors/sinks/csv-sink.md

Lines changed: 58 additions & 0 deletions

# CSV Sink

A basic sink to write processed data to a single CSV file.

It's meant to be used mostly for local debugging.

## How To Use CSV Sink

To use a CSV sink, you need to create an instance of `CSVSink` and pass
it to the `StreamingDataFrame.sink()` method:

```python
from quixstreams import Application
from quixstreams.sinks.csv import CSVSink

app = Application(broker_address="localhost:9092")
topic = app.topic("input-topic")

# Initialize a CSV sink with a file path
csv_sink = CSVSink(path="file.csv")

sdf = app.dataframe(topic)
# Do some processing here ...
# Sink data to a CSV file
sdf.sink(csv_sink)
```

## How the CSV Sink Works

`CSVSink` is a batching sink.
It batches processed records in memory per topic partition, and writes them to the file when a checkpoint is committed.

The output file format is the following:

```
key,value,timestamp,topic,partition,offset
b'afd7e8ab-4af5-4322-8417-dbfc7a0d7694',"{""number"": 0}",1722945524540,numbers-10k-keys,0,0
b'557bae7f-14b6-46c4-abc3-12f232b54c8e',"{""number"": 1}",1722945524546,numbers-10k-keys,0,1
```

## Serialization Formats

By default, `CSVSink` serializes record keys by calling `str()` on them, and message values with `json.dumps()`.

To use your own serializers, pass `key_serializer` and `value_serializer` to `CSVSink`:

```python
import json
from quixstreams.sinks.csv import CSVSink

# Initialize a CSVSink with a file path
csv_sink = CSVSink(
    path="file.csv",
    # Define custom serializers for keys and values here.
    # The callables must accept one argument for key/value, and return a string.
    key_serializer=lambda key: json.dumps(key),
    value_serializer=lambda value: str(value),
)
```

## Delivery Guarantees

`CSVSink` provides at-least-once guarantees, and the resulting CSV file may contain duplicated rows of data if there were errors during processing.

docs/connectors/sinks/custom-sinks.md

Lines changed: 83 additions & 0 deletions

# Creating a Custom Sink

Quix Streams provides basic facilities to implement custom sinks for external destinations (currently in beta).

To create a new sink, extend and implement the following Python base classes:

- `quixstreams.sinks.base.sink.BaseSink` - the parent interface for all sinks.
  `StreamingDataFrame.sink()` accepts implementations of this class.

- `quixstreams.sinks.base.sink.BatchingSink` - a base class for batching sinks that need to batch data before writing it to the external destination.
  Check out [InfluxDB3Sink](influxdb3-sink.md) and [CSVSink](csv-sink.md) for example implementations of batching sinks.

Here is the code of the `BaseSink` class for reference:

```python
import abc
from typing import Any, List, Tuple

# `HeaderValue` is a type defined in the quixstreams package.


class BaseSink(abc.ABC):
    """
    This is a base class for all sinks.

    Subclass and implement its methods to create your own sink.

    Note that sinks are currently in beta, and their design may change over time.
    """

    @abc.abstractmethod
    def flush(self, topic: str, partition: int):
        """
        This method is triggered by the Checkpoint class when it commits.

        You can use `flush()` to write the batched data to the destination (in case of
        a batching sink), or confirm the delivery of the previously sent messages
        (in case of a streaming sink).

        If flush() fails, the checkpoint will be aborted.
        """

    @abc.abstractmethod
    def add(
        self,
        value: Any,
        key: Any,
        timestamp: int,
        headers: List[Tuple[str, HeaderValue]],
        topic: str,
        partition: int,
        offset: int,
    ):
        """
        This method is triggered on every new record sent to this sink.

        You can use it to accumulate batches of data before sending them outside, or
        to send results right away in a streaming manner and confirm a delivery later
        on flush().
        """

    def on_paused(self, topic: str, partition: int):
        """
        This method is triggered when the sink is paused due to backpressure, when
        the `SinkBackpressureError` is raised.

        Here you can react to backpressure events.
        """
```
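
As an illustration, here is a minimal sketch of a custom sink that subclasses `BaseSink` and appends records to a local file whenever a checkpoint commits. It is not part of the library; the class name, file path, and buffering strategy are arbitrary choices made for this example:

```python
import json
from typing import Any, List, Tuple

from quixstreams.sinks.base.sink import BaseSink


class JSONLinesSink(BaseSink):
    """A hypothetical sink that writes records to a JSON Lines file."""

    def __init__(self, path: str):
        super().__init__()
        self._path = path
        # Buffer records per (topic, partition) until the checkpoint commits
        self._buffers: dict = {}

    def add(
        self,
        value: Any,
        key: Any,
        timestamp: int,
        headers: List[Tuple[str, Any]],
        topic: str,
        partition: int,
        offset: int,
    ):
        # Accumulate records in memory; they are written on flush()
        self._buffers.setdefault((topic, partition), []).append(
            {"key": str(key), "value": value, "timestamp": timestamp, "offset": offset}
        )

    def flush(self, topic: str, partition: int):
        # Write the accumulated batch when the checkpoint is committed.
        # If this raises, the checkpoint will be aborted and the data re-processed.
        batch = self._buffers.pop((topic, partition), [])
        with open(self._path, "a") as f:
            for record in batch:
                f.write(json.dumps(record) + "\n")
```

Such a sink could then be attached like any other: `sdf.sink(JSONLinesSink(path="output.jsonl"))`.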

## Sinks Workflow

During processing, sinks perform the following operations:

1. When a new record arrives, the application calls the `BaseSink.add()` method.
   At this point, the sink implementation can decide what to do with the new record.
   For example, `BatchingSink` will add the record to an in-memory batch.
   Other sinks may write the data straight away.

2. When the current checkpoint is committed, the app calls `BaseSink.flush()`.
   For example, `BatchingSink` will write the accumulated data during `flush()`.
    1. If the destination cannot accept new data, sinks can raise a special exception, `SinkBackpressureError(topic, partition, retry_after)`, specifying the timeout after which the writes should be retried (see the sketch after this list).
    2. The application reacts to `SinkBackpressureError` by pausing the corresponding topic partition for the given time and seeking the partition offset back to the beginning of the checkpoint.
    3. When the timeout elapses, the app resumes consuming from this partition, re-processes the data, and tries to sink it again.

3. If any of the sinks fail during `flush()`, the application aborts the checkpoint, and the data will be re-processed.
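
For illustration, here is a hedged sketch of step 2.1 — a custom sink whose `flush()` signals backpressure. Only the name and call signature of `SinkBackpressureError` come from the workflow above; its import path is an assumption, as are the omitted write helpers:

```python
# NOTE: the import path of SinkBackpressureError is assumed for this sketch;
# check the quixstreams.sinks package in your installed version.
from quixstreams.sinks.base.exceptions import SinkBackpressureError
from quixstreams.sinks.base.sink import BaseSink


class MyDatabaseSink(BaseSink):  # hypothetical custom sink
    def add(self, value, key, timestamp, headers, topic, partition, offset):
        ...  # accumulate the record in memory (omitted)

    def flush(self, topic: str, partition: int):
        try:
            ...  # write the accumulated batch to the destination (omitted)
        except ConnectionError:
            # The destination cannot accept new data right now: ask the application
            # to pause this topic partition and retry after 30 seconds.
            raise SinkBackpressureError(topic=topic, partition=partition, retry_after=30.0)
```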

docs/connectors/sinks/influxdb3-sink.md

Lines changed: 115 additions & 0 deletions

# InfluxDB v3 Sink

InfluxDB is an open source time series database for metrics, events, and real-time analytics.

Quix Streams provides a sink to write processed data to InfluxDB v3.

>***NOTE***: This sink only supports InfluxDB v3. Versions 1 and 2 are not supported.

## How To Use the InfluxDB Sink

To sink data to InfluxDB, you need to create an instance of `InfluxDB3Sink` and pass
it to the `StreamingDataFrame.sink()` method:

```python
from quixstreams import Application
from quixstreams.sinks.influxdb3 import InfluxDB3Sink

app = Application(broker_address="localhost:9092")
topic = app.topic("numbers-topic")

# Initialize InfluxDB3Sink
influx_sink = InfluxDB3Sink(
    token="<influxdb-access-token>",
    host="<influxdb-host>",
    organization_id="<influxdb-org>",
    database="<influxdb-database>",
    measurement="numbers",
    fields_keys=["number"],
    tags_keys=["tag"]
)

sdf = app.dataframe(topic)
# Do some processing here ...
# Sink data to InfluxDB
sdf.sink(influx_sink)
```

## How the InfluxDB Sink Works

`InfluxDB3Sink` is a batching sink.
It batches processed records in memory per topic partition, and writes them to the InfluxDB instance when a checkpoint has been committed.

Under the hood, it transforms data to the InfluxDB format and writes the processed records in batches.

### What data can be sent to InfluxDB?

`InfluxDB3Sink` can accept only dictionary values.

If the record values are not dicts, you need to convert them to dicts using `StreamingDataFrame.apply()` before sinking.
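
For example, a minimal sketch (assuming the topic carries plain numeric values) that wraps each value in a dict before sinking:

```python
sdf = app.dataframe(topic)

# InfluxDB3Sink accepts only dict values, so wrap plain numbers into a dict first.
sdf = sdf.apply(lambda value: {"number": value})

sdf.sink(influx_sink)
```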

The structure of the data written to InfluxDB is defined by the `fields_keys` and `tags_keys` parameters provided to the sink class.

- `fields_keys` - a list of keys to be used as "fields" when writing to InfluxDB.
  If present, its keys cannot overlap with any in `tags_keys`.
  If empty, the whole record value will be used.
  The fields' values can only be strings, floats, integers, or booleans.

- `tags_keys` - a list of keys to be used as "tags" when writing to InfluxDB.
  If present, its keys cannot overlap with any in `fields_keys`.
  These keys will be popped from the value dictionary automatically because InfluxDB doesn't allow the same keys to be in both tags and fields.
  If empty, no tags will be sent.
  >***NOTE***: The InfluxDB client always converts tag values to strings.

To learn more about schema design and data types in InfluxDB, please read the [InfluxDB schema design recommendations](https://docs.influxdata.com/influxdb/cloud-serverless/write-data/best-practices/schema-design/).

## Delivery Guarantees

`InfluxDB3Sink` provides at-least-once guarantees, and the same records may be written multiple times in case of errors during processing.

## Backpressure Handling

The InfluxDB sink automatically handles events when the database cannot accept new data due to write limits.

When this happens, the application drops the accumulated in-memory batch and pauses the corresponding topic partition for the timeout duration returned by the InfluxDB API (it returns an HTTP error with a 429 status code and a `Retry-After` header with a timeout).
When the timeout expires, the app automatically resumes the partition to re-process the data and sink it again.

## Configuration

`InfluxDB3Sink` accepts the following configuration parameters:

- `token` - InfluxDB access token.

- `host` - InfluxDB host in the format "https://<host>".

- `organization_id` - InfluxDB organization ID.

- `database` - a database name.

- `measurement` - a measurement name, required.

- `fields_keys` - a list of keys to be used as "fields" when writing to InfluxDB.
  See [What data can be sent to InfluxDB](#what-data-can-be-sent-to-influxdb) for more info.

- `tags_keys` - a list of keys to be used as "tags" when writing to InfluxDB.
  See [What data can be sent to InfluxDB](#what-data-can-be-sent-to-influxdb) for more info.

- `time_key` - a key to be used as "time" when writing to InfluxDB.
  By default, the record timestamp will be used with millisecond time precision.
  When using a custom key, you may need to adjust the `time_precision` setting to match.

- `time_precision` - the time precision to use when writing to InfluxDB.
  Default - `ms`.

- `include_metadata_tags` - if True, includes the record's key, topic, and partition as tags.
  Default - `False`.

- `batch_size` - the number of records to write to InfluxDB in one request.
  Note that it only affects the size of one write request, not the number of records flushed on each checkpoint.
  Default - `1000`.

- `enable_gzip` - if True, enables gzip compression for writes.
  Default - `True`.

- `request_timeout_ms` - the HTTP request timeout in milliseconds.
  Default - `10000`.

- `debug` - if True, prints debug logs from the InfluxDB client.
  Default - `False`.
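
As a reference, here is a hedged sketch of a sink configured with several of the optional parameters above (the values are placeholders chosen for the example, not recommendations):

```python
from quixstreams.sinks.influxdb3 import InfluxDB3Sink

influx_sink = InfluxDB3Sink(
    token="<influxdb-access-token>",
    host="<influxdb-host>",
    organization_id="<influxdb-org>",
    database="<influxdb-database>",
    measurement="numbers",
    fields_keys=["number"],
    tags_keys=["tag"],
    # Use a custom value key as the record time and match its precision.
    time_key="event_time",
    time_precision="ms",
    # Add the record's key, topic, and partition as extra tags.
    include_metadata_tags=True,
    # Write at most 500 records per HTTP request, with gzip and a 15 s timeout.
    batch_size=500,
    enable_gzip=True,
    request_timeout_ms=15000,
    debug=False,
)
```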

mkdocs.yml

Lines changed: 6 additions & 0 deletions

```diff
@@ -41,6 +41,12 @@ nav:
   - Managing Kafka Topics: advanced/topics.md
   - Using Producer & Consumer: advanced/producer-consumer-lowlevel.md
   - Connecting to Quix Cloud: quix-platform.md
+  - 'Connectors [beta]':
+    - Sinks:
+      - 'connectors/sinks/README.md'
+      - CSV Sink: connectors/sinks/csv-sink.md
+      - InfluxDB v3 Sink: connectors/sinks/influxdb3-sink.md
+      - Creating a Custom Sink: connectors/sinks/custom-sinks.md
   - Upgrading Guide:
     - Upgrading from Quix Streams v0.5: upgrading-legacy.md
```
