This file is indexed.

/usr/lib/python3/dist-packages/influxdb/helper.py is in python3-influxdb 2.12.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# -*- coding: utf-8 -*-
"""
Helper class for InfluxDB
"""
from collections import namedtuple, defaultdict
from warnings import warn

import six


class SeriesHelper(object):

    """
    Subclassing this helper eases writing data points in bulk.
    All data points are immutable, insuring they do not get overwritten.
    Each subclass can write to its own database.
    The time series names can also be based on one or more defined fields.

    Annotated example::

        class MySeriesHelper(SeriesHelper):
            class Meta:
                # Meta class stores time series helper configuration.
                series_name = 'events.stats.{server_name}'
                # Series name must be a string, curly brackets for dynamic use.
                fields = ['time', 'server_name']
                # Defines all the fields in this time series.
                ### Following attributes are optional. ###
                client = TestSeriesHelper.client
                # Client should be an instance of InfluxDBClient.
                :warning: Only used if autocommit is True.
                bulk_size = 5
                # Defines the number of data points to write simultaneously.
                # Only applicable if autocommit is True.
                autocommit = True
                # If True and no bulk_size, then will set bulk_size to 1.

    """
    __initialized__ = False

    def __new__(cls, *args, **kwargs):
        """
        Initializes class attributes for subsequent constructor calls.

        :note: *args and **kwargs are not explicitly used in this function,
        but needed for Python 2 compatibility.
        """
        if not cls.__initialized__:
            cls.__initialized__ = True
            try:
                _meta = getattr(cls, 'Meta')
            except AttributeError:
                raise AttributeError(
                    'Missing Meta class in {0}.'.format(
                        cls.__name__))

            for attr in ['series_name', 'fields', 'tags']:
                try:
                    setattr(cls, '_' + attr, getattr(_meta, attr))
                except AttributeError:
                    raise AttributeError(
                        'Missing {0} in {1} Meta class.'.format(
                            attr,
                            cls.__name__))

            cls._autocommit = getattr(_meta, 'autocommit', False)

            cls._client = getattr(_meta, 'client', None)
            if cls._autocommit and not cls._client:
                raise AttributeError(
                    'In {0}, autocommit is set to True, but no client is set.'
                    .format(cls.__name__))

            try:
                cls._bulk_size = getattr(_meta, 'bulk_size')
                if cls._bulk_size < 1 and cls._autocommit:
                    warn(
                        'Definition of bulk_size in {0} forced to 1, '
                        'was less than 1.'.format(cls.__name__))
                    cls._bulk_size = 1
            except AttributeError:
                cls._bulk_size = -1
            else:
                if not cls._autocommit:
                    warn(
                        'Definition of bulk_size in {0} has no affect because'
                        ' autocommit is false.'.format(cls.__name__))

            cls._datapoints = defaultdict(list)
            cls._type = namedtuple(cls.__name__, cls._fields + cls._tags)

        return super(SeriesHelper, cls).__new__(cls)

    def __init__(self, **kw):
        """
        Constructor call creates a new data point. All fields must be present.

        :note: Data points written when `bulk_size` is reached per Helper.
        :warning: Data points are *immutable* (`namedtuples`).
        """
        cls = self.__class__

        if sorted(cls._fields + cls._tags) != sorted(kw.keys()):
            raise NameError(
                'Expected {0}, got {1}.'.format(
                    sorted(cls._fields + cls._tags),
                    kw.keys()))

        cls._datapoints[cls._series_name.format(**kw)].append(cls._type(**kw))

        if cls._autocommit and \
                sum(len(series) for series in cls._datapoints.values()) \
                >= cls._bulk_size:
            cls.commit()

    @classmethod
    def commit(cls, client=None):
        """
        Commit everything from datapoints via the client.

        :param client: InfluxDBClient instance for writing points to InfluxDB.
        :attention: any provided client will supersede the class client.
        :return: result of client.write_points.
        """
        if not client:
            client = cls._client
        rtn = client.write_points(cls._json_body_())
        cls._reset_()
        return rtn

    @classmethod
    def _json_body_(cls):
        """
        :return: JSON body of these datapoints.
        """
        json = []
        for series_name, data in six.iteritems(cls._datapoints):
            for point in data:
                json_point = {
                    "measurement": series_name,
                    "fields": {},
                    "tags": {},
                }

                for field in cls._fields:
                    json_point['fields'][field] = getattr(point, field)

                for tag in cls._tags:
                    json_point['tags'][tag] = getattr(point, tag)

                json.append(json_point)
        return json

    @classmethod
    def _reset_(cls):
        """
        Reset data storage.
        """
        cls._datapoints = defaultdict(list)