Skip to content

Commit 32671fd

Browse files
author
Tony Bedford
authored
[task/34235] - add missing event docs (#91)
* Add missing docs and examples for events
1 parent e18b0a6 commit 32671fd

File tree

2 files changed

+135
-3
lines changed

2 files changed

+135
-3
lines changed

docs/app-management.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ from quixstreams import App
132132
App.run()
133133
```
134134

135+
## Dispose events
136+
137+
When a topic is disposed, it is possible to run additional code by linking the `on_disposed` event to an appropriate handler. The `on_disposed` event occurs at the end of the topic disposal process.
138+
135139
## Keep alive
136140

137141
Unless you add an infinite loop or similar code, a Python code file will run each code statement sequentially until the end of the file, and then exit.

docs/subscribe.md

Lines changed: 131 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,38 @@ topic_consumer.subscribe()
244244

245245
The conversions from [TimeseriesData](#timeseriesdata-format) to pandas DataFrame have an intrinsic cost overhead. For high-performance models using pandas DataFrame, you should use the `on_dataframe_received` callback provided by the library, which is optimized to do as few conversions as possible.
246246

247+
### Raw data format
248+
249+
In addition to the `TimeseriesData` and pandas `DataFrame` formats (Python only), there is also the raw data format. You can use the `on_raw_received` callback (Python), or `OnRawRceived` event (C#) to handle this data format, as demonstrated in the following code:
250+
251+
=== "Python"
252+
253+
``` python
254+
from quixstreams import TopicConsumer, StreamConsumer, TimeseriesDataRaw
255+
256+
def on_stream_received_handler(stream_received: StreamConsumer):
257+
stream_received.timeseries.on_raw_received = on_timeseries_raw_received_handler
258+
259+
def on_timeseries_raw_received_handler(stream: StreamConsumer, data: TimeseriesDataRaw):
260+
with data:
261+
# consume from input stream
262+
print(data)
263+
264+
topic_consumer.on_stream_received = on_stream_received_handler
265+
topic_consumer.subscribe()
266+
```
267+
268+
=== "C\#"
269+
270+
In C#, you typically use the raw format when you want to maximize performance:
271+
272+
``` cs
273+
receivedStream.Timeseries.OnRawReceived += (sender, args) =>
274+
{
275+
streamWriter.Timeseries.Publish(args.Data);
276+
};
277+
```
278+
247279
### Using a Buffer
248280

249281
Quix Streams provides you with an optional programmable buffer which you can tailor to your needs. Using buffers to consume data allows you to process data in batches according to your needs. The buffer also helps you to develop models with a high-performance throughput.
@@ -304,6 +336,8 @@ Consuming data from that buffer is as simple as using its callback (Python) or e
304336
};
305337
```
306338

339+
Other calbacks are available in addition to `on_data_released` (for `TimeseriesData`), including `on_dataframe_released` (for pandas `DataFrame`) and `on_raw_released` (for `TimeseriesDataRaw`). You use the callback appropriate to your stream data format.
340+
307341
You can configure multiple conditions to determine when the buffer has to release data, if any of these conditions become true, the buffer will release a new packet of data and that data is cleared from the buffer:
308342

309343
=== "Python"
@@ -412,6 +446,98 @@ Event consumed for stream. Event Id: motor-off
412446
Event consumed for stream. Event Id: race-event3
413447
```
414448

449+
## Responding to changes in stream properties
450+
451+
If the properties of a stream are changed, the consumer can detect this and handle it using the `on_changed` method.
452+
453+
You can write the handler:
454+
455+
=== "Python"
456+
457+
``` python
458+
def on_stream_properties_changed_handler(stream_consumer: qx.StreamConsumer):
459+
print('stream properties changed for stream: ', stream_consumer.stream_id)
460+
```
461+
462+
=== "C\#"
463+
464+
``` cs
465+
streamConsumer.Properties.OnChanged += (sender, args) =>
466+
{
467+
Console.WriteLine($"Properties changed for stream: {streamConsumer.StreamId}");
468+
}
469+
```
470+
471+
Then register the properties change handler:
472+
473+
=== "Python"
474+
475+
``` python
476+
def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
477+
stream_consumer.events.on_data_received = on_event_data_received_handler
478+
stream_consumer.properties.on_changed = on_stream_properties_changed_handler
479+
```
480+
481+
=== "C\#"
482+
483+
For C#, locate the properties changed handler inside the `OnStreamReceived` callback, for example:
484+
485+
``` cs
486+
topicConsumer.OnStreamReceived += (topic, streamConsumer) =>
487+
{
488+
streamConsumer.Timeseries.OnDataReceived += (sender, args) =>
489+
{
490+
Console.WriteLine("Data received");
491+
};
492+
493+
streamConsumer.Properties.OnChanged += (sender, args) =>
494+
{
495+
Console.WriteLine($"Properties changed for stream: {streamConsumer.StreamId}");
496+
}
497+
498+
};
499+
500+
topicConsumer.Subscribe();
501+
```
502+
503+
You can keep a copy of the properties if you need to find out which properties had changed.
504+
505+
## Responding to changes in parameter definitions
506+
507+
It is possible to handle changes in [parameter definitions](./publish.md#parameter-definitions). Parameter definitions are metadata attached to data in a stream. The `on_definitions_changed` event is linked to an appropriate event handler, as shown in the following example code:
508+
509+
=== "Python"
510+
511+
``` python
512+
def on_definitions_changed_handler(stream_consumer: qx.StreamConsumer):
513+
# handle change in definitions
514+
515+
516+
def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
517+
stream_consumer.events.on_data_received = on_event_data_received_handler
518+
stream_consumer.events.on_definitions_changed = on_definitions_changed_handler
519+
```
520+
521+
=== "C\#"
522+
523+
``` cs
524+
topicConsumer.OnStreamReceived += (topic, streamConsumer) =>
525+
{
526+
streamConsumer.Events.OnDataReceived += (sender, args) =>
527+
{
528+
Console.WriteLine("Data received");
529+
};
530+
531+
streamConsumer.Events.OnDefinitionsChanged += (sender, args) =>
532+
{
533+
Console.WriteLine("Definitions changed");
534+
};
535+
536+
};
537+
538+
topicConsumer.Subscribe();
539+
```
540+
415541
## Committing / checkpointing
416542

417543
It is important to be aware of the commit concept when working with a broker. Committing allows you to mark how far data has been processed, also known as creating a [checkpoint](kafka.md#checkpointing). In the event of a restart or rebalance, the client only processes messages from the last committed position. Commits are done for each consumer group, so if you have several consumer groups in use, they do not affect each another when committing.
@@ -487,10 +613,10 @@ Then, whenever your commit condition fulfils, call:
487613

488614
The piece of code above will commit anything – like parameter, event or metadata - consumed and served to you from the topic you subscribed to up to this point.
489615

490-
### Commit callback
616+
### Committed and committing events
491617

492618
=== "Python"
493-
Whenever a commit occurs, a callback is raised to let you know. This callback is invoked for both manual and automatic commits. You can set the callback using the following code:
619+
Whenever a commit completes, a callback is raised that can be connected to a handler. This callback is invoked for both manual and automatic commits. You can set the callback using the following code:
494620

495621
``` python
496622
from quixstreams import TopicConsumer
@@ -502,7 +628,7 @@ Whenever a commit occurs, a callback is raised to let you know. This callback is
502628
```
503629

504630
=== "C\#"
505-
Whenever a commit occurs, an event is raised to let you know. This event is raised for both manual and automatic commits. You can subscribe to this event using the following code:
631+
Whenever a commit completes, an event is raised that can be connected to a handler. This event is raised for both manual and automatic commits. You can subscribe to this event using the following code:
506632

507633
``` cs
508634
topicConsumer.OnCommitted += (sender, args) =>
@@ -511,6 +637,8 @@ Whenever a commit occurs, an event is raised to let you know. This event is rais
511637
};
512638
```
513639

640+
While the `on_committed` event is triggered once the data has been committed, there is also the `on_committing` event which is triggered at the beginning of the commit cycle, should you need to carry out other tasks before the data is committed.
641+
514642
### Auto offset reset
515643

516644
You can control the offset that data is received from by optionally specifying `AutoOffsetReset` when you open the topic.

0 commit comments

Comments
 (0)