/usr/lib/python2.7/dist-packages/arcnagios/substitution.py is in nordugrid-arc-nagios-plugins 1.9.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | from arcnagios.nagutils import ServiceUNKNOWN
from arcnagios.utils import unspecified
import ldap, os, random, re, time
def get_interp(config, section, var, target_env, default = unspecified):
    """Return the interpolated value of option ``var`` in ``section``.

    Variables referenced by the raw option value are first imported into
    ``target_env``, which is then passed to the parser as the interpolation
    environment.  When the option is absent, ``default`` is returned if one
    was supplied; otherwise ``ServiceUNKNOWN`` is raised.
    """
    if not config.has_option(section, var):
        if default == unspecified:
            raise ServiceUNKNOWN(
                    'Configuration error: Missing %s in [%s].' % (var, section))
        return default
    # Make sure every %(name)s the value refers to is available before the
    # parser tries to expand it.
    import_interpolated_variables(config, section, var, target_env)
    return config.get(section, var, vars = target_env)
def get_interp_opt(config, section, var, target_env):
    """Like ``get_interp``, but return ``None`` when the option is absent
    instead of raising."""
    missing_value = None
    return get_interp(config, section, var, target_env, missing_value)
def _subst_option(config, section, var, target_env):
    """Substitution method ``option``: set ``target_env[var]`` to the
    interpolated ``default`` option of ``section``.

    Raises ``ServiceUNKNOWN`` when the section has no ``default``.
    NOTE(review): the error text suggests values given with ``-O`` are put
    into ``target_env`` by the caller before this runs -- confirm.
    """
    fallback = get_interp_opt(config, section, 'default', target_env)
    if fallback is not None:
        target_env[var] = fallback
        return
    raise ServiceUNKNOWN('Missing required option (-O %s=...).'%var)
def _subst_getenv(config, section, var, target_env):
    """Substitution method ``getenv``: set ``target_env[var]`` from the
    process environment.

    The environment variable name is the ``envvar`` option when present,
    otherwise the ``prefix`` option (empty if absent) followed by ``var``.
    The interpolated ``default`` option, if any, is used when the environment
    variable is unset; if neither yields a value ``ServiceUNKNOWN`` is raised.
    """
    # The default is resolved first, before the variable name is worked out.
    fallback = get_interp_opt(config, section, 'default', target_env)
    if not config.has_option(section, 'envvar'):
        envvar = get_interp(config, section, 'prefix', target_env, '') + var
    else:
        envvar = get_interp_opt(config, section, 'envvar', target_env)
    value = os.getenv(envvar, fallback)
    if value is None:
        raise ServiceUNKNOWN('Missing required option (-O) or '
                             'enviroment variable %s.' % envvar)
    target_env[var] = value
def _subst_ldap(config, section, var, target_env):
    """Substitution method ``ldap``: set ``target_env[var]`` from an LDAP
    search.

    Options read from ``section``:

    * ``uri`` -- whitespace-separated list of LDAP URIs, tried in order until
      one search succeeds.
    * ``basedn``, ``filter`` -- search base and filter; the search scope is
      always the whole subtree.
    * ``attribute`` -- comma-separated attribute names; the first value of
      the first listed attribute found in the first entry carrying one of
      them is used.
    * ``default`` -- optional value used when no entry provides any of the
      attributes.

    Raises ``ServiceUNKNOWN`` when every URI fails (or none is configured),
    or when no value is found and there is no ``default``.
    """
    basedn = get_interp(config, section, 'basedn', target_env)
    filterstr = get_interp(config, section, 'filter', target_env)
    attribute = get_interp(config, section, 'attribute', target_env)
    # A real list, since it is passed to the search and then re-iterated for
    # every returned entry.
    attrlist = [name.strip() for name in attribute.split(',')]
    scope = ldap.SCOPE_SUBTREE
    sr = None
    # Reported if the loop below never runs, i.e. ``uri`` expanded to nothing;
    # previously that case crashed on an unbound exception variable.
    last_error = 'no LDAP URI configured'
    for uri in get_interp(config, section, 'uri', target_env).split():
        lc = ldap.initialize(uri)
        try:
            sr = lc.search_s(basedn, scope, filterstr, attrlist = attrlist)
            break
        except ldap.LDAPError as xc:
            # Remember the failure and fall through to the next URI.
            last_error = xc
    if sr is None:
        raise ServiceUNKNOWN('LDAP query %s failed with: %s'
                             % (section, last_error))
    for _dn, ent in sr:
        for attr in attrlist:
            if attr in ent:
                target_env[var] = ent[attr][0]
                return
    if config.has_option(section, 'default'):
        target_env[var] = get_interp(config, section, 'default', target_env)
    else:
        raise ServiceUNKNOWN('LDAP query %s did not provide a value for %s.'
                             % (filterstr, section))
def _subst_pipe(config, section, var, target_env):
    """Substitution method ``pipe``: run the interpolated ``command`` option
    through ``os.popen`` and store its output, stripped of surrounding
    whitespace, in ``target_env[var]``.

    NOTE: the value returned by ``close()`` is discarded, so the command's
    exit status is not checked here.
    """
    fh = os.popen(get_interp(config, section, 'command', target_env))
    try:
        target_env[var] = fh.read().strip()
    finally:
        # Always release the pipe, even if reading fails.
        fh.close()
def _subst_random_line(config, section, var, target_env):
    """Substitution method ``random_line``: set ``target_env[var]`` to a
    randomly chosen line of the file named by the ``input_file`` option.

    Lines are stripped; empty lines and lines starting with ``#`` are
    ignored.  If the ``include`` option is given, only lines listed in it
    (minus anything in ``exclude``) are eligible; otherwise every line not
    listed in ``exclude`` is eligible.  Both options are whitespace-separated
    lists.

    Raises ``ServiceUNKNOWN`` if the file cannot be opened or contains no
    eligible line.
    """
    path = get_interp(config, section, 'input_file', target_env)
    try:
        fh = open(path)
    except IOError as xc:
        raise ServiceUNKNOWN(str(xc))
    try:
        rng = random.Random(time.time())
        include = None
        exclude = set()
        if config.has_option(section, 'exclude'):
            exclude = set(
                    get_interp(config, section, 'exclude', target_env).split())
        if config.has_option(section, 'include'):
            include = set(
                    get_interp(config, section, 'include', target_env).split())
            include.difference_update(exclude)

        # Single-pass selection: the k-th eligible line replaces the current
        # choice with probability 1/k, so the file is never held in memory.
        n = 0
        chosen_line = None
        for line in fh:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            if include is not None:
                if line not in include:
                    continue
            elif line in exclude:
                continue
            if rng.randint(0, n) == 0:
                chosen_line = line
            n += 1
    finally:
        # Close the file even when reading the options above raises.
        fh.close()
    if chosen_line is None:
        raise ServiceUNKNOWN('%s must contain at least one non-excluded line'
                             % path)
    target_env[var] = chosen_line
def _subst_strftime(config, section, var, target_env):
    """Substitution method ``strftime``: set ``target_env[var]`` to the
    current local time formatted by ``time.strftime``.

    The format is taken verbatim (no interpolation) from ``raw_format`` when
    that option exists, otherwise from the interpolated ``format`` option.
    """
    if not config.has_option(section, 'raw_format'):
        fmt = get_interp(config, section, 'format', target_env)
    else:
        # raw=True keeps the %-directives intended for strftime intact.
        fmt = config.get(section, 'raw_format', vars = target_env, raw=True)
    target_env[var] = time.strftime(fmt)
def _subst_switch(config, section, var, target_env):
    """Substitution method ``switch``: select a value by the interpolated
    ``index`` option.

    The option ``case[<index>]`` provides the value when it exists, otherwise
    the ``default`` option does.  ``ServiceUNKNOWN`` is raised when neither
    is present.
    """
    case = 'case[%s]'%get_interp(config, section, 'index', target_env)
    if config.has_option(section, case):
        chosen = case
    elif config.has_option(section, 'default'):
        chosen = 'default'
    else:
        raise ServiceUNKNOWN(
                'No %s and no default in section variable.%s.'%(case, var))
    import_interpolated_variables(config, section, chosen, target_env)
    target_env[var] = get_interp(config, section, chosen, target_env)
# Maps the value of a variable section's ``method`` option to the function
# implementing it.  Each function is called as
# ``f(config, section, var, target_env)``.
_method_by_name = dict(
    getenv=_subst_getenv,
    ldap=_subst_ldap,
    option=_subst_option,
    pipe=_subst_pipe,
    random_line=_subst_random_line,
    strftime=_subst_strftime,
    switch=_subst_switch,
)
def register_substitution_method(name, f):
    """Make ``f`` available as substitution method ``name``, replacing any
    method already registered under that name.

    ``f`` is invoked as ``f(config, section, var, target_env)``.
    """
    _method_by_name[name] = f
def import_section_variables(config, section, target_env):
    """Import all variables listed in ``variables`` in ``section``.

    ``variables`` is a whitespace-separated list of names.  Names already
    present in ``target_env`` are skipped.  While a variable is being
    imported its entry is ``None``; meeting such an entry again means the
    variable depends on itself, which is reported as ``ServiceUNKNOWN``.
    """
    if not config.has_option(section, 'variables'):
        return
    for var in config.get(section, 'variables').split():
        if var in target_env:
            # Identity test: only the in-progress marker counts, not a value
            # that merely compares equal to None.
            if target_env[var] is None:
                raise ServiceUNKNOWN('Substitutions involving %s are '
                                     'cyclic.'%var)
            continue
        # Mark as in progress so that a cyclic reference can be detected.
        target_env[var] = None
        import_variable(config, var, target_env)
_interp_re = re.compile(r'%\(([a-zA-Z0-9_]+)\)')
def import_interpolated_variables(config, section, var, target_env):
"""Import variables needed for expanding ``var`` in ``section``."""
raw_value = config.get(section, var, raw = True)
for mo in re.finditer(_interp_re, raw_value):
v = mo.group(1)
if not v in target_env:
import_variable(config, v, target_env)
def import_variable(config, var, target_env):
    """Import ``var`` by executing its defining section, populating
    ``target_env`` with its value and the values of any dependent
    variables.

    The defining section is ``variable.<var>``.  Its ``variables`` option is
    processed first, then the function registered for its ``method`` option
    is called and its result returned.  An unregistered method name raises
    ``ServiceUNKNOWN``.
    """
    section = 'variable.' + var
    import_section_variables(config, section, target_env)
    method = config.get(section, 'method')
    # Only the lookup is guarded: a KeyError raised inside the substitution
    # method itself must not be reported as an unknown method.
    try:
        subst = _method_by_name[method]
    except KeyError:
        raise ServiceUNKNOWN('Unknown substitution method %s.'%method)
    return subst(config, section, var, target_env)
|