Skip to content

Commit 428d567

Browse files
Daniil Gusevharisbotic
andauthored
Implement Tumbling and Hopping windows (#257)
* Implement Tumbling and Hopping windows with basic aggregations (reduce, mean, max, min, etc.) * Add the WindowedRocksDBStore to store window data * Add bloom filter settings to default Rocksdb options * Add default WAL size settings to default Rocksdb options * add a timestamp extractor to parse timestamps from incoming messages --------- Co-authored-by: Haris Botić <[email protected]>
1 parent 510b131 commit 428d567

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+4292
-417
lines changed

docs/api-reference/application.md

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
class Application()
1111
```
1212

13-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L42)
13+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L43)
1414

1515
The main Application class.
1616

@@ -78,7 +78,7 @@ def __init__(broker_address: str,
7878
loglevel: Optional[LogLevel] = "INFO")
7979
```
8080

81-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L81)
81+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L82)
8282

8383

8484
<br>
@@ -155,7 +155,7 @@ def Quix(cls,
155155
auto_create_topics: bool = True) -> Self
156156
```
157157

158-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L187)
158+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L188)
159159

160160
Initialize an Application to work with Quix platform,
161161

@@ -262,10 +262,11 @@ def topic(name: str,
262262
key_deserializer: DeserializerType = "bytes",
263263
value_serializer: SerializerType = "json",
264264
key_serializer: SerializerType = "bytes",
265-
creation_configs: Optional[TopicCreationConfigs] = None) -> Topic
265+
creation_configs: Optional[TopicCreationConfigs] = None,
266+
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
266267
```
267268

268-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L333)
269+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L334)
269270

270271
Create a topic definition.
271272

@@ -324,7 +325,7 @@ Its name will be overridden by this method's 'name' param.
324325
def dataframe(topic: Topic) -> StreamingDataFrame
325326
```
326327

327-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L396)
328+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L400)
328329

329330
A simple helper method that generates a `StreamingDataFrame`, which is used
330331

@@ -374,7 +375,7 @@ to be used as an input topic.
374375
def stop()
375376
```
376377

377-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L432)
378+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L436)
378379

379380
Stop the internal poll loop and the message processing.
380381

@@ -394,7 +395,7 @@ To otherwise stop an application, either send a `SIGTERM` to the process
394395
def get_producer() -> Producer
395396
```
396397

397-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L444)
398+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L448)
398399

399400
Create and return a pre-configured Producer instance.
400401
The Producer is initialized with params passed to Application.
@@ -429,7 +430,7 @@ with app.get_producer() as producer:
429430
def get_consumer() -> Consumer
430431
```
431432

432-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L480)
433+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L484)
433434

434435
Create and return a pre-configured Consumer instance.
435436
The Consumer is initialized with params passed to Application.
@@ -474,7 +475,7 @@ with app.get_consumer() as consumer:
474475
def clear_state()
475476
```
476477

477-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L529)
478+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L533)
478479

479480
Clear the state of the application.
480481

@@ -488,7 +489,7 @@ Clear the state of the application.
488489
def run(dataframe: StreamingDataFrame)
489490
```
490491

491-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L558)
492+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L562)
492493

493494
Start processing data from Kafka using provided `StreamingDataFrame`
494495

@@ -532,7 +533,7 @@ app.run(dataframe=df)
532533
class State(Protocol)
533534
```
534535

535-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L95)
536+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L102)
536537

537538
Primary interface for working with key-value state data from `StreamingDataFrame`
538539

@@ -546,7 +547,7 @@ Primary interface for working with key-value state data from `StreamingDataFrame
546547
def get(key: Any, default: Any = None) -> Optional[Any]
547548
```
548549

549-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L100)
550+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L107)
550551

551552
Get the value for key if key is present in the state, else default
552553

@@ -573,7 +574,7 @@ value or None if the key is not found and `default` is not provided
573574
def set(key: Any, value: Any)
574575
```
575576

576-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L109)
577+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L116)
577578

578579
Set value for the key.
579580

@@ -594,7 +595,7 @@ Set value for the key.
594595
def delete(key: Any)
595596
```
596597

597-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L116)
598+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L123)
598599

599600
Delete value for the key.
600601

@@ -616,7 +617,7 @@ This function always returns `None`, even if value is not found.
616617
def exists(key: Any) -> bool
617618
```
618619

619-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L124)
620+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L131)
620621

621622
Check if the key exists in state.
622623

0 commit comments

Comments
 (0)