This file is indexed.

/usr/lib/python3/dist-packages/gnocchi/incoming/redis.py is in python3-gnocchi 4.2.0-0ubuntu5.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- encoding: utf-8 -*-
#
# Copyright © 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib

import six

from gnocchi.common import redis
from gnocchi import incoming


class RedisStorage(incoming.IncomingDriver):
    """Incoming driver keeping not-yet-processed measures in Redis.

    The pending measures of a metric are stored in a Redis list whose key is
    the sack name and the metric id joined with ``redis.SEP``. When
    ``greedy`` is true, adding measures also sets a key named after the sack
    itself; that key carries no data and only exists so a keyspace
    notification is emitted for ``iter_on_sacks_to_process``.
    """

    def __init__(self, conf, greedy=True):
        """Initialize the driver and its Redis client.

        :param conf: configuration handed to the parent driver and to
                     ``redis.get_client``.
        :param greedy: when true, ``add_measures_batch`` also sets the
                       per-sack notification key.
        """
        super(RedisStorage, self).__init__(conf)
        self._client = redis.get_client(conf)
        self.greedy = greedy

    def __str__(self):
        """Return the class name followed by the Redis client."""
        return "%s: %s" % (self.__class__.__name__, self._client)

    def _get_storage_sacks(self):
        """Return the stored number of sacks as read from the config hash.

        The value is returned exactly as the client gives it back, without
        any conversion.
        """
        return self._client.hget(self.CFG_PREFIX, self.CFG_SACKS)

    def set_storage_settings(self, num_sacks):
        """Store ``num_sacks`` in the config hash."""
        self._client.hset(self.CFG_PREFIX, self.CFG_SACKS, num_sacks)

    @staticmethod
    def remove_sack_group(num_sacks):
        """Do nothing: there is no sack container to clean up in Redis."""
        # NOTE(gordc): redis doesn't maintain keys with empty values
        pass

    def _build_measure_path_with_sack(self, metric_id, sack_name):
        """Return the bytes key of the measure list for a metric in a sack.

        :param metric_id: metric identifier; ``str()`` is applied to it.
        :param sack_name: name of the sack, as text.
        """
        return redis.SEP.join([sack_name.encode(), str(metric_id).encode()])

    def _build_measure_path(self, metric_id):
        """Return the measure list key of ``metric_id`` in its own sack."""
        return self._build_measure_path_with_sack(
            metric_id, self.get_sack_name(self.sack_for_metric(metric_id)))

    def add_measures_batch(self, metrics_and_measures):
        """Append measures for several metrics using a single pipeline.

        :param metrics_and_measures: mapping of metric id to the measures to
                                     store for that metric.
        """
        notified_sacks = set()
        pipe = self._client.pipeline(transaction=False)
        for metric_id, measures in six.iteritems(metrics_and_measures):
            sack_name = self.get_sack_name(self.sack_for_metric(metric_id))
            path = self._build_measure_path_with_sack(metric_id, sack_name)
            pipe.rpush(path, self._encode_measures(measures))
            # Only one notification per sack is queued for the whole batch.
            if self.greedy and sack_name not in notified_sacks:
                # value has no meaning, we just use this for notification
                pipe.setnx(sack_name, 1)
                notified_sacks.add(sack_name)
        pipe.execute()

    def _build_report(self, details):
        """Count the metrics and list entries waiting to be processed.

        :param details: when true, also collect the length of the pending
                        list of each metric.
        :return: a ``(metrics, measures, metric_details)`` tuple where
                 ``metrics`` is the number of keys found, ``measures`` the
                 sum of their list lengths and ``metric_details`` a dict
                 mapping metric id to list length, or ``None`` when
                 ``details`` is false.
        """
        # A dict is used so the nested function can update the totals
        # without ``nonlocal``, which Python 2 does not provide.
        report_vars = {'measures': 0, 'metric_details': {}}

        def update_report(results, m_list):
            report_vars['measures'] += sum(results)
            if details:
                report_vars['metric_details'].update(
                    six.moves.zip(m_list, results))

        match = redis.SEP.join([self.get_sack_name("*").encode(), b"*"])
        metrics = 0
        m_list = []
        pipe = self._client.pipeline()
        for key in self._client.scan_iter(match=match, count=1000):
            metrics += 1
            pipe.llen(key)
            if details:
                m_list.append(key.split(redis.SEP)[1].decode("utf8"))
            # group 100 commands/call
            if metrics % 100 == 0:
                update_report(pipe.execute(), m_list)
                m_list = []
                pipe = self._client.pipeline()
        # Flush the commands queued since the last full group; this is a
        # no-op on the totals when nothing is pending.
        update_report(pipe.execute(), m_list)
        return (metrics, report_vars['measures'],
                report_vars['metric_details'] if details else None)

    def list_metric_with_measures_to_process(self, sack):
        """Return the set of metric ids having a measure key in ``sack``."""
        match = redis.SEP.join([self.get_sack_name(sack).encode(), b"*"])
        keys = self._client.scan_iter(match=match, count=1000)
        return {k.split(redis.SEP)[1].decode("utf8") for k in keys}

    def delete_unprocessed_measures_for_metric(self, metric_id):
        """Delete the pending measure list of ``metric_id``."""
        self._client.delete(self._build_measure_path(metric_id))

    def has_unprocessed(self, metric_id):
        """Return whether a pending measure list exists for ``metric_id``."""
        return bool(self._client.exists(self._build_measure_path(metric_id)))

    @contextlib.contextmanager
    def process_measure_for_metric(self, metric_id):
        """Yield the pending measures of a metric, then drop them.

        The list items present when the context is entered are read, joined
        and passed to ``_unserialize_measures``; its result is yielded. The
        items that were read are trimmed from the list only when the
        ``with`` body finishes without raising, so a failure in the body
        leaves the measures in place. Items pushed while the body runs are
        kept as well.
        """
        key = self._build_measure_path(metric_id)
        item_len = self._client.llen(key)
        # lrange is inclusive on both ends, decrease to grab exactly n items
        item_len = item_len - 1 if item_len else item_len

        yield self._unserialize_measures(metric_id, b"".join(
            self._client.lrange(key, 0, item_len)))

        # ltrim is inclusive, bump 1 to remove up to and including nth item
        self._client.ltrim(key, item_len + 1, -1)

    def iter_on_sacks_to_process(self):
        """Yield sack numbers as keyspace notifications arrive for them.

        This enables keyspace notifications on the server, subscribes to the
        keys starting with ``SACK_PREFIX`` in the database used by the
        client and blocks waiting for messages. The subscription is closed
        when the generator is closed or fails.
        """
        self._client.config_set("notify-keyspace-events", "K$")
        p = self._client.pubsub()
        try:
            db = self._client.connection_pool.connection_kwargs['db']
            keyspace = b"__keyspace@" + str(db).encode() + b"__:"
            pattern = keyspace + self.SACK_PREFIX.encode() + b"*"
            p.psubscribe(pattern)
            for message in p.listen():
                if (message['type'] == 'pmessage' and
                        message['pattern'] == pattern):
                    # FIXME(jd) This is awful, we need a better way to
                    # extract this
                    # Format is defined by get_sack_prefix: incoming128-17
                    yield int(message['channel'].split(b"-")[-1])
        finally:
            # Release the subscription and its connection explicitly rather
            # than relying on garbage collection of the pubsub object.
            p.close()

    def finish_sack_processing(self, sack):
        """Remove the notification key of ``sack``."""
        # Delete the sack key which handles no data but is used to get a SET
        # notification in iter_on_sacks_to_process
        self._client.delete(self.get_sack_name(sack))