This file is indexed.

/usr/lib/python2.7/dist-packages/gnocchiclient/v1/archive_policy_cli.py is in python-gnocchiclient 7.0.1-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from cliff import command
from cliff import lister
from cliff import show

from gnocchiclient import utils


class CliArchivePolicyList(lister.Lister):
    """List archive policies"""

    COLS = ('name',
            'back_window', 'definition', 'aggregation_methods')

    def take_action(self, parsed_args):
        policies = utils.get_client(self).archive_policy.list()
        if parsed_args.formatter == 'table':
            for ap in policies:
                utils.format_archive_policy(ap)
        return utils.list2cols(self.COLS, policies)


class CliArchivePolicyShow(show.ShowOne):
    """Show an archive policy"""

    def get_parser(self, prog_name):
        parser = super(CliArchivePolicyShow, self).get_parser(prog_name)
        parser.add_argument("name",
                            help="Name of the archive policy")
        return parser

    def take_action(self, parsed_args):
        ap = utils.get_client(self).archive_policy.get(
            name=parsed_args.name)
        if parsed_args.formatter == 'table':
            utils.format_archive_policy(ap)
        return self.dict2columns(ap)


def archive_policy_definition(string):
    parts = string.split(",")
    defs = {}
    for part in parts:
        attr, __, value = part.partition(":")
        if (attr not in ['granularity', 'points', 'timespan']
                or value is None):
            raise ValueError
        defs[attr] = value
    if len(defs) < 2:
        raise ValueError
    return defs


class CliArchivePolicyWriteBase(show.ShowOne):
    def get_parser(self, prog_name):
        parser = super(CliArchivePolicyWriteBase, self).get_parser(prog_name)
        parser.add_argument("name", help="name of the archive policy")
        parser.add_argument("-d", "--definition", action='append',
                            required=True, type=archive_policy_definition,
                            metavar="<DEFINITION>",
                            help=("two attributes (separated by ',') of an "
                                  "archive policy definition with its name "
                                  "and value separated with a ':'"))
        return parser


class CliArchivePolicyCreate(CliArchivePolicyWriteBase):
    """Create an archive policy"""

    def get_parser(self, prog_name):
        parser = super(CliArchivePolicyCreate, self).get_parser(prog_name)
        parser.add_argument("-b", "--back-window", dest="back_window",
                            type=int,
                            help="back window of the archive policy")
        parser.add_argument("-m", "--aggregation-method",
                            action="append",
                            dest="aggregation_methods",
                            help="aggregation method of the archive policy")
        return parser

    def take_action(self, parsed_args):
        archive_policy = utils.dict_from_parsed_args(
            parsed_args, ['name', 'back_window', 'aggregation_methods',
                          'definition'])
        ap = utils.get_client(self).archive_policy.create(
            archive_policy=archive_policy)
        if parsed_args.formatter == 'table':
            utils.format_archive_policy(ap)
        return self.dict2columns(ap)


class CliArchivePolicyUpdate(CliArchivePolicyWriteBase):
    """Update an archive policy"""

    def take_action(self, parsed_args):
        archive_policy = utils.dict_from_parsed_args(
            parsed_args, ['definition'])
        ap = self.app.client.archive_policy.update(
            name=parsed_args.name, archive_policy=archive_policy)
        if parsed_args.formatter == 'table':
            utils.format_archive_policy(ap)
        return self.dict2columns(ap)


class CliArchivePolicyDelete(command.Command):
    """Delete an archive policy"""

    def get_parser(self, prog_name):
        parser = super(CliArchivePolicyDelete, self).get_parser(prog_name)
        parser.add_argument("name",
                            help="Name of the archive policy")
        return parser

    def take_action(self, parsed_args):
        utils.get_client(self).archive_policy.delete(name=parsed_args.name)