Skip to content

Commit

Permalink
Merge pull request #274 from Helene/query_last
Browse files Browse the repository at this point in the history
Add endpoint for querying last metric sample (OpenTSDB plugin)
  • Loading branch information
Helene authored Jan 20, 2025
2 parents 616ce1e + a2f202e commit 06923b2
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 24 deletions.
117 changes: 93 additions & 24 deletions source/opentsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,29 @@ def TOPO(self):
def format_response(self, data: dict, jreq: dict) -> List[dict]:
    ''' Convert queried timeseries data to a list of OpenTSDB response dicts.

    For a "last" query (jreq['start'] == 'last') each timeseries is reduced
    to a single most-recent sample and rendered as a
    LastSingleTimeSeriesResponse; otherwise every timeseries is rendered
    with all of its datapoints as a SingleTimeSeriesResponse.

    Args:
        data: mapping of metric name to a metric object exposing
              ``timeseries`` (each with ``dps``, ``tags``, ``aggregatedTags``).
        jreq: parsed query request; reads 'start', 'inputQuery',
              'showQuery' and 'globalAnnotations'.

    Returns:
        List of per-timeseries response dicts.
    '''
    respList = []
    metrics = set(data.values())
    if jreq.get('start') == 'last':
        for metric in metrics:
            for st in metric.timeseries:
                # Defaults when the timeseries holds no datapoints:
                # empty timestamp and the literal string 'null'.
                timestmp = ''
                val = 'null'
                if len(st.dps) > 0:
                    # NOTE(review): uses the first key of st.dps as the
                    # "last" sample — assumes dps is ordered accordingly;
                    # confirm against the collector's nsamples=1 behavior.
                    timestmp = list(st.dps.keys())[0]
                    val = st.dps[timestmp]
                res = LastSingleTimeSeriesResponse(jreq.get('inputQuery'),
                                                   timestmp,
                                                   val,
                                                   st.tags)
                respList.append(res.to_dict())
    else:
        for metric in metrics:
            for st in metric.timeseries:
                res = SingleTimeSeriesResponse(jreq.get('inputQuery'),
                                               jreq.get('showQuery'),
                                               jreq.get('globalAnnotations'),
                                               st.tags, st.aggregatedTags)
                # self.logger.trace(f'OpenTSDB queryResponse for :
                # {data.keys()[0]} with {len(st.dps)} datapoints')
                respList.append(res.to_dict(st.dps))
    return respList

@execution_time()
Expand Down Expand Up @@ -115,29 +129,34 @@ def build_collector(self, jreq: dict) -> SensorCollector:

q = jreq.get('inputQuery')

period = self.md.getSensorPeriodForMetric(q.get('metric'))
sensor = self.TOPO.getSensorForMetric(q.get('metric'))
period = self.md.getSensorPeriod(sensor)
if period < 1:
self.logger.error(MSG['SensorDisabled'].format(q.get('metric')))
raise cherrypy.HTTPError(
400, MSG['SensorDisabled'].format(q.get('metric')))

sensor = self.TOPO.getSensorForMetric(q.get('metric'))

args = {}
args['metricsaggr'] = {q.get('metric'): q.get('aggregator')}
args['start'] = str(int(int(str(jreq.get('start'))) / 1000))
if jreq.get('end') is not None:
args['end'] = str(int(int(str(jreq.get('end'))) / 1000))

if q.get('downsample'):
args['dsOp'] = self._get_downsmpl_op(q.get('downsample'))
args['dsBucketSize'] = self._calc_bucket_size(q.get('downsample'))
if jreq.get('start') == 'last':
args['nsamples'] = 1
if q.get('tags'):
args['filters'] = q.get('tags')
else:
args['start'] = str(int(int(str(jreq.get('start'))) / 1000))
if jreq.get('end') is not None:
args['end'] = str(int(int(str(jreq.get('end'))) / 1000))

if q.get('downsample'):
args['dsOp'] = self._get_downsmpl_op(q.get('downsample'))
args['dsBucketSize'] = self._calc_bucket_size(q.get('downsample'))

if q.get('filters'):
filters, grouptags = self._parse_input_query_filters(
q.get('filters'))
args['filters'] = filters
args['grouptags'] = grouptags
if q.get('filters'):
filters, grouptags = self._parse_input_query_filters(
q.get('filters'))
args['filters'] = filters
args['grouptags'] = grouptags

args['rawData'] = q.get('explicitTags', False)

Expand Down Expand Up @@ -296,6 +315,42 @@ def GET(self, **params):
elif 'lookup' in cherrypy.request.script_name:
resp = self.lookup(params)

# /api/query/last
elif '/api/query/last' == cherrypy.request.script_name:
jreq = {}

if params.get('timeseries') is None:
self.logger.error(MSG['QueryError'].format('empty'))
raise cherrypy.HTTPError(400, ERR[400])

queries = []
timeseries = params.get('timeseries')
if not isinstance(timeseries, list):
timeseries = [timeseries]
for timeserie in timeseries:
try:
metricDict = {}
params_list = re.split(r'\{(.*)\}', timeserie.strip())
if len(params_list[0]) == 0:
break
metricDict['metric'] = params_list[0]

if len(params_list) > 1:
attr = params_list[1]
filterBy = dict(x.split('=') for x in attr.split(','))
metricDict['tags'] = filterBy
queries.append(metricDict)

except Exception as e:
self.logger.exception(MSG['IntError'].format(str(e)))
raise cherrypy.HTTPError(500, MSG[500])
if len(queries) == 0:
raise cherrypy.HTTPError(400, ERR[400])
jreq['start'] = 'last'
jreq['queries'] = queries

resp = self.query(jreq)

elif 'aggregators' in cherrypy.request.script_name:
resp = ["noop", "sum", "avg", "max", "min", "rate"]

Expand Down Expand Up @@ -336,7 +391,7 @@ def POST(self):
raise cherrypy.HTTPError(400, ERR[400])

# /api/query
if 'query' in cherrypy.request.script_name:
if '/api/query' == cherrypy.request.script_name:

# read query request parameters
jreq = cherrypy.request.json
Expand Down Expand Up @@ -399,3 +454,17 @@ def to_dict(self, dps: dict = None):
if dps:
res['dps'] = dps
return res


class LastSingleTimeSeriesResponse(object):
    ''' Response item for /api/query/last: the most recent sample of a
    single timeseries (metric name, timestamp, value and its tags). '''

    def __init__(self, inputQuery, timestmp, value, tags: dict = None):
        # metric name, taken from the original input query
        self.metric = inputQuery.get('metric')
        # timestamp of the last sample ('' when no datapoint was found)
        self.timestamp = timestmp
        # sample value ('null' when no datapoint was found)
        self.value = value
        # key/value tags identifying the timeseries
        self.tags = tags or defaultdict(list)

    def to_dict(self):
        ''' Converts the LastSingleTimeSeriesResponse object to dict.

        Returns a shallow copy so that callers mutating the result cannot
        accidentally rewrite this object's attributes through the shared
        __dict__ reference.
        '''
        return dict(self.__dict__)
6 changes: 6 additions & 0 deletions source/zimonGrafanaIntf.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ def main(argv):
}
}
)
# query metric last value
cherrypy.tree.mount(api, '/api/query/last',
{'/':
{'request.dispatch': cherrypy.dispatch.MethodDispatcher()}
}
)
# query for metric name (openTSDB: zimon extension returns keys as well)
cherrypy.tree.mount(api, '/api/suggest',
{'/':
Expand Down
30 changes: 30 additions & 0 deletions tests/test_opentsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,23 @@ def my_setup():
}


def query_last_setup():
    ''' Prepare module-level fixtures for the /api/query/last tests: one
    cpu_user timeseries with a single datapoint plus a matching
    start == 'last' request. '''
    global key1, col1, labels, filtersMap, dps1, ts1, metricTS, data, jreq

    key1 = Key._from_string('scale-16|CPU|cpu_user', '')
    col1 = ColumnInfo(name='cpu_user', semType=1, keys=(key1,), column=0)
    labels = ['node']
    # one filter entry per node scale-11 .. scale-16
    filtersMap = [{'node': f'scale-1{i}'} for i in range(1, 7)]
    dps1 = {1737321193: 3.0}
    ts1 = TimeSeries(col1, dps1, filtersMap, labels)
    metricTS = MetricTimeSeries('cpu_user', '')
    metricTS.timeseries = [ts1]
    data = {'cpu_user': metricTS}
    jreq = {'start': 'last',
            'inputQuery': {'metric': 'cpu_user',
                           'tags': {'node': 'scale-16'},
                           'index': 0}}


@with_setup(my_setup)
def test_case01():
ts = TimeSeries(col3, dps2, filtersMap, labels)
Expand Down Expand Up @@ -109,3 +126,16 @@ def test_case04():
response = profiler.stats(os.path.join(profiler.path, "profiling_format_response.prof"))
assert response is not None
print('\n'.join(response) + '\n')


@with_setup(query_last_setup)
def test_case05():
    ''' format_response with start == 'last' must return last-value items
    (metric/timestamp/value/tags) carrying the expected tag keys. '''
    with mock.patch('source.metadata.MetadataHandler') as md:
        logger = logging.getLogger(__name__)
        opentsdb = OpenTsdbApi(logger, md.return_value, '9999')
        resp = opentsdb.format_response(data, jreq)
        first = resp[0]
        assert set(first) == {'metric', 'timestamp', 'value', 'tags'}
        assert first.get('metric') == "cpu_user"
        tags = first.get('tags')
        assert 'node' in tags
        assert 'gpfs_fs_name' not in tags

0 comments on commit 06923b2

Please sign in to comment.