This file is indexed.

/usr/lib/python2.7/dist-packages/arcnagios/jobplugin.py is in nordugrid-arc-nagios-plugins 1.9.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
import re
from utils import ident, unspecified
from arcnagios import substitution
from arcnagios.nagutils import ServiceUNKNOWN

def boolean(s):
    w = s.lower()
    if w in ['0', 'false', 'no', 'off']:
	return False
    if w in ['1', 'true', 'yes', 'on']:
	return True
    raise ValueError('invalid literal for boolean: %r'%s)

_interp_re = re.compile(r'%\(([a-zA-Z0-9_]+)\)')

class JobPlugin(object):
    """A base-class for tests to run within a job script.  Implementations
    provide commands to run, and how to extract the result.  Optionally it may
    specify staging and cleanup."""

    def __init__(self, name, config, config_section, log, env = {}):
	self.name = name
	self.config = config
	self.config_section = config_section
	self.test_name = config_section[6:] # Strip "arcce."
	self.log = log
	self.environment = env

    def _import_interpolations(self, var):
	if not var in self.environment \
		and self.config.has_option(self.config_section, var):
	    raw_value = self.config.get(self.config_section, var, raw = True)
	    for mo in re.finditer(_interp_re, raw_value):
		v = mo.group(1)
		if not v in self.environment \
			and self.config.has_section('variable.' + v):
		    substitution.import_variable(self.config, v,
						 self.environment)

    def _update_vars(self, kwargs):
	if 'vars' in kwargs:
	    kwargs['vars'].update(self.environment)
	else:
	    kwargs['vars'] = self.environment

    def hasconf(self, var):
	return self.config.has_option(self.config_section, var)

    def getconf(self, var, default = unspecified, type = ident, **kwargs):
	if not default is unspecified and not self.hasconf(var):
	    return default
	self._import_interpolations(var)
	self._update_vars(kwargs)
	try:
	    return type(self.config.get(self.config_section, var, **kwargs))
	except ValueError, xc:
	    raise ServiceUNKNOWN('Bad value for configuration parameter %s in '
				 'section %s: %s'
				 % (var, self.config_section, xc))

    def getconf_int(self, var, default = unspecified, **kwargs):
	self.getconf(var, default = default, type = int, **kwargs)

    def getconf_bool(self, var, default = unspecified, **kwargs):
	self.getconf(var, default = default, type = boolean, **kwargs)

    def getconf_float(self, var, default = unspecified, **kwargs):
	self.getconf(var, default = default, type = float, **kwargs)

    def getconf_strlist(self, var, default = unspecified, sep = None, **kwargs):
	if not default is unspecified and not self.hasconf(var):
	    return default
	self._import_interpolations(var)
	self._update_vars(kwargs)
	raw = self.config.get(self.config_section, var, **kwargs)
	return [s.strip() for s in raw.split(sep)]

    @property
    def service_description(self):
	return self.getconf('service_description', None)

    def staged_inputs(self):
	"""Override this method to specify files used by the script."""
	return []

    def staged_outputs(self):
	"""Override this method to specify files produced by the script, which
	are needed by `extract_result`."""
	return []

    def runtime_environments(self):
	return filter(None,
		      [x.strip() for x in
			self.getconf('runtime_environments', '').split(',')])

    def write_script(self, fh):
	"""This method may write commands to run in the job script.  The
	commands are run in the standard shell (/bin/sh), and must not call
	exit or otherwise disrupt the control flow of the script, since other
	commands are run in the same script."""
	pass

    def check(self, report, jobdir, stored_urls):
	"""This method is run to check the output of a job."""
	raise NotImplementedError('extract_result')

    def cleanup(self, job_state):
	pass