10
10
class Application ()
11
11
```
12
12
13
- [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / app .py # L87 )
13
+ [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / app .py # L89 )
14
14
15
15
The main Application class .
16
16
@@ -87,7 +87,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
87
87
max_partition_buffer_size: int = 10000 )
88
88
```
89
89
90
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L125 )
90
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L127 )
91
91
92
92
93
93
< br>
@@ -188,7 +188,7 @@ instead of the default one.
188
188
def Quix(cls , * args, ** kwargs)
189
189
```
190
190
191
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L390 )
191
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L397 )
192
192
193
193
RAISES EXCEPTION : DEPRECATED .
194
194
@@ -211,7 +211,7 @@ def topic(name: str,
211
211
timestamp_extractor: Optional[TimestampExtractor] = None ) -> Topic
212
212
```
213
213
214
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L422 )
214
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L429 )
215
215
216
216
Create a topic definition.
217
217
@@ -293,7 +293,7 @@ def dataframe(topic: Optional[Topic] = None,
293
293
source: Optional[BaseSource] = None ) -> StreamingDataFrame
294
294
```
295
295
296
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L502 )
296
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L509 )
297
297
298
298
A simple helper method that generates a `StreamingDataFrame` , which is used
299
299
@@ -349,7 +349,7 @@ to be used as an input topic.
349
349
def stop(fail: bool = False )
350
350
```
351
351
352
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L558 )
352
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L565 )
353
353
354
354
Stop the internal poll loop and the message processing.
355
355
@@ -373,12 +373,13 @@ to unhandled exception, and it shouldn't commit the current checkpoint.
373
373
# ### Application.get\_producer
374
374
375
375
```python
376
- def get_producer() -> Producer
376
+ def get_producer(transactional: bool = False ) -> Producer
377
377
```
378
378
379
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L603 )
379
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L610 )
380
380
381
381
Create and return a pre- configured Producer instance.
382
+
382
383
The Producer is initialized with params passed to Application.
383
384
384
385
It' s useful for producing data to Kafka outside the standard Application processing flow,
@@ -387,6 +388,15 @@ Using this within the StreamingDataFrame functions is not recommended, as it cre
387
388
instance each time, which is not optimized for repeated use in a streaming pipeline.
388
389
389
390
391
+ < br>
392
+ ** * Arguments:***
393
+
394
+ - `transactional` : if True , the producer will be configured to use transactions
395
+ regardless of Application' s processing guarantee setting. But the responsibility
396
+ for beginning and committing the transaction is on the user.
397
+ Default - False .
398
+
399
+
390
400
< br>
391
401
** * Example Snippet:***
392
402
@@ -411,7 +421,7 @@ with app.get_producer() as producer:
411
421
def get_consumer(auto_commit_enable: bool = True ) -> Consumer
412
422
```
413
423
414
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L658 )
424
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L678 )
415
425
416
426
Create and return a pre- configured Consumer instance.
417
427
@@ -468,7 +478,7 @@ with app.get_consumer() as consumer:
468
478
def clear_state()
469
479
```
470
480
471
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L707 )
481
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L727 )
472
482
473
483
Clear the state of the application.
474
484
@@ -482,7 +492,7 @@ Clear the state of the application.
482
492
def add_source(source: BaseSource, topic: Optional[Topic] = None ) -> Topic
483
493
```
484
494
485
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L713 )
495
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L733 )
486
496
487
497
Add a source to the application.
488
498
@@ -513,7 +523,7 @@ def run(dataframe: Optional[StreamingDataFrame] = None,
513
523
metadata: bool = False ) -> list[dict ]
514
524
```
515
525
516
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L746 )
526
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L766 )
517
527
518
528
Start processing data from Kafka using provided `StreamingDataFrame`
519
529
@@ -589,7 +599,7 @@ Default - `False`.
589
599
class ApplicationConfig(BaseSettings)
590
600
```
591
601
592
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1104 )
602
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1124 )
593
603
594
604
Immutable object holding the application configuration
595
605
@@ -612,7 +622,7 @@ def settings_customise_sources(
612
622
) -> Tuple[PydanticBaseSettingsSource, ... ]
613
623
```
614
624
615
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1140 )
625
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1160 )
616
626
617
627
Included to ignore reading/ setting values from the environment
618
628
@@ -626,7 +636,23 @@ Included to ignore reading/setting values from the environment
626
636
def copy(** kwargs) -> " ApplicationConfig"
627
637
```
628
638
629
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1153 )
639
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1173 )
630
640
631
641
Update the application config and return a copy
632
642
643
+ < a id = " quixstreams.app.resolve_transactional_id" >< / a>
644
+
645
+ < br>< br>
646
+
647
+ # ### resolve\_transactional\_id
648
+
649
+ ```python
650
+ def resolve_transactional_id(transactional_id: Optional[str ],
651
+ prefix: str ) -> str
652
+ ```
653
+
654
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1195)
655
+
656
+ Utility function to resolve the transactional.id based
657
+ on existing config and provided prefix.
658
+
0 commit comments