/usr/lib/python2.7/dist-packages/cinder/policy.py is in python-cinder 2:12.0.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 | # Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Policy Engine For Cinder"""
import sys
from oslo_config import cfg
from oslo_log import log as logging
from oslo_policy import opts as policy_opts
from oslo_policy import policy
from oslo_utils import excutils
from cinder import exception
from cinder import policies
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
policy_opts.set_defaults(cfg.CONF, 'policy.json')
_ENFORCER = None
def reset():
global _ENFORCER
if _ENFORCER:
_ENFORCER.clear()
_ENFORCER = None
def init(use_conf=True):
"""Init an Enforcer class.
:param use_conf: Whether to load rules from config file.
"""
global _ENFORCER
if not _ENFORCER:
_ENFORCER = policy.Enforcer(
CONF,
use_conf=use_conf)
register_rules(_ENFORCER)
_ENFORCER.load_rules()
def enforce(context, action, target):
"""Verifies that the action is valid on the target in this context.
:param context: cinder context
:param action: string representing the action to be checked
this should be colon separated for clarity.
i.e. ``compute:create_instance``,
``compute:attach_volume``,
``volume:attach_volume``
:param target: dictionary representing the object of the action for object
creation this should be a dictionary representing the
location of the object e.g.
``{'project_id': context.project_id}``
:raises PolicyNotAuthorized: if verification fails.
"""
init()
return _ENFORCER.enforce(action,
target,
context.to_policy_values(),
do_raise=True,
exc=exception.PolicyNotAuthorized,
action=action)
def set_rules(rules, overwrite=True, use_conf=False):
"""Set rules based on the provided dict of rules.
:param rules: New rules to use. It should be an instance of dict.
:param overwrite: Whether to overwrite current rules or update them
with the new rules.
:param use_conf: Whether to reload rules from config file.
"""
init(use_conf=False)
_ENFORCER.set_rules(rules, overwrite, use_conf)
def get_rules():
if _ENFORCER:
return _ENFORCER.rules
def register_rules(enforcer):
enforcer.register_defaults(policies.list_rules())
def get_enforcer():
# This method is for use by oslopolicy CLI scripts. Those scripts need the
# 'output-file' and 'namespace' options, but having those in sys.argv means
# loading the Cinder config options will fail as those are not expected to
# be present. So we pass in an arg list with those stripped out.
conf_args = []
# Start at 1 because cfg.CONF expects the equivalent of sys.argv[1:]
i = 1
while i < len(sys.argv):
if sys.argv[i].strip('-') in ['namespace', 'output-file']:
i += 2
continue
conf_args.append(sys.argv[i])
i += 1
cfg.CONF(conf_args, project='cinder')
init()
return _ENFORCER
def authorize(context, action, target, do_raise=True, exc=None):
"""Verifies that the action is valid on the target in this context.
:param context: cinder context
:param action: string representing the action to be checked
this should be colon separated for clarity.
i.e. ``compute:create_instance``,
``compute:attach_volume``,
``volume:attach_volume``
:param target: dictionary representing the object of the action for object
creation this should be a dictionary representing the
location of the object e.g.
``{'project_id': context.project_id}``
:param do_raise: if True (the default), raises PolicyNotAuthorized;
if False, returns False
:param exc: Class of the exception to raise if the check fails.
Any remaining arguments passed to :meth:`authorize` (both
positional and keyword arguments) will be passed to
the exception class. If not specified,
:class:`PolicyNotAuthorized` will be used.
:raises cinder.exception.PolicyNotAuthorized: if verification fails
and do_raise is True. Or if 'exc' is specified it will raise an
exception of that type.
:return: returns a non-False value (not necessarily "True") if
authorized, and the exact value False if not authorized and
do_raise is False.
"""
init()
credentials = context.to_policy_values()
if not exc:
exc = exception.PolicyNotAuthorized
try:
result = _ENFORCER.authorize(action, target, credentials,
do_raise=do_raise, exc=exc, action=action)
except policy.PolicyNotRegistered:
with excutils.save_and_reraise_exception():
LOG.exception('Policy not registered')
except Exception:
with excutils.save_and_reraise_exception():
LOG.error('Policy check for %(action)s failed with credentials '
'%(credentials)s',
{'action': action, 'credentials': credentials})
return result
def check_is_admin(context):
"""Whether or not user is admin according to policy setting."""
init()
# the target is user-self
credentials = context.to_policy_values()
target = credentials
return _ENFORCER.authorize('context_is_admin', target, credentials)
|