Skip to content

Commit 86c2d62

Browse files
feat: added callback function for getting profilers (#393)
1 parent 376f371 commit 86c2d62

File tree

7 files changed

+127
-25
lines changed

7 files changed

+127
-25
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
1. [#378](https://github.com/influxdata/influxdb-client-python/pull/378): Correct serialization DataFrame with nan values [DataFrame]
66
1. [#384](https://github.com/influxdata/influxdb-client-python/pull/384): Timeout can be specified as a `float`
77
1. [#380](https://github.com/influxdata/influxdb-client-python/pull/380): Correct data types for querying [DataFrame]
8-
1. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log
8+
1. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log
9+
10+
### Features
11+
1. [#393](https://github.com/influxdata/influxdb-client-python/pull/393): Added callback function for getting profilers output with example and test
912

1013
### CI
1114
1. [#370](https://github.com/influxdata/influxdb-client-python/pull/370): Add Python 3.10 to CI builds

README.rst

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,36 @@ Example of a profiler output:
337337
DurationSum : 940500
338338
MeanDuration : 940500.0
339339
340+
You can also use callback function to get profilers output.
341+
Return value of this callback is type of FluxRecord.
342+
343+
Example how to use profilers with callback:
344+
345+
.. code-block:: python
346+
347+
class ProfilersCallback(object):
348+
def __init__(self):
349+
self.records = []
350+
351+
def __call__(self, flux_record):
352+
self.records.append(flux_record.values)
353+
354+
callback = ProfilersCallback()
355+
356+
query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))
357+
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
358+
359+
for profiler in callback.records:
360+
print(f'Custom processing of profiler result: {profiler}')
361+
362+
Example output of this callback:
363+
364+
.. code-block::
365+
366+
Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n ReadRange2\r\n generated_yield\r\n\r\n ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}
367+
Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}
368+
369+
340370
.. marker-index-end
341371
342372

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
1515
- [query_from_file.py](query_from_file.py) - How to use a Flux query defined in a separate file
1616
- [query_response_to_json.py](query_response_to_json.py) - How to serialize Query response to JSON
17+
- [query_with_profilers.py](query_with_profilers.py) - How to process profilers output by callback
1718

1819

1920
## Management API

examples/query_with_profilers.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from influxdb_client import InfluxDBClient, Point
2+
from influxdb_client.client.query_api import QueryOptions
3+
from influxdb_client.client.write_api import SYNCHRONOUS
4+
5+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:
6+
7+
"""
8+
Define callback to process profiler results.
9+
"""
10+
class ProfilersCallback(object):
11+
def __init__(self):
12+
self.records = []
13+
14+
def __call__(self, flux_record):
15+
self.records.append(flux_record.values)
16+
17+
18+
callback = ProfilersCallback()
19+
20+
write_api = client.write_api(write_options=SYNCHRONOUS)
21+
22+
"""
23+
Prepare data
24+
"""
25+
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
26+
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
27+
write_api.write(bucket="my-bucket", record=[_point1, _point2])
28+
29+
"""
30+
Pass callback to QueryOptions
31+
"""
32+
query_api = client.query_api(
33+
query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))
34+
35+
"""
36+
Perform query
37+
"""
38+
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
39+
40+
for profiler in callback.records:
41+
print(f'Custom processing of profiler result: {profiler}')

influxdb_client/client/flux_csv_parser.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,15 @@ class FluxCsvParser(object):
4545
"""Parse to processing response from InfluxDB to FluxStructures or DataFrame."""
4646

4747
def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
48-
data_frame_index: List[str] = None, profilers: List[str] = None) -> None:
48+
data_frame_index: List[str] = None, query_options=None) -> None:
4949
"""Initialize defaults."""
5050
self._response = response
5151
self.tables = []
5252
self._serialization_mode = serialization_mode
5353
self._data_frame_index = data_frame_index
5454
self._data_frame_values = []
55-
self._profilers = profilers
55+
self._profilers = query_options.profilers if query_options is not None else None
56+
self._profiler_callback = query_options.profiler_callback if query_options is not None else None
5657
pass
5758

5859
def __enter__(self):
@@ -289,16 +290,18 @@ def table_list(self) -> List[FluxTable]:
289290
else:
290291
return list(filter(lambda table: not self._is_profiler_table(table), self.tables))
291292

292-
@staticmethod
293-
def _print_profiler_info(flux_record: FluxRecord):
293+
def _print_profiler_info(self, flux_record: FluxRecord):
294294
if flux_record.get_measurement().startswith("profiler/"):
295-
msg = "Profiler: " + flux_record.get_measurement()
296-
print("\n" + len(msg) * "=")
297-
print(msg)
298-
print(len(msg) * "=")
299-
for name in flux_record.values:
300-
val = flux_record[name]
301-
if isinstance(val, str) and len(val) > 50:
302-
print(f"{name:<20}: \n\n{val}")
303-
elif val is not None:
304-
print(f"{name:<20}: {val:<20}")
295+
if self._profiler_callback:
296+
self._profiler_callback(flux_record)
297+
else:
298+
msg = "Profiler: " + flux_record.get_measurement()
299+
print("\n" + len(msg) * "=")
300+
print(msg)
301+
print(len(msg) * "=")
302+
for name in flux_record.values:
303+
val = flux_record[name]
304+
if isinstance(val, str) and len(val) > 50:
305+
print(f"{name:<20}: \n\n{val}")
306+
elif val is not None:
307+
print(f"{name:<20}: {val:<20}")

influxdb_client/client/query_api.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import codecs
88
import csv
99
from datetime import datetime, timedelta
10-
from typing import List, Generator, Any, Union, Iterable
10+
from typing import List, Generator, Any, Union, Iterable, Callable
1111

1212
from influxdb_client import Dialect, IntegerLiteral, BooleanLiteral, FloatLiteral, DateTimeLiteral, StringLiteral, \
1313
VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression, Expression, \
@@ -22,13 +22,15 @@
2222
class QueryOptions(object):
2323
"""Query options."""
2424

25-
def __init__(self, profilers: List[str] = None) -> None:
25+
def __init__(self, profilers: List[str] = None, profiler_callback: Callable = None) -> None:
2626
"""
2727
Initialize query options.
2828
2929
:param profilers: list of enabled flux profilers
30+
:param profiler_callback: callback function return profilers (FluxRecord)
3031
"""
3132
self.profilers = profilers
33+
self.profiler_callback = profiler_callback
3234

3335

3436
class QueryApi(object):
@@ -101,7 +103,7 @@ def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']:
101103
async_req=False, _preload_content=False, _return_http_data_only=False)
102104

103105
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables,
104-
profilers=self._profilers())
106+
query_options=self._get_query_options())
105107

106108
list(_parser.generator())
107109

@@ -123,7 +125,7 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator['
123125
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
124126
async_req=False, _preload_content=False, _return_http_data_only=False)
125127
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream,
126-
profilers=self._profilers())
128+
query_options=self._get_query_options())
127129

128130
return _parser.generator()
129131

@@ -176,17 +178,18 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
176178

177179
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
178180
data_frame_index=data_frame_index,
179-
profilers=self._profilers())
181+
query_options=self._get_query_options())
180182
return _parser.generator()
181183

182-
def _profilers(self):
184+
def _get_query_options(self):
183185
if self._query_options and self._query_options.profilers:
184-
return self._query_options.profilers
185-
else:
186-
return self._influxdb_client.profilers
186+
return self._query_options
187+
elif self._influxdb_client.profilers:
188+
return QueryOptions(profilers=self._influxdb_client.profilers)
187189

188190
def _create_query(self, query, dialect=default_dialect, params: dict = None):
189-
profilers = self._profilers()
191+
query_options = self._get_query_options()
192+
profilers = query_options.profilers if query_options is not None else None
190193
q = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params, profilers))
191194

192195
if profilers:

tests/test_QueryApi.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,27 @@ def test_query_profiler_present(self):
376376
print(f"Profiler record: {flux_record}")
377377
self.assertTrue(found_profiler_records)
378378

379+
def test_profilers_callback(self):
380+
381+
class ProfilersCallback(object):
382+
def __init__(self):
383+
self.records = []
384+
385+
def __call__(self, flux_record):
386+
self.records.append(flux_record.values)
387+
388+
def get_record(self, num, val):
389+
return (self.records[num])[val]
390+
391+
callback = ProfilersCallback()
392+
393+
query_api = self.client.query_api(query_options=QueryOptions(profilers=["query", "operator"],
394+
profiler_callback=callback))
395+
query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
396+
397+
self.assertEqual("profiler/query", callback.get_record(0, "_measurement"))
398+
self.assertEqual("profiler/operator", callback.get_record(1, "_measurement"))
399+
379400
def test_profiler_ast(self):
380401

381402
expect = {

0 commit comments

Comments
 (0)