@@ -54,15 +54,29 @@ def TOPO(self):
5454 def format_response (self , data : dict , jreq : dict ) -> List [dict ]:
5555 respList = []
5656 metrics = set (data .values ())
57- for metric in metrics :
58- for st in metric .timeseries :
59- res = SingleTimeSeriesResponse (jreq .get ('inputQuery' ),
60- jreq .get ('showQuery' ),
61- jreq .get ('globalAnnotations' ),
62- st .tags , st .aggregatedTags )
63- # self.logger.trace(f'OpenTSDB queryResponse for :
64- # {data.keys()[0]} with {len(st.dps)} datapoints')
65- respList .append (res .to_dict (st .dps ))
57+ if jreq .get ('start' ) == 'last' :
58+ for metric in metrics :
59+ for st in metric .timeseries :
60+ timestmp = ''
61+ val = 'null'
62+ if len (st .dps ) > 0 :
63+ timestmp = list (st .dps .keys ())[0 ]
64+ val = st .dps [timestmp ]
65+ res = LastSingleTimeSeriesResponse (jreq .get ('inputQuery' ),
66+ timestmp ,
67+ val ,
68+ st .tags )
69+ respList .append (res .to_dict ())
70+ else :
71+ for metric in metrics :
72+ for st in metric .timeseries :
73+ res = SingleTimeSeriesResponse (jreq .get ('inputQuery' ),
74+ jreq .get ('showQuery' ),
75+ jreq .get ('globalAnnotations' ),
76+ st .tags , st .aggregatedTags )
77+ # self.logger.trace(f'OpenTSDB queryResponse for :
78+ # {data.keys()[0]} with {len(st.dps)} datapoints')
79+ respList .append (res .to_dict (st .dps ))
6680 return respList
6781
6882 @execution_time ()
@@ -115,29 +129,34 @@ def build_collector(self, jreq: dict) -> SensorCollector:
115129
116130 q = jreq .get ('inputQuery' )
117131
118- period = self .md .getSensorPeriodForMetric (q .get ('metric' ))
132+ sensor = self .TOPO .getSensorForMetric (q .get ('metric' ))
133+ period = self .md .getSensorPeriod (sensor )
119134 if period < 1 :
120135 self .logger .error (MSG ['SensorDisabled' ].format (q .get ('metric' )))
121136 raise cherrypy .HTTPError (
122137 400 , MSG ['SensorDisabled' ].format (q .get ('metric' )))
123138
124- sensor = self .TOPO .getSensorForMetric (q .get ('metric' ))
125-
126139 args = {}
127140 args ['metricsaggr' ] = {q .get ('metric' ): q .get ('aggregator' )}
128- args ['start' ] = str (int (int (str (jreq .get ('start' ))) / 1000 ))
129- if jreq .get ('end' ) is not None :
130- args ['end' ] = str (int (int (str (jreq .get ('end' ))) / 1000 ))
131141
132- if q .get ('downsample' ):
133- args ['dsOp' ] = self ._get_downsmpl_op (q .get ('downsample' ))
134- args ['dsBucketSize' ] = self ._calc_bucket_size (q .get ('downsample' ))
142+ if jreq .get ('start' ) == 'last' :
143+ args ['nsamples' ] = 1
144+ if q .get ('tags' ):
145+ args ['filters' ] = q .get ('tags' )
146+ else :
147+ args ['start' ] = str (int (int (str (jreq .get ('start' ))) / 1000 ))
148+ if jreq .get ('end' ) is not None :
149+ args ['end' ] = str (int (int (str (jreq .get ('end' ))) / 1000 ))
150+
151+ if q .get ('downsample' ):
152+ args ['dsOp' ] = self ._get_downsmpl_op (q .get ('downsample' ))
153+ args ['dsBucketSize' ] = self ._calc_bucket_size (q .get ('downsample' ))
135154
136- if q .get ('filters' ):
137- filters , grouptags = self ._parse_input_query_filters (
138- q .get ('filters' ))
139- args ['filters' ] = filters
140- args ['grouptags' ] = grouptags
155+ if q .get ('filters' ):
156+ filters , grouptags = self ._parse_input_query_filters (
157+ q .get ('filters' ))
158+ args ['filters' ] = filters
159+ args ['grouptags' ] = grouptags
141160
142161 args ['rawData' ] = q .get ('explicitTags' , False )
143162
@@ -296,6 +315,42 @@ def GET(self, **params):
296315 elif 'lookup' in cherrypy .request .script_name :
297316 resp = self .lookup (params )
298317
318+ # /api/query/last
319+ elif '/api/query/last' == cherrypy .request .script_name :
320+ jreq = {}
321+
322+ if params .get ('timeseries' ) is None :
323+ self .logger .error (MSG ['QueryError' ].format ('empty' ))
324+ raise cherrypy .HTTPError (400 , ERR [400 ])
325+
326+ queries = []
327+ timeseries = params .get ('timeseries' )
328+ if not isinstance (timeseries , list ):
329+ timeseries = [timeseries ]
330+ for timeserie in timeseries :
331+ try :
332+ metricDict = {}
333+ params_list = re .split (r'\{(.*)\}' , timeserie .strip ())
334+ if len (params_list [0 ]) == 0 :
335+ break
336+ metricDict ['metric' ] = params_list [0 ]
337+
338+ if len (params_list ) > 1 :
339+ attr = params_list [1 ]
340+ filterBy = dict (x .split ('=' ) for x in attr .split (',' ))
341+ metricDict ['tags' ] = filterBy
342+ queries .append (metricDict )
343+
344+ except Exception as e :
345+ self .logger .exception (MSG ['IntError' ].format (str (e )))
346+ raise cherrypy .HTTPError (500 , MSG [500 ])
347+ if len (queries ) == 0 :
348+ raise cherrypy .HTTPError (400 , ERR [400 ])
349+ jreq ['start' ] = 'last'
350+ jreq ['queries' ] = queries
351+
352+ resp = self .query (jreq )
353+
299354 elif 'aggregators' in cherrypy .request .script_name :
300355 resp = ["noop" , "sum" , "avg" , "max" , "min" , "rate" ]
301356
@@ -336,7 +391,7 @@ def POST(self):
336391 raise cherrypy .HTTPError (400 , ERR [400 ])
337392
338393 # /api/query
339- if 'query' in cherrypy .request .script_name :
394+ if '/api/ query' == cherrypy .request .script_name :
340395
341396 # read query request parameters
342397 jreq = cherrypy .request .json
@@ -399,3 +454,17 @@ def to_dict(self, dps: dict = None):
399454 if dps :
400455 res ['dps' ] = dps
401456 return res
457+
458+
459+ class LastSingleTimeSeriesResponse (object ):
460+
461+ def __init__ (self , inputQuery , timestmp , value , tags : dict = None ):
462+ self .metric = inputQuery .get ('metric' )
463+ self .timestamp = timestmp
464+ self .value = value
465+ self .tags = tags or defaultdict (list )
466+
467+ def to_dict (self ):
468+ ''' Converts the LastSingleTimeSeriesResponse object to dict. '''
469+ res = self .__dict__
470+ return res
0 commit comments