/usr/lib/python3/dist-packages/influxdb/resultset.py is in python3-influxdb 2.12.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 | # -*- coding: utf-8 -*-
import warnings
from influxdb.exceptions import InfluxDBClientError
_sentinel = object()
class ResultSet(object):
"""A wrapper around a single InfluxDB query result"""
def __init__(self, series, raise_errors=True):
self._raw = series
self._error = self.raw.get('error', None)
if self.error is not None and raise_errors is True:
raise InfluxDBClientError(self.error)
@property
def raw(self):
"""Raw JSON from InfluxDB"""
return self._raw
@raw.setter
def raw(self, value):
self._raw = value
@property
def error(self):
"""Error returned by InfluxDB"""
return self._error
def __getitem__(self, key):
"""
:param key: Either a serie name, or a tags_dict, or
a 2-tuple(serie_name, tags_dict).
If the serie name is None (or not given) then any serie
matching the eventual given tags will be given its points
one after the other.
To get the points of every serie in this resultset then
you have to provide None as key.
:return: A generator yielding `Point`s matching the given key.
NB:
The order in which the points are yielded is actually undefined but
it might change..
"""
warnings.warn(
("ResultSet's ``__getitem__`` method will be deprecated. Use"
"``get_points`` instead."),
DeprecationWarning
)
if isinstance(key, tuple):
if 2 != len(key):
raise TypeError('only 2-tuples allowed')
name = key[0]
tags = key[1]
if not isinstance(tags, dict) and tags is not None:
raise TypeError('tags should be a dict')
elif isinstance(key, dict):
name = None
tags = key
else:
name = key
tags = None
return self.get_points(name, tags)
def get_points(self, measurement=None, tags=None):
"""
Returns a generator for all the points that match the given filters.
:param measurement: The measurement name
:type measurement: str
:param tags: Tags to look for
:type tags: dict
:return: Points generator
"""
# Raise error if measurement is not str or bytes
if not isinstance(measurement,
(bytes, type(b''.decode()), type(None))):
raise TypeError('measurement must be an str or None')
for serie in self._get_series():
serie_name = serie.get('measurement', serie.get('name', 'results'))
if serie_name is None:
# this is a "system" query or a query which
# doesn't return a name attribute.
# like 'show retention policies' ..
if tags is None:
for item in self._get_points_for_serie(serie):
yield item
elif measurement in (None, serie_name):
# by default if no tags was provided then
# we will matches every returned serie
serie_tags = serie.get('tags', {})
if tags is None or self._tag_matches(serie_tags, tags):
for item in self._get_points_for_serie(serie):
yield item
def __repr__(self):
items = []
for item in self.items():
items.append("'%s': %s" % (item[0], list(item[1])))
return "ResultSet({%s})" % ", ".join(items)
def __iter__(self):
""" Iterating a ResultSet will yield one dict instance per serie result.
"""
for key in self.keys():
yield list(self.__getitem__(key))
def _tag_matches(self, tags, filter):
"""Checks if all key/values in filter match in tags"""
for tag_name, tag_value in filter.items():
# using _sentinel as I'm not sure that "None"
# could be used, because it could be a valid
# serie_tags value : when a serie has no such tag
# then I think it's set to /null/None/.. TBC..
serie_tag_value = tags.get(tag_name, _sentinel)
if serie_tag_value != tag_value:
return False
return True
def _get_series(self):
"""Returns all series"""
return self.raw.get('series', [])
def __len__(self):
return len(self.keys())
def keys(self):
"""
:return: List of keys. Keys are tuples (serie_name, tags)
"""
keys = []
for serie in self._get_series():
keys.append(
(serie.get('measurement',
serie.get('name', 'results')),
serie.get('tags', None))
)
return keys
def items(self):
"""
:return: List of tuples, (key, generator)
"""
items = []
for serie in self._get_series():
serie_key = (serie.get('measurement',
serie.get('name', 'results')),
serie.get('tags', None))
items.append(
(serie_key, self._get_points_for_serie(serie))
)
return items
def _get_points_for_serie(self, serie):
""" Return generator of dict from columns and values of a serie
:param serie: One serie
:return: Generator of dicts
"""
for point in serie.get('values', []):
yield self.point_from_cols_vals(
serie['columns'],
point
)
@staticmethod
def point_from_cols_vals(cols, vals):
""" Creates a dict from columns and values lists
:param cols: List of columns
:param vals: List of values
:return: Dict where keys are columns.
"""
point = {}
for col_index, col_name in enumerate(cols):
point[col_name] = vals[col_index]
return point
|