/usr/lib/python2.7/dist-packages/arcnagios/ldapnagutils.py is in nordugrid-arc-nagios-plugins 1.9.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | import ldap, logging, time
from arcnagios.confargparse import UsageError
from arcnagios import nagutils, vomsutils
class LDAPNagiosPlugin(nagutils.NagiosPlugin, vomsutils.NagiosPluginVomsMixin):
def __init__(self, default_uri = None, default_basedn = None, **kwargs):
nagutils.NagiosPlugin.__init__(self, **kwargs)
ap = self.argparser.add_argument_group('LDAP Options')
ap.add_argument('-H', dest = 'host',
help = 'Host to query. This will be used for the LDAP '
'connection if --ldap-uri is not specified.')
if not 'use_port' in kwargs: # compat
ap.add_argument('-p', dest = 'port', type = int, default = 2135,
help = 'The LDAP port to use if --ldap-uri was not given. '
'The default is 2135.')
ap.add_argument('--ldap-basedn', dest = 'ldap_basedn',
default = default_basedn,
help = 'Base DN to query if non-standard.')
ap.add_argument('--ldap-uri', dest = 'ldap_uri',
default = default_uri,
help = 'LDAP URI of the infosystem to query.')
ap.add_argument('-t', '--timeout', dest = 'timeout',
type = int, default = 300,
help = 'Overall timeout of the probe.')
def parse_args(self, args):
nagutils.NagiosPlugin.parse_args(self, args)
if not self.opts.ldap_uri:
if not self.opts.host:
raise UsageError('Either --ldap-uri or -H must be specified.')
self.opts.ldap_uri = 'ldap://%s:%d'%(self.opts.host, self.opts.port)
@property
def time_left(self):
return self.time_limit - time.time()
def prepare_check(self):
self.log.debug('Using LDAP URI %s.'%self.opts.ldap_uri)
self.lconn = ldap.initialize(self.opts.ldap_uri)
self.time_limit = time.time() + self.opts.timeout
def search_s(self, basedn, scope, *args, **kwargs):
"""Customized LDAP search with timeout and Nagios error reporting."""
self.log.debug('Searching %s.'%basedn)
if self.time_left <= 0:
raise nagutils.ServiceCRITICAL('Timeout before LDAP search.')
try:
return self.lconn.search_st(basedn, scope, timeout = self.time_left,
*args, **kwargs)
except ldap.TIMEOUT:
raise nagutils.ServiceCRITICAL('Timeout during LDAP search.')
except ldap.NO_SUCH_OBJECT:
return []
except ldap.LDAPError, xc:
self.log.error('LDAP details: basedn = %s, scope = %d',
basedn, scope)
self.log.error('LDAP error: %s'%xc)
raise nagutils.ServiceCRITICAL('LDAP Search failed.')
def fetch_subschema(self):
"""Fetch the subschema for the given connection and return the
corresponding `ldap.schema.SubSchema` object."""
# TODO: Consider caching the subschema for a period of time, since it
# can be big.
self.log.debug('Fetching subschema.')
sr = self.search_s('cn=subschema', ldap.SCOPE_BASE,
attrlist = ['+', '*'])
if len(sr) != 1:
raise nagutils.ServiceCRITICAL('Could not fetch subschema.')
subschema_subentry = sr[0][1]
return ldap.schema.SubSchema(subschema_subentry)
def debug_dump_obj(self, obj, name):
# Dump vars(obj) if debugging is enabled.
if self.log.getEffectiveLevel() >= logging.DEBUG:
self.log.debug('Dump of %s:'%name)
for k, v in vars(obj).iteritems():
if isinstance(v, list) and len(v) > 4:
v = v[0:4] + ['...']
self.log.debug(' %s: %r'%(k, v))
|