/usr/lib/python2.7/dist-packages/arcnagios/ldaputils.py is in nordugrid-arc-nagios-plugins 1.9.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 | import ldap
try:
from ldap.dn import str2dn, dn2str
except ImportError:
def str_strip(s): return s.strip()
def str2dn(s):
def split_attr(s): return tuple(map(str_strip, s.split('=', 1)) + [1])
def split_rdn(s): return map(split_attr, s.split('+'))
return map(split_rdn, s.split(','))
def dn2str(dn):
def join_attr(tup): return '%s=%s'%(tup[0], tup[1])
def join_rdn(rdn): return '+'.join(map(join_attr, rdn))
return ','.join(map(join_rdn, dn))
class LDAPValidationError(Exception):
pass
def _ldap_bool(x):
x = x.upper()
if x == 'TRUE':
return True
elif x == 'FALSE':
return False
else:
raise ValueError('Invalid boolean %s.'%x)
_decoders = {
'1.3.6.1.4.1.1466.115.121.1.7': _ldap_bool, # TRUE | FALSE
'1.3.6.1.4.1.1466.115.121.1.27': int,
}
def decoder_for_syntax(syn):
return _decoders.get(syn, None)
# Cf http://bugzilla.nordugrid.org/show_bug.cgi?id=2693
_ignore_must = set([
'1.3.6.1.4.1.11604.2.1.8.0.1', # Mds-validfrom
'1.3.6.1.4.1.11604.2.1.8.0.2', # Mds-validto
'1.3.6.1.4.1.11604.2.1.8.1.4.1.0.1',# Mds-Vo-Op-name
'1.3.6.1.4.1.11604.2.1.8.2.7.1.0.2',# Mds-Service-protocol
])
# The boolean "in" method on Entry is incomplete or unimplemented in
# python-ldap-2.2.0.
def Entry_in(key, entry):
try:
entry[key]
return True
except KeyError:
return False
class LDAPObject(object):
"""This is a read-only representation of an LDAP entry with attribute
values decoded according to their schema."""
structural_objectclass = None
strip_attribute_prefixes = []
lowercase_all_attributes = False
def __init__(self, schema, dn, entry_dict):
if not self.structural_objectclass in entry_dict['objectClass']:
raise ValueError('The LDAP entry at %s is not of structural object '
'class %s.'%(dn, self.structural_objectclass))
self.dn = dn
entry = ldap.schema.models.Entry(schema, dn, entry_dict)
must, attrs = entry.attribute_types()
attrs.update(must)
for oid, at in attrs.iteritems():
if Entry_in(oid, entry):
decode = decoder_for_syntax(at.syntax)
if decode is None:
dvs = entry[oid]
else:
dvs = []
for av in entry[oid]:
try:
dv = decode(av)
except (TypeError, ValueError):
raise LDAPValidationError(
'%s has invalid value %s for attribute %s '
'of syntax %s.'
%(dn, av, at.names[0], at.syntax))
dvs.append(dv)
if at.single_value:
if len(dvs) == 1:
value = dvs[0]
else:
raise LDAPValidationError(
'%s has multiple values for single-valued '
'attribute %s.'%(dn, at.names[0]))
else:
value = dvs
else:
if at.single_value:
value = None
else:
value = []
name = at.names[0]
for prefix in self.strip_attribute_prefixes:
if name.startswith(prefix):
name = name[len(prefix):]
name = name.replace('-', '_')
if self.lowercase_all_attributes:
name = name.lower()
setattr(self, name, value)
missing = []
for oid, at in must.iteritems():
if not Entry_in(oid, entry) and not oid in _ignore_must:
missing.append(at.names[0])
if missing:
raise LDAPValidationError(
'%s lacks required attribute(s) %s.'
%(dn, ', '.join(missing)))
def canonical_dn(dn):
return dn2str(str2dn(dn))
def is_proper_subdn(subdn, dn):
subdn = canonical_dn(subdn)
dn = canonical_dn(dn)
return subdn != dn and subdn.endswith(dn)
def is_immediate_subdn(subdn, dn):
subdn_comps = str2dn(subdn)
dn_comps = str2dn(dn)
return len(subdn_comps) > 0 and subdn_comps[1:] == dn_comps
|