/usr/lib/python2.7/dist-packages/arcnagios/plugins/arcce_submit.py is in nordugrid-arc-nagios-plugins 1.9.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import os, pipes, time
from arcnagios import arcutils, jobutils, nagutils
class Check_arcce_submit(jobutils.JobNagiosPlugin):
    """Nagios plugin which submits a test job to a CE using arcsub.

    The plugin keeps track of the submitted job through the job-info record
    saved with save_active_job().  While a previously submitted job is still
    recorded, no new job is submitted unless the recorded job has exceeded
    the --job-discard-timeout.
    """

    def __init__(self):
        """Initialize the base plugin and register the submission options."""
        jobutils.JobNagiosPlugin.__init__(self)
        ap = self.argparser.add_argument_group('Options for Job Submission')
        ap.add_argument(
            '-H', dest='host', required=True,
            help='The host name of the CE to test. This will be used '
                 'to connect to the CE unless --ce is given. '
                 'This option is required.')
        ap.add_argument(
            '-p', dest='port', type=int,
            help='An optional port number at which to connect.')
        ap.add_argument(
            '--prev-status', dest='prev_status', type=int,
            default=0, metavar='{0..3}',
            help='The previous Nagios status for this metric.')
        ap.add_argument(
            '--stage-input', dest='staged_inputs',
            default=[], action='append',
            metavar='URL',
            help='DEPRECATED, please use --test with the staging plugin. '
                 'Stage the existing URL as an input and check for it '
                 'in the job script. '
                 'The local file name will be the basename of URL, or '
                 'you can specify an alternative name by prefixing '
                 'the URL with ALTNAME=.')
        ap.add_argument(
            '--stage-output', dest='staged_outputs',
            default=[], action='append',
            metavar='URL',
            help='DEPRECATED, please use --test with the staging plugin. '
                 'Create a file in the job script and stage it as URL. '
                 'The local file name will be the basename of URL, or '
                 'you can specify an alternative name by prefixing '
                 'the URL with ALTNAME=.')
        ap.add_argument(
            '--termination-service', dest='termination_service',
            default='',
            help='The name (NAGIOS "description") of the passive '
                 'service to which to submit the results.')
        ap.add_argument(
            '--progress-service', metavar='SVC',
            help='Publish state timeout alerts to SVC.')
        ap.add_argument(
            '--submission-service', dest='submission_service',
            default='',
            help='Report submission-related alerts to this service '
                 'instead of raising the alert on the active service.')
        ap.add_argument(
            '--submission-service-threshold',
            default=2, type=int,
            help='Minimum severity before the submission result is '
                 'submitted to the passive service specified by '
                 '--submission-service. This is the numeric status, '
                 '0 for OK, 1 for WARNING, 2 for ERROR (default), '
                 'and 3 for UNKNOWN.')
        ap.add_argument(
            '--job-submit-timeout', dest='job_submit_timeout',
            type=int, default=600,
            help='Timeout for job submission.')
        ap.add_argument(
            '--job-discard-timeout', dest='job_discard_timeout',
            type=int, default=6*3600,
            help='Timeout before discarding a job.')
        ap.add_argument(
            '--ce', dest='ce',
            help='URL for connecting to the CE, using the same format '
                 'as the -c option of arcsub(1).')
        ap.add_argument(
            '--queue', dest='queue',
            help='Target queue name. If unspecified, let ARC choose it.')
        ap.add_argument(
            '--job-tag', dest='job_tag',
            help='A short string suitable in directory names to '
                 'distinguish different submission services for the '
                 'same hostname.')
        ap.add_argument(
            '--job-description', dest='job_description',
            help='Use this job description instead of generating one. '
                 'In this case --stage-input options are ignored and '
                 'URLs passed to --stage-output will be deleted when '
                 'the job finishes.')
        ap.add_argument(
            '--test', dest='tests', action='append', default=[],
            metavar='TESTNAME',
            help='Add an additional test described in the configuration '
                 'file under the section "arcce.TESTNAME"')
        ap.add_argument(
            '--runtime-environment', dest='runtime_environments',
            action='append', default=[], metavar='RTE',
            help='Request the given runtime environment.')
        ap.add_argument(
            '--wall-time-limit', dest='wall_time_limit',
            type=int, default=600,
            help='Soft limit of execution wall-time.')
        ap.add_argument(
            '--memory-limit', dest='memory_limit',
            type=int, default=536870912,
            help='The max. about of memory used by the job in bytes. '
                 'Default: 536870912 (512 MiB)')
        ap.add_argument(
            '--enable-gmlog', dest='enable_gmlog',
            action='store_true', default=False,
            help='Request debug information from the CE. This will be '
                 'stored in a subdirectory log of the output directory.')

    def parse_args(self, args):
        """Parse `args` and decode the --stage-input/--stage-output values.

        The decoded specifications are stored in self.staged_inputs and
        self.staged_outputs.  They are built as explicit lists because
        check() appends the staging requirements of the enabled tests.
        """
        jobutils.JobNagiosPlugin.parse_args(self, args)
        self.staged_inputs = [
            jobutils.parse_staging(spec) for spec in self.opts.staged_inputs]
        self.staged_outputs = [
            jobutils.parse_staging(spec) for spec in self.opts.staged_outputs]

    def _report(self, status, msg):
        """Record `status` and `msg` on the active or the passive service.

        The result goes to the passive service named by --submission-service
        when that option is set and `status` is at least
        --submission-service-threshold.  In that case the active report is
        updated with an OK status instead.  Otherwise the result is recorded
        on the active report.
        """
        if status < self.opts.submission_service_threshold \
                or not self.opts.submission_service:
            report = self.nagios_report
        else:
            self.nagios_report.update_status(
                nagutils.OK, 'Reporting to passive service.')
            report = self.nagios_report_for(self.opts.host,
                                            self.opts.submission_service)
        report.update_status(status, msg)

    def check(self):
        """Submit a job to a CE.

        If an active job is recorded and it has reached a final state, or it
        has not yet exceeded --job-discard-timeout, the status is reported
        and no new job is submitted.  A recorded job which has exceeded the
        timeout is discarded and a new job is submitted in its place.

        For a new submission the job script is written, the job description
        is generated unless --job-description was given, arcsub is run, and
        the job ID written by arcsub is saved along with the job information.

        Returns a nagutils.ServiceUNKNOWN if the job output directory cannot
        be created, otherwise None.
        """
        self.require_voms_proxy()

        workdir = self.workdir_for(self.opts.host, self.opts.job_tag)
        jobid_file = os.path.join(workdir, self.JOBID_FILENAME)

        jobinfo = self.load_active_job(self.opts.host, self.opts.job_tag)
        if jobinfo is not None:
            t_sub = jobinfo.submission_time
            job_state = jobinfo.job_state

            if job_state.is_final():
                self.log.debug('Job in terminal state %s.\n' % job_state)
                self._report(
                    nagutils.OK,
                    'Waiting for monitoring service to fetch the job.')
                return

            s_sub = time.strftime('%FT%T', time.localtime(t_sub))
            self.log.info('Last job was submitted %s.' % s_sub)
            t_dis = t_sub + self.opts.job_discard_timeout
            if int(time.time()) < t_dis:
                s_dis = time.strftime('%FT%T', time.localtime(t_dis))
                self.log.info('Job will be discarded %s.' % s_dis)
                status = self.opts.prev_status or 0
                self.log.info('Keeping previous status %d.' % status)
                self._report(status, 'Job not finished.')
                return

            # The job is overdue: drop it and fall through to re-submission.
            self.log.warning('Discarding last job due to timeout.')
            self.discard_job(jobinfo)
            self._report(nagutils.WARNING,
                         'Re-submitting due to timeout of %s from %s'
                         % (jobinfo.job_id, s_sub))

        # Prepare the working directory for a new job.
        job_output_dir = os.path.join(workdir, self.JOB_OUTPUT_DIRNAME)
        if not os.path.exists(job_output_dir):
            try:
                os.makedirs(job_output_dir)
            except OSError as e:
                msg = 'Failed to create working directory: %s' % e
                # NOTE(review): the report is returned, not raised; presumably
                # the caller of check() handles a returned report -- confirm.
                return nagutils.ServiceUNKNOWN(msg)

        self.log.debug('Submitting new job.')
        job_script_file = os.path.join(workdir, self.JOB_SCRIPT_FILENAME)

        def adjust_staged(spec):
            """Turn a test's staging spec into (filename, url, urloptions).

            `spec` is either a ready-made 3-tuple or a string of the form
            'LOCATION[;OPTION...]'.  A location without ':/' is turned into
            a 'file:' URL, relative to the working directory if not absolute.
            """
            if isinstance(spec, tuple):
                filename, spec, urloptions = spec
            else:
                if ';' in spec:
                    parts = spec.split(';')
                    spec, urloptions = parts[0], parts[1:]
                else:
                    urloptions = []
                filename = os.path.basename(spec)
            if spec is None or ':/' in spec:
                url = spec
            elif os.path.isabs(spec):
                url = 'file:' + spec
            else:
                url = 'file:' + os.path.join(workdir, spec)
            return filename, url, urloptions

        # Create job script.  The context manager makes sure the file is
        # closed even if loading or writing one of the tests fails.
        runtime_environments = set(self.opts.runtime_environments)
        with open(job_script_file, 'w') as fh:
            fh.write('#! /bin/sh\n\n'
                     'status=0\n'
                     'echo "Job started `date -Is`."\n')
            # NOTE(review): the generated line below calls a shell function
            # "error" which is not defined by the script header written
            # above -- confirm whether it is expected to come from elsewhere.
            for filename, _0, _1 in self.staged_inputs:
                fh.write('test -e %(fname)s || error "Missing file "%(fname)s\n'
                         % {'fname': pipes.quote(filename)})
            for filename, _0, _1 in self.staged_outputs:
                fh.write('hostname >%s\n' % pipes.quote(filename))
            fh.write('\n')
            for test_name in self.opts.tests:
                test = self.load_jobtest(test_name, hostname=self.opts.host)
                test.write_script(fh)
                for stagespec in test.staged_inputs():
                    self.staged_inputs.append(adjust_staged(stagespec))
                for stagespec in test.staged_outputs():
                    self.staged_outputs.append(adjust_staged(stagespec))
                runtime_environments.update(test.runtime_environments())
            fh.write('echo "Present files before termination:"\n'
                     'ls -l\n'
                     'echo "Job finished `date -Is`, status = $status."\n'
                     'exit $status\n')

        # Create JSDL file, unless the user supplied a job description.
        if self.opts.job_description:
            jsdl_file = self.opts.job_description
        else:
            jsdl_file = os.path.join(workdir, self.JSDL_FILENAME)
            jobdesc = jobutils.JobDescription(
                script_path=job_script_file,
                application_name='ARCCE-probe',
                logdir=self.opts.enable_gmlog and 'log',
                job_name=self.opts.termination_service,
                output='stdout.txt',
                error='stderr.txt',
                staged_inputs=self.staged_inputs,
                staged_outputs=self.staged_outputs,
                runtime_environments=runtime_environments,
                wall_time_limit=self.opts.wall_time_limit,
                memory_limit=self.opts.memory_limit,
                queue_name=self.opts.queue)
            self.write_jsdl(jsdl_file, jobdesc)

        # Determine where to connect: --ce takes precedence, then the
        # configuration file, then the host name with an optional port.
        if self.opts.ce:
            connection_url = self.opts.ce
        elif self.config.has_option('arcce.connection_urls', self.opts.host):
            connection_url = self.config.get('arcce.connection_urls',
                                             self.opts.host)
        # COMPAT 2011-11-22.
        elif self.config.has_option('arc-ce.connection_urls', self.opts.host):
            self.log.warning('The section name arc-ce.connection_urls is '
                             'deprecated, please use arcce.connection_urls.')
            connection_url = self.config.get('arc-ce.connection_urls',
                                             self.opts.host)
        elif self.opts.port:
            connection_url = self.opts.host + ':' + str(self.opts.port)
        else:
            connection_url = self.opts.host

        # Submit the job.
        rc, output = self.run_arc_cmd('arcsub',
                                      '-c', connection_url,
                                      '-o', jobid_file,
                                      '-t', self.opts.job_submit_timeout,
                                      jsdl_file)

        # arcsub is asked to write the job ID to jobid_file (-o).  A missing
        # or unreadable file is treated like an empty one.
        try:
            with open(jobid_file) as fh:
                job_id = fh.readline().strip()
        except EnvironmentError:
            job_id = None

        if not job_id:
            for ln in output.strip().split('\n'):
                self.log.error('arcsub: %s' % ln)
            if rc != 0:
                self.cleanup_job_files(self.opts.host, self.opts.job_tag)
                self.log.error('arcsub %s.'
                               % arcutils.explain_wait_status(rc))
                self._report(nagutils.CRITICAL, 'Job submission failed.')
            else:
                self.log.info('The job ID should have been saved to %s.'
                              % jobid_file)
                self._report(nagutils.CRITICAL, 'Failed to submit job.')
            return

        if rc == 0:
            self._report(nagutils.OK, 'Job submitted.')
        else:
            for ln in output.strip().split('\n'):
                self.log.error('arcsub: %s' % ln)
            self.log.error('Received a JID despite the error, proceeding.')
            self._report(nagutils.WARNING,
                         'Job seems to be submitted but arcsub %s.'
                         % arcutils.explain_wait_status(rc))

        # Use a single timestamp so the fields of the new record agree.
        t_now = int(time.time())
        jobinfo = jobutils.JobInfo(
            submission_time=t_now,
            host=self.opts.host,
            job_tag=self.opts.job_tag,
            termination_service=self.opts.termination_service,
            progress_service=self.opts.progress_service,
            job_id=job_id,
            job_state=arcutils.J_NOT_SEEN,
            job_state_time=t_now,
            check_time=t_now,
            stored_urls=[url for _0, url, _1 in self.staged_outputs if url],
            tests=self.opts.tests)
        self.save_active_job(jobinfo, self.opts.host, self.opts.job_tag)
|