10
10
class StreamingDataFrame (BaseStreaming )
11
11
```
12
12
13
- [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / dataframe / dataframe .py # L63 )
13
+ [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / dataframe / dataframe .py # L86 )
14
14
15
15
`StreamingDataFrame` is the main object you will use for ETL work.
16
16
@@ -69,6 +69,7 @@ sdf = sdf.to_topic(topic_obj)
69
69
# ### StreamingDataFrame.apply
70
70
71
71
```python
72
+ @ _ensure_unlocked
72
73
def apply (func: Union[
73
74
ApplyCallback,
74
75
ApplyCallbackStateful,
@@ -81,7 +82,7 @@ def apply(func: Union[
81
82
metadata: bool = False ) -> Self
82
83
```
83
84
84
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L178 )
85
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L203 )
85
86
86
87
Apply a function to transform the value and return a new value.
87
88
@@ -128,6 +129,7 @@ Default - `False`.
128
129
# ### StreamingDataFrame.update
129
130
130
131
```python
132
+ @ _ensure_unlocked
131
133
def update(func: Union[
132
134
UpdateCallback,
133
135
UpdateCallbackStateful,
@@ -139,7 +141,7 @@ def update(func: Union[
139
141
metadata: bool = False ) -> Self
140
142
```
141
143
142
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L267 )
144
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L293 )
143
145
144
146
Apply a function to mutate value in-place or to perform a side effect
145
147
@@ -196,6 +198,7 @@ the updated StreamingDataFrame instance (reassignment NOT required).
196
198
# ### StreamingDataFrame.filter
197
199
198
200
```python
201
+ @ _ensure_unlocked
199
202
def filter (func: Union[
200
203
FilterCallback,
201
204
FilterCallbackStateful,
@@ -207,7 +210,7 @@ def filter(func: Union[
207
210
metadata: bool = False ) -> Self
208
211
```
209
212
210
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L359 )
213
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L386 )
211
214
212
215
Filter value using provided function.
213
216
@@ -251,6 +254,7 @@ Default - `False`.
251
254
# ### StreamingDataFrame.group\_by
252
255
253
256
```python
257
+ @ _ensure_unlocked
254
258
def group_by(key: Union[str , Callable[[Any], Any]],
255
259
name: Optional[str ] = None ,
256
260
value_deserializer: Optional[DeserializerType] = " json" ,
@@ -259,7 +263,7 @@ def group_by(key: Union[str, Callable[[Any], Any]],
259
263
key_serializer: Optional[SerializerType] = " json" ) -> Self
260
264
```
261
265
262
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L445 )
266
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L473 )
263
267
264
268
"Groups" messages by re-keying them via the provided group_by operation
265
269
@@ -324,7 +328,7 @@ a clone with this operation added (assign to keep its effect).
324
328
def contains(key: str ) -> StreamingSeries
325
329
```
326
330
327
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L523 )
331
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L551 )
328
332
329
333
Check if the key is present in the Row value.
330
334
@@ -360,10 +364,11 @@ or False otherwise.
360
364
# ### StreamingDataFrame.to\_topic
361
365
362
366
```python
367
+ @ _ensure_unlocked
363
368
def to_topic(topic: Topic, key: Optional[Callable[[Any], Any]] = None ) -> Self
364
369
```
365
370
366
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L548 )
371
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L577 )
367
372
368
373
Produce current value to a topic. You can optionally specify a new key.
369
374
@@ -413,10 +418,11 @@ the updated StreamingDataFrame instance (reassignment NOT required).
413
418
# ### StreamingDataFrame.set\_timestamp
414
419
415
420
```python
421
+ @ _ensure_unlocked
416
422
def set_timestamp(func: Callable[[Any, Any, int , Any], int ]) -> Self
417
423
```
418
424
419
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L593 )
425
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L623 )
420
426
421
427
Set a new timestamp based on the current message value and its metadata.
422
428
@@ -461,6 +467,7 @@ a new StreamingDataFrame instance
461
467
# ### StreamingDataFrame.set\_headers
462
468
463
469
```python
470
+ @ _ensure_unlocked
464
471
def set_headers(
465
472
func: Callable[
466
473
[Any, Any, int , List[Tuple[str , HeaderValue]]],
@@ -469,7 +476,7 @@ def set_headers(
469
476
) -> Self
470
477
```
471
478
472
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L634 )
479
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L665 )
473
480
474
481
Set new message headers based on the current message value and metadata.
475
482
@@ -515,10 +522,11 @@ a new StreamingDataFrame instance
515
522
# ### StreamingDataFrame.print
516
523
517
524
```python
525
+ @ _ensure_unlocked
518
526
def print (pretty: bool = True , metadata: bool = False ) -> Self
519
527
```
520
528
521
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L685 )
529
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L717 )
522
530
523
531
Print out the current message value (and optionally, the message metadata) to
524
532
@@ -574,7 +582,7 @@ def compose(
574
582
) -> Dict[str , VoidExecutor]
575
583
```
576
584
577
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L727 )
585
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L759 )
578
586
579
587
Compose all functions of this StreamingDataFrame into one big closure.
580
588
@@ -628,7 +636,7 @@ def test(value: Any,
628
636
topic: Optional[Topic] = None ) -> List[Any]
629
637
```
630
638
631
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L764 )
639
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L796 )
632
640
633
641
A shorthand to test `StreamingDataFrame` with provided value
634
642
@@ -660,12 +668,13 @@ result of `StreamingDataFrame`
660
668
# ### StreamingDataFrame.tumbling\_window
661
669
662
670
```python
671
+ @ _ensure_unlocked
663
672
def tumbling_window(duration_ms: Union[int , timedelta],
664
673
grace_ms: Union[int , timedelta] = 0 ,
665
674
name: Optional[str ] = None ) -> TumblingWindowDefinition
666
675
```
667
676
668
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L801 )
677
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L834 )
669
678
670
679
Create a tumbling window transformation on this StreamingDataFrame.
671
680
@@ -745,13 +754,14 @@ sdf = (
745
754
# ### StreamingDataFrame.hopping\_window
746
755
747
756
```python
757
+ @ _ensure_unlocked
748
758
def hopping_window(duration_ms: Union[int , timedelta],
749
759
step_ms: Union[int , timedelta],
750
760
grace_ms: Union[int , timedelta] = 0 ,
751
761
name: Optional[str ] = None ) -> HoppingWindowDefinition
752
762
```
753
763
754
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L877 )
764
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L911 )
755
765
756
766
Create a hopping window transformation on this StreamingDataFrame.
757
767
@@ -840,11 +850,12 @@ sdf = (
840
850
# ### StreamingDataFrame.drop
841
851
842
852
```python
853
+ @ _ensure_unlocked
843
854
def drop(columns: Union[str , List[str ]],
844
855
errors: Literal[" ignore" , " raise" ] = " raise" ) -> Self
845
856
```
846
857
847
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L969 )
858
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1004 )
848
859
849
860
Drop column(s) from the message value (value must support `del`, like a dict).
850
861
@@ -878,6 +889,35 @@ Default - `"raise"`.
878
889
879
890
a new StreamingDataFrame instance
880
891
892
+ <a id="quixstreams.dataframe.dataframe.StreamingDataFrame.sink"></a>
893
+
894
+ <br><br>
895
+
896
+ #### StreamingDataFrame.sink
897
+
898
+ ```python
899
+ @ _ensure_unlocked
900
+ def sink(sink: BaseSink)
901
+ ```
902
+
903
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1049)
904
+
905
+ Sink the processed data to the specified destination.
906
+
907
+ Internally, each processed record is added to a sink, and the sinks are
908
+ flushed on each checkpoint.
909
+ The offset will be committed only if all the sinks for all topic partitions
910
+ are flushed successfully.
911
+
912
+ Additionally, Sinks may signal the backpressure to the application
913
+ (e.g., when the destination is rate-limited).
914
+ When this happens, the application will pause the corresponding topic partition
915
+ and resume again after the timeout.
916
+ The backpressure handling and timeouts are defined by the specific sinks.
917
+
918
+ Note: `sink()` is a terminal operation, and you cannot add new operations
919
+ to the same StreamingDataFrame after it's called.
920
+
881
921
<a id="quixstreams.dataframe.series"></a>
882
922
883
923
## quixstreams.dataframe.series
0 commit comments