Skip to content

Commit 24b7e08

Browse files
authored
Add API to create a pre-configured Producer and Consumer on the Application level (#261)
Add 2 new public methods: Application.get_producer() that initializes a disposable producer (i.e. we don't re-use the already initialized Producer, but instead create a one-time producer with the right config) that users can use when they simply need to produce the data. Application.get_consumer() which similarly creates a disposable Consumer.
1 parent 92c185e commit 24b7e08

File tree

10 files changed

+657
-303
lines changed

10 files changed

+657
-303
lines changed

docs/api-reference/application.md

Lines changed: 93 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
class Application()
1111
```
1212

13-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/app.py#L42)
13+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L42)
1414

1515
The main Application class.
1616

@@ -78,7 +78,7 @@ def __init__(broker_address: str,
7878
loglevel: Optional[LogLevel] = "INFO")
7979
```
8080

81-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/app.py#L81)
81+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L81)
8282

8383

8484
<br>
@@ -155,7 +155,7 @@ def Quix(cls,
155155
auto_create_topics: bool = True) -> Self
156156
```
157157

158-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/app.py#L178)
158+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L187)
159159

160160
Initialize an Application to work with Quix platform,
161161

@@ -265,7 +265,7 @@ def topic(name: str,
265265
creation_configs: Optional[TopicCreationConfigs] = None) -> Topic
266266
```
267267

268-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/app.py#L324)
268+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L333)
269269

270270
Create a topic definition.
271271

@@ -324,7 +324,7 @@ Its name will be overridden by this method's 'name' param.
324324
def dataframe(topic: Topic) -> StreamingDataFrame
325325
```
326326

327-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/app.py#L387)
327+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L396)
328328

329329
A simple helper method that generates a `StreamingDataFrame`, which is used
330330

@@ -374,7 +374,7 @@ to be used as an input topic.
374374
def stop()
375375
```
376376

377-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/app.py#L423)
377+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L432)
378378

379379
Stop the internal poll loop and the message processing.
380380

@@ -384,6 +384,86 @@ likely through some sort of threading).
384384
To otherwise stop an application, either send a `SIGTERM` to the process
385385
(like Kubernetes does) or perform a typical `KeyboardInterrupt` (`Ctrl+C`).
386386

387+
<a id="quixstreams.app.Application.get_producer"></a>
388+
389+
<br><br>
390+
391+
#### Application.get\_producer
392+
393+
```python
394+
def get_producer() -> Producer
395+
```
396+
397+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L444)
398+
399+
Create and return a pre-configured Producer instance.
400+
The Producer is initialized with params passed to Application.
401+
402+
It's useful for producing data to Kafka outside the standard Application processing flow,
403+
(e.g. to produce test data into a topic).
404+
Using this within the StreamingDataFrame functions is not recommended, as it creates a new Producer
405+
instance each time, which is not optimized for repeated use in a streaming pipeline.
406+
407+
408+
<br>
409+
***Example Snippet:***
410+
411+
```python
412+
from quixstreams import Application
413+
414+
app = Application.Quix(...)
415+
topic = app.topic("input")
416+
417+
with app.get_producer() as producer:
418+
for i in range(100):
419+
producer.produce(topic=topic.name, key=b"key", value=b"value")
420+
```
421+
422+
<a id="quixstreams.app.Application.get_consumer"></a>
423+
424+
<br><br>
425+
426+
#### Application.get\_consumer
427+
428+
```python
429+
def get_consumer() -> Consumer
430+
```
431+
432+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L480)
433+
434+
Create and return a pre-configured Consumer instance.
435+
The Consumer is initialized with params passed to Application.
436+
437+
It's useful for consuming data from Kafka outside the standard Application processing flow.
438+
(e.g. to consume test data from a topic).
439+
Using it within the StreamingDataFrame functions is not recommended, as it creates a new Consumer instance
440+
each time, which is not optimized for repeated use in a streaming pipeline.
441+
442+
Note: By default this consumer does not autocommit consumed offsets to allow exactly-once processing.
443+
To store the offset call store_offsets() after processing a message.
444+
If autocommit is necessary set `enable.auto.offset.store` to True in the consumer config when creating the app.
445+
446+
447+
<br>
448+
***Example Snippet:***
449+
450+
```python
451+
from quixstreams import Application
452+
453+
app = Application.Quix(...)
454+
topic = app.topic("input")
455+
456+
with app.get_consumer() as consumer:
457+
consumer.subscribe([topic.name])
458+
while True:
459+
msg = consumer.poll(timeout=1.0)
460+
if msg is not None:
461+
# Process message
462+
# Optionally commit the offset
463+
# consumer.store_offsets(msg)
464+
465+
```
466+
387467
<a id="quixstreams.app.Application.clear_state"></a>
388468

389469
<br><br>
@@ -394,7 +474,7 @@ To otherwise stop an application, either send a `SIGTERM` to the process
394474
def clear_state()
395475
```
396476

397-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/app.py#L435)
477+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L529)
398478

399479
Clear the state of the application.
400480

@@ -408,7 +488,7 @@ Clear the state of the application.
408488
def run(dataframe: StreamingDataFrame)
409489
```
410490

411-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/app.py#L464)
491+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L558)
412492

413493
Start processing data from Kafka using provided `StreamingDataFrame`
414494

@@ -452,7 +532,7 @@ app.run(dataframe=df)
452532
class State(Protocol)
453533
```
454534

455-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/state/types.py#L95)
535+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L95)
456536

457537
Primary interface for working with key-value state data from `StreamingDataFrame`
458538

@@ -466,7 +546,7 @@ Primary interface for working with key-value state data from `StreamingDataFrame
466546
def get(key: Any, default: Any = None) -> Optional[Any]
467547
```
468548

469-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/state/types.py#L100)
549+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L100)
470550

471551
Get the value for key if key is present in the state, else default
472552

@@ -493,7 +573,7 @@ value or None if the key is not found and `default` is not provided
493573
def set(key: Any, value: Any)
494574
```
495575

496-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/state/types.py#L109)
576+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L109)
497577

498578
Set value for the key.
499579

@@ -514,7 +594,7 @@ Set value for the key.
514594
def delete(key: Any)
515595
```
516596

517-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/state/types.py#L116)
597+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L116)
518598

519599
Delete value for the key.
520600

@@ -536,7 +616,7 @@ This function always returns `None`, even if value is not found.
536616
def exists(key: Any) -> bool
537617
```
538618

539-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/780d6ed95124b27f911a7a7d8cd68572fd6d7f2a/quixstreams/state/types.py#L124)
619+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L124)
540620

541621
Check if the key exists in state.
542622

0 commit comments

Comments
 (0)