Releases: quixio/quix-streams
v3.22.0
What's Changed
- [BREAKING] Remove default value for quix_portal_api URL.
See the Connecting to Quix Cloud docs to learn how to get the Quix Portal API URL and connect to the Quix broker locally.
By @gwaramadze in #991
- Improvement: TDengine sink adds empty value checking and supports string-formatted timestamps by @huskar-t in #979
- Bump mypy from 1.17.0 to 1.17.1 by @dependabot[bot] in #992
Full Changelog: v3.21.0...v3.22.0
v3.21.0
What's Changed
💎 Split data into multiple topics with StreamingDataFrame.to_topic()
To dynamically route messages to different topics based on the message content, you can now provide a callable that returns a Topic object to the StreamingDataFrame.to_topic() method:
from quixstreams import Application
app = Application(...)
# Declare topics
input_topic = app.topic('sensor-data', value_deserializer='json')
normal_topic = app.topic('normal-readings', value_serializer='json')
alert_topic = app.topic('high-temp-alerts', value_serializer='json')
sdf = app.dataframe(input_topic)
def route_by_temperature(value, key, timestamp: int, headers):
    """
    Send messages to different topics based on the temperature sensor value.
    """
    if value.get('temperature', 0) > 80:
        return alert_topic
    else:
        return normal_topic

sdf.to_topic(topic=route_by_temperature)
See more in the "Splitting data into multiple topics" section in the docs.
By @gwaramadze in #976
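Because the routing callable is an ordinary function, its branching can be checked without a broker. The sketch below is only an illustration: plain strings stand in for the Topic objects (an assumption made so it runs standalone), and the sample readings are invented.

```python
# Plain strings stand in for the Topic objects returned by app.topic().
# This is an assumption so the example runs without a Kafka broker.
ALERT_TOPIC = "high-temp-alerts"
NORMAL_TOPIC = "normal-readings"


def route_by_temperature(value, key, timestamp, headers):
    """Return the destination for a message based on its temperature."""
    if value.get("temperature", 0) > 80:
        return ALERT_TOPIC
    return NORMAL_TOPIC


# Invented sample readings; a missing temperature falls back to 0.
readings = [{"temperature": 95}, {"temperature": 42}, {}]
destinations = [route_by_temperature(r, b"sensor-1", 0, None) for r in readings]
print(destinations)
```

Note that a reading without a temperature field is routed to the normal topic, since the default of 0 is below the threshold.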
Dependencies
- Update confluent-kafka[avro,json,protobuf,schemaregistry] requirement from <2.10,>=2.8.2 to >=2.8.2,<2.12 by @dependabot[bot] in #849
- Bump testcontainers[postgres] from 4.10.0 to 4.12.0 by @dependabot[bot] in #984
- Bump types-jsonschema from 4.24.0.20250708 to 4.25.0.20250720 by @dependabot[bot] in #981
Full Changelog: v3.20.0...v3.21.0
v3.20.0
What's Changed
- Prefix transactional.id by the Quix workspace id when connecting to Quix brokers by @gwaramadze in #974
- Add transactional: bool parameter to Application.get_producer() to enable Kafka Transactions API in raw Producer by @gwaramadze in #974
- lookup.quix_configuration_service: add support for binary and JSON fields by @ovv in #971
- lookup.quix_configuration_service: Handle valid_from=None in configuration version selection by @ovv in #946
- PostgreSQLSink: add support for ON CONFLICT DO UPDATE on primary keys by @tim-quix in #966
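For readers unfamiliar with ON CONFLICT DO UPDATE, the following sketch shows the general upsert behavior on a primary key. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL (SQLite accepts a similar clause), and the table and column names are hypothetical. It does not show the SQL that PostgreSQLSink actually generates, nor how the sink is configured.

```python
import sqlite3

# In-memory SQLite database used as a stand-in for PostgreSQL (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (sensor_id TEXT PRIMARY KEY, temperature REAL)")

# Insert a row, or update the existing row when the primary key already exists.
upsert = (
    "INSERT INTO readings (sensor_id, temperature) VALUES (?, ?) "
    "ON CONFLICT (sensor_id) DO UPDATE SET temperature = excluded.temperature"
)
for row in [("s1", 20.0), ("s2", 30.0), ("s1", 25.0)]:
    conn.execute(upsert, row)

rows = conn.execute(
    "SELECT sensor_id, temperature FROM readings ORDER BY sensor_id"
).fetchall()
print(rows)
```

The second write for "s1" replaces the first one instead of failing with a uniqueness error, which is the behavior the clause is meant to provide.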
Dependencies
- Update pydantic-settings requirement from <2.10,>=2.3 to >=2.3,<2.11 by @dependabot[bot] in #951
- Bump types-jsonschema from 4.24.0.20250528 to 4.24.0.20250708 by @dependabot[bot] in #973
- Bump mypy from 1.16.1 to 1.17.0 by @dependabot[bot] in #972
Full Changelog: v3.19.0...v3.20.0
v3.19.0
What's Changed
🔌 Connectors
- A new sink for TDengine - an open source time series database optimized for IoT, connected vehicles, and industrial applications.
See TDengineSink docs to learn more.
By @jbrass in #931
🛠️ Internal
- Update the Application.run() logs by @daniil-quix in #964
- TDengine sink updates by @daniil-quix in #968
Dependencies
- Bump types-protobuf from 6.30.2.20250516 to 6.30.2.20250703 by @dependabot in #965
Full Changelog: v3.18.1...v3.19.0
v3.18.1
What's Changed
- Fix PostgresLookup imports by @daniil-quix in #962
- Readme: convert the Roadmap block to Features by @daniil-quix in #961
Full Changelog: v3.18.0...v3.18.1
v3.18.0
What's Changed
💎 Join Lookup: PostgreSQL
Added a Lookup join implementation for enriching streaming data with data from a Postgres database.
The new PostgresLookup allows querying a Postgres database for each field, using a persistent connection and per-field caching based on a configurable TTL.
The cache is a "Least Recently Used" (LRU) cache with a configurable maximum size.
See PostgresLookup API docs for more info.
from quixstreams import Application
from quixstreams.dataframe.joins.lookups.postgresql import PostgresLookup
app = Application(...)
sdf = app.dataframe(...)
# Initialize PostgresLookup with Postgres credentials
lookup = PostgresLookup(
    host="<host>",
    port=5432,
    dbname="<db>",
    user="<user>",
    password="<password>",
    cache_size=1000,
)
# Add columns "table_column1" and "table_column2" from "my_table" to the Kafka record as a new field "joined".
# Match by comparing "my_record_field" on the left and "table_column1" on the right.
fields = {
    "joined": lookup.field(
        table="my_table", columns=["table_column1", "table_column2"], on="table_column1"
    ),
}
sdf = sdf.join_lookup(lookup, fields, on="my_record_field")
app.run()
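To make the caching behavior described above more concrete, here is a minimal plain-Python sketch of an LRU cache with a TTL. The class name, its parameters, and the injectable clock are hypothetical and exist only for illustration. This is not the PostgresLookup implementation.

```python
import time
from collections import OrderedDict


class TTLLRUCache:
    """Hypothetical LRU cache with per-entry expiry (illustration only)."""

    def __init__(self, max_size, ttl_seconds, clock=time.monotonic):
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._clock = clock
        self._items = OrderedDict()  # key -> (expires_at, value)

    def get(self, key, loader):
        """Return a cached value, calling loader(key) on a miss or after expiry."""
        now = self._clock()
        entry = self._items.get(key)
        if entry is not None and entry[0] > now:
            self._items.move_to_end(key)  # mark as most recently used
            return entry[1]
        value = loader(key)
        self._items[key] = (now + self._ttl, value)
        self._items.move_to_end(key)
        while len(self._items) > self._max_size:
            self._items.popitem(last=False)  # evict the least recently used entry
        return value


# A fake clock and a fake query function make the behavior deterministic.
fake_now = [0.0]
queries = []


def fake_query(key):
    queries.append(key)
    return {"id": key}


cache = TTLLRUCache(max_size=2, ttl_seconds=60, clock=lambda: fake_now[0])
cache.get("a", fake_query)  # miss: queries the "database"
cache.get("a", fake_query)  # hit: served from the cache
cache.get("b", fake_query)  # miss
cache.get("c", fake_query)  # miss: evicts "a", the least recently used key
cache.get("a", fake_query)  # miss again: evicts "b"
fake_now[0] = 61.0
cache.get("c", fake_query)  # miss: the entry for "c" has expired
print(queries)
```

The recorded queries show the two reasons a lookup goes back to the database in this sketch: eviction once the maximum size is exceeded, and expiry once the TTL has passed.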
🦠 Bugfixes
- Fix typo in Producer by @gwaramadze in #953
Full Changelog: v3.17.0...v3.18.0
v3.17.0
What's Changed
💎 Interval joins: StreamingDataFrame.join_interval()
Use StreamingDataFrame.join_interval() to join two topics into a new stream where each record is merged with records from the other topic that fall within a specified time interval.
This join is useful for cases where you need to match records that occur within a specific time window of each other, rather than just the latest record (as in an as-of join).
from datetime import timedelta
from quixstreams import Application
app = Application(...)
sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_events = app.dataframe(app.topic("events"))
# Join records from the topic "measurements"
# with records from "events" that occur within a 5-minute window
# before and after each measurement
sdf_joined = sdf_measurements.join_interval(
    right=sdf_events,
    how="inner",  # Emit updates only if matches are found
    on_merge="keep-left",  # Prefer the columns from the left dataframe if they overlap
    grace_ms=timedelta(days=7),  # Keep the state for 7 days
    backward_ms=timedelta(minutes=5),  # Look for events up to 5 minutes before
    forward_ms=timedelta(minutes=5),  # Look for events up to 5 minutes after
)

if __name__ == '__main__':
    app.run()
Please take a look at the Interval Join docs for more examples.
By @gwaramadze in #924
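The matching rule can be illustrated without Kafka. The sketch below is plain Python with hypothetical helper names and invented records. It treats both interval boundaries as inclusive, which is an assumption, and it leaves out state handling and the grace period entirely.

```python
from datetime import timedelta


def interval_matches(left_ts_ms, right_records, backward, forward):
    """Return right records whose timestamp lies within the interval around left_ts_ms.

    Both boundaries are treated as inclusive here, which is an assumption.
    """
    lower = left_ts_ms - int(backward.total_seconds() * 1000)
    upper = left_ts_ms + int(forward.total_seconds() * 1000)
    return [r for r in right_records if lower <= r["ts"] <= upper]


def merge_keep_left(left, right):
    """Merge two records, preferring left values when keys overlap."""
    return {**right, **left}


# Invented events at 0, 4, 10 and 15 minutes (timestamps in milliseconds).
events = [
    {"ts": 0, "event": "boot"},
    {"ts": 240_000, "event": "start"},
    {"ts": 600_000, "event": "warning"},
    {"ts": 900_000, "event": "stop"},
]
measurement = {"ts": 540_000, "value": 1.5}  # taken at 9 minutes

matches = interval_matches(
    measurement["ts"], events, timedelta(minutes=5), timedelta(minutes=5)
)
joined = [merge_keep_left(measurement, e) for e in matches]
print(joined)
```

With a 5-minute window on each side, the measurement at 9 minutes is paired with the events at 4 and 10 minutes, and each merged record keeps the measurement's timestamp.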
[breaking] 💥 Updated Application.run() behavior with count and timeout parameters
1. Changed the meaning of the count parameter.
Previously, when calling Application.run(count=...) or Application.run(count=..., timeout=...), the count parameter meant "number of messages to process before stopping the app".
In this update, we're changing the meaning of the count parameter to "number of outputs".
A simple way to think about outputs is "how many messages my application would send to an output topic."
This behavior is more intuitive than counting input messages.
Note that operations like filtering or aggregations reduce the number of outputs, and StreamingDataFrame.apply(..., expand=True) may output more data than it receives.
2. Application.run() can now collect and return outputs when collect=True is passed.
You can now test and debug the applications more easily using count and/or timeout parameters:
from quixstreams import Application
app = Application(broker_address="localhost:9092")
topic = app.topic("some-topic")
# Assume the topic has one partition and three JSON messages:
# {"temperature": 30}
# {"temperature": 40}
# {"temperature": 50}
sdf = app.dataframe(topic=topic)
# Process one output and collect the values (stops if no messages for 10s)
result_values_only = app.run(count=1, timeout=10, collect=True)
# >>> result_values_only = [
# {"temperature": 30}
# ]
# Process one output and collect the values with metadata (stops if no messages for 10s)
result_values_and_metadata = app.run(count=1, timeout=10, collect=True, metadata=True)
# >>> result_values_and_metadata = [
# {"temperature": 40, "_key": "<message_key>", "_timestamp": 123, "_offset": 1, "_topic": "some-topic", "_partition": 0, "_headers": None},
# ]
# Process one output without collecting (stops if no messages for 10s)
result_empty = app.run(count=1, timeout=10, collect=False)
# >>> result_empty = []
See more details in the Inspecting Data and Debugging section.
By @daniil-quix in #932
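The difference between counting inputs and counting outputs can be shown with a toy pipeline. This is a plain-Python sketch with hypothetical function names. It does not use Quix Streams and only mimics the idea that filtering lowers the number of outputs while expanding raises it.

```python
from itertools import islice


def pipeline(messages):
    """Toy pipeline: drop cold readings, then expand each remaining one into two outputs."""
    for msg in messages:
        if msg["temperature"] < 40:
            continue  # filtered out: consumes an input, produces no output
        yield {"sensor": "a", **msg}
        yield {"sensor": "b", **msg}


def run(messages, count):
    """Stop after `count` outputs, regardless of how many inputs were consumed."""
    consumed = []

    def tracked():
        for msg in messages:
            consumed.append(msg)
            yield msg

    outputs = list(islice(pipeline(tracked()), count))
    return outputs, len(consumed)


messages = [{"temperature": 30}, {"temperature": 40}, {"temperature": 50}]

one_output, inputs_for_one = run(messages, count=1)
three_outputs, inputs_for_three = run(messages, count=3)
print(one_output, inputs_for_one)
print(three_outputs, inputs_for_three)
```

In this sketch a single output requires two inputs, because the first message is filtered out, while three outputs are reached before the third input is fully expanded.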
💎 Log recovery progress
The application now logs the recovery progress every 10s to simplify monitoring:
...
[INFO] [quixstreams] : Recovery progress for <RecoveryPartition "changelog__state-store[0]">: 100 / 1000
[INFO] [quixstreams] : Recovery progress for <RecoveryPartition "changelog__state-store[0]">: 199 / 1000
...
By @gwaramadze in #941
📖 Docs
- Add docs for InfluxDB1Sink by @daniil-quix in #947 #949
🛠️ Internal
- Use generic WindowType by @gwaramadze in #942
- Remove the window expiration logs by @daniil-quix in #945
🔌 Connectors
- PostgresqlSink: correctly handle jsonb values by @tim-quix in #935
- BigQuerySink: use quix-streams user agent in requests by @gwaramadze in #943
- InfluxDB1Sink: new by @tim-quix in #936
Dependencies
- Bump mypy from 1.16.0 to 1.16.1 by @dependabot in #934
- Bump types-requests from 2.32.0.20250602 to 2.32.4.20250611 by @dependabot in #933
Full Changelog: v3.16.1...v3.17.0
v3.16.1
What's Changed
🦠 Bugfixes
StreamingDataFrame: retain a custom stream_id across operations by @daniil-quix in #925
The bug affected code using .group_by() due to optimizations introduced in v3.14.0.
The stream_id is used as part of the State stores' names, and it wasn't propagated correctly, leading to incorrect store names in some cases.
The fix in #925 corrects that, but the state stores created after .filter() or .apply() operations on the grouped DataFrame won't be accessible anymore because of the corrected stream ids.
See #925 for more examples of affected code.
Other fixes
- Fix conda requirements by @daniil-quix in #920
- sinks/InfluxDB3: adjust check to work with v2 and v3 by @tim-quix in #922
Full Changelog: v3.16.0...v3.16.1
v3.16.0
What's Changed
💎 New features
[experimental] StreamingDataFrame.join_lookup
StreamingDataFrame.join_lookup() is a new special type of join that allows you to enrich records in a streaming dataframe with the data from external systems.
You can use it to enrich streaming data with configuration or reference data from an external source, like a database.
Note: This is an experimental feature, and the API may change in the future.
Docs - https://quix.io/docs/quix-streams/joins.html#lookup-join
Accept the Application(quix_portal_api=...) parameter to specify the dedicated Quix Cloud control plane URL.
By @daniil-quix in #902
🦠 Bugfixes
- Add fix for using reducer with window with test added by @petrpan26 in #918
📖 Docs
- Add Solar Farm Enrichment tutorial by @daniil-quix in #903
🛠️ Internal
- API to collect duplicates in join-related store by @gwaramadze in #906
- Topics refactoring by @daniil-quix in #896
- Lazy creation of RocksDB column families by @gwaramadze in #905
Dependencies
- Bump types-requests from 2.32.0.20250515 to 2.32.0.20250602 by @dependabot in #914
- Bump mypy from 1.15.0 to 1.16.0 by @dependabot in #915
- Bump types-jsonschema from 4.23.0.20241208 to 4.23.0.20250516 by @dependabot in #897
New Contributors
- @petrpan26 made their first contribution in #918
Full Changelog: v3.15.0...v3.16.0
v3.15.0
What's Changed
💎 New streaming join: StreamingDataFrame.join_asof
With StreamingDataFrame.join_asof(), you can join two topics into a new stream where each left record is merged with the right record with the same key whose timestamp is less than or equal to the left timestamp.
This join is built with the timeseries enrichment use cases in mind, where the left side represents some measurements and the right side represents events.
Some examples:
- Matching of the sensor measurements with the events in the system.
- Joining the purchases with the effective prices of the goods.
from datetime import timedelta
from quixstreams import Application
app = Application(...)
sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_metadata = app.dataframe(app.topic("metadata"))
# Join records from the topic "measurements"
# with the latest effective records from the topic "metadata".
# using the "inner" join strategy and keeping the "metadata" records stored for 14 days in event time.
sdf_joined = sdf_measurements.join_asof(
    right=sdf_metadata,
    how="inner",  # Emit updates only if the match is found in the store.
    on_merge="keep-left",  # Prefer the columns from the left dataframe if they overlap with the right.
    grace_ms=timedelta(days=14),  # Keep the state for 14 days (measured in event time, similar to windows).
)

if __name__ == '__main__':
    app.run()
Learn more about it on the Joins docs page.
By @gwaramadze and @daniil-quix in #874 #841
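The as-of matching rule (the latest right record whose timestamp is less than or equal to the left timestamp) can be sketched in plain Python for a single key. The helper name and the sample purchases and prices are hypothetical, and the grace period and state storage are not modeled.

```python
from bisect import bisect_right


def asof_lookup(right_records, left_ts):
    """Return the latest right record with ts <= left_ts, or None.

    right_records must be sorted by "ts" and belong to the same key as the left record.
    """
    timestamps = [r["ts"] for r in right_records]
    idx = bisect_right(timestamps, left_ts)
    return right_records[idx - 1] if idx else None


# Invented price changes (right side) and purchases (left side) for one key.
prices = [{"ts": 100, "price": 10}, {"ts": 200, "price": 12}]
purchases = [
    {"ts": 50, "item": "w"},
    {"ts": 100, "item": "x"},
    {"ts": 250, "item": "y"},
]

joined = []
for purchase in purchases:
    match = asof_lookup(prices, purchase["ts"])
    if match is None:
        continue  # mimics the "inner" strategy: skip left records without a match
    joined.append({**match, **purchase})  # mimics "keep-left" on overlapping keys
print(joined)
```

The purchase at timestamp 50 is dropped because no price was effective yet, while the other two pick up the most recent price at or before their own timestamp.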
State improvements
- Enable fsync in RocksDB by default by @gwaramadze in #883
- RocksDBStorePartition: log a number of bytes written by @daniil-quix in #890
- Optimize state operations by @daniil-quix in #891
- Add a parameter to clear corrupted RocksDBs by @gwaramadze in #888.
To re-create the corrupted RocksDB state store automatically, use the RocksDBOptions object:
from quixstreams import Application
from quixstreams.state.rocksdb import RocksDBOptions
app = Application(..., rocksdb_options=RocksDBOptions(on_corrupted_recreate=True))
Dependencies
- Bump types-protobuf from 6.30.2.20250503 to 6.30.2.20250516 by @dependabot in #885
Full Changelog: v3.14.1...v3.15.0