This file is indexed.

/usr/lib/python2.7/dist-packages/arcnagios/plugins/arcce_submit.py is in nordugrid-arc-nagios-plugins 1.9.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import os, pipes, time
from arcnagios import arcutils, jobutils, nagutils

class Check_arcce_submit(jobutils.JobNagiosPlugin):
    def __init__(self):
	jobutils.JobNagiosPlugin.__init__(self)
	ap = self.argparser.add_argument_group('Options for Job Submission')
	ap.add_argument('-H', dest = 'host', required = True,
		help = 'The host name of the CE to test.  This will be used '
		       'to connect to the CE unless --ce is given.  '
		       'This option is required.')
	ap.add_argument('-p', dest = 'port', type = int,
		help = 'An optional port number at which to connect.')
	ap.add_argument('--prev-status', dest = 'prev_status', type = int,
		default = 0, metavar = '{0..3}',
		help = 'The previous Nagios status for this metric.')
	ap.add_argument('--stage-input', dest = 'staged_inputs',
		default = [], action = 'append',
		metavar = 'URL',
		help = 'DEPRECATED, please use --test with the staging plugin. '
		       'Stage the existing URL as an input and check for it '
		       'in the job script. '
		       'The local file name will be the basename of URL, or '
		       'you can specify an alternative name by prefixing '
		       'the URL with ALTNAME=.')
	ap.add_argument('--stage-output', dest = 'staged_outputs',
		default = [], action = 'append',
		metavar = 'URL',
		help = 'DEPRECATED, please use --test with the staging plugin. '
		       'Create a file in the job script and stage it as URL. '
		       'The local file name will be the basename of URL, or '
		       'you can specify an alternative name by prefixing '
		       'the URL with ALTNAME=.')
	ap.add_argument('--termination-service', dest = 'termination_service',
		default = '',
		help = 'The name (NAGIOS "description") of the passive '
		       'service to which to submit the results.')
	ap.add_argument('--progress-service', metavar = 'SVC',
		help = 'Publish state timeout alerts to SVC.')
	ap.add_argument('--submission-service', dest = 'submission_service',
		default = '',
		help = 'Report submission-related alerts to this service '
		       'instead of raising the alert on the active service.')
	ap.add_argument('--submission-service-threshold',
		default = 2, type = int,
		help = 'Minimum severity before the submission result is '
		       'submitted to the passive service specified by '
		       '--submission-service.  This is the numeric status, '
		       '0 for OK, 1 for WARNING, 2 for ERROR (default), '
		       'and 3 for UNKNOWN.')
	ap.add_argument('--job-submit-timeout', dest = 'job_submit_timeout',
		type = int, default = 600,
		help = 'Timeout for job submission.')
	ap.add_argument('--job-discard-timeout', dest = 'job_discard_timeout',
		type = int, default = 6*3600,
		help = 'Timeout before discarding a job.')
	ap.add_argument('--ce', dest = 'ce',
		help = 'URL for connecting to the CE, using the same format '
		       'as the -c option of arcsub(1).')
	ap.add_argument('--queue', dest = 'queue',
		help = 'Target queue name. If unspecified, let ARC choose it.')
	ap.add_argument('--job-tag', dest = 'job_tag',
		help = 'A short string suitable in directory names to '
		       'distinguish different submission services for the '
		       'same hostname.')
	ap.add_argument('--job-description', dest = 'job_description',
		help = 'Use this job description instead of generating one.  '
		       'In this case --stage-input options are ignored and '
		       'URLs passed to --stage-output will be deleted when '
		       'the job finishes.')
	ap.add_argument('--test', dest = 'tests', action='append', default=[],
		metavar = 'TESTNAME',
		help = 'Add an additional test described in the configuration '
		       'file under the section "arcce.TESTNAME"')
	ap.add_argument('--runtime-environment', dest = 'runtime_environments',
		action = 'append', default = [], metavar = 'RTE',
		help = 'Request the given runtime environment.')
	ap.add_argument('--wall-time-limit', dest = 'wall_time_limit',
		type = int, default = 600,
		help = 'Soft limit of execution wall-time.')
	ap.add_argument('--memory-limit', dest = 'memory_limit',
		type = int, default = 536870912,
		help = 'The max. about of memory used by the job in bytes. '
		       'Default: 536870912 (512 MiB)')
	ap.add_argument('--enable-gmlog', dest = 'enable_gmlog',
		action = 'store_true', default = False,
		help = 'Request debug information from the CE.  This will be '
		       'stored in a subdirectory log of the output directory.')

    def parse_args(self, args):
	jobutils.JobNagiosPlugin.parse_args(self, args)
	self.staged_inputs = \
		map(jobutils.parse_staging, self.opts.staged_inputs)
	self.staged_outputs = \
		map(jobutils.parse_staging, self.opts.staged_outputs)

    def _report(self, status, msg):
	if status < self.opts.submission_service_threshold \
		or not self.opts.submission_service:
	    report = self.nagios_report
	else:
	    self.nagios_report.update_status(nagutils.OK,
					     'Reporting to passive service.')
	    report = self.nagios_report_for(self.opts.host,
					    self.opts.submission_service)
	report.update_status(status, msg)

    def check(self):
	"""Submit a job to a CE."""

	self.require_voms_proxy()

	workdir = self.workdir_for(self.opts.host, self.opts.job_tag)

	jobid_file = os.path.join(workdir, self.JOBID_FILENAME)
	jobinfo = self.load_active_job(self.opts.host, self.opts.job_tag)
	if not jobinfo is None:
	    t_sub = jobinfo.submission_time
	    job_state = jobinfo.job_state

	    if not job_state.is_final():
		s_sub = time.strftime('%FT%T', time.localtime(t_sub))
		self.log.info('Last job was submitted %s.'%s_sub)
		t_dis = t_sub + self.opts.job_discard_timeout
		if int(time.time()) >= t_dis:
		    self.log.warning('Discarding last job due to timeout.')
		    self.discard_job(jobinfo)
		    self._report(nagutils.WARNING,
			    'Re-submitting due to timeout of %s from %s'
			    % (jobinfo.job_id, s_sub))
		else:
		    s_dis = time.strftime('%FT%T', time.localtime(t_dis))
		    self.log.info('Job will be discarded %s.'%s_dis)
		    status = self.opts.prev_status or 0
		    self.log.info('Keeping previous status %d.'%status)
		    self._report(status, 'Job not finished.')
		    return
	    else:
		self.log.debug('Job in terminal state %s.\n'%job_state)
		self._report(nagutils.OK,
			     'Waiting for monitoring service to fetch the job.')
		return

	# Prepare the working directory for a new job.
	job_output_dir = os.path.join(workdir, self.JOB_OUTPUT_DIRNAME)
	if not os.path.exists(job_output_dir):
	    try:
		os.makedirs(job_output_dir)
	    except OSError, e:
		msg = 'Failed to create working directory: %s'%e
		return nagutils.ServiceUNKNOWN(msg)

	self.log.debug('Submitting new job.')
	job_script_file = os.path.join(workdir, self.JOB_SCRIPT_FILENAME)

	# Create job script.
	fh = open(job_script_file, 'w')
	fh.write('#! /bin/sh\n\n'
		 'status=0\n'
		 'echo "Job started `date -Is`."\n')
	for filename, _0, _1 in self.staged_inputs:
	    fh.write('test -e %(fname)s || error "Missing file "%(fname)s\n'
		     % {'fname': pipes.quote(filename)})
	for filename, _0, _1 in self.staged_outputs:
	    fh.write('hostname >%s\n'%pipes.quote(filename))
	runtime_environments = set(self.opts.runtime_environments)
	fh.write('\n')
	for test_name in self.opts.tests:
	    test = self.load_jobtest(test_name, hostname = self.opts.host)
	    test.write_script(fh)

	    def adjust_staged(spec):
		if isinstance(spec, tuple):
		    filename, spec, urloptions = spec
		else:
		    if ';' in spec:
			xs = spec.split(';')
			spec, urloptions = xs[0], xs[1:]
		    else:
			urloptions = []
		    filename = os.path.basename(spec)
		if spec is None or ':/' in spec:
		    url = spec
		elif os.path.isabs(spec):
		    url = 'file:' + spec
		else:
		    url = 'file:' + os.path.join(workdir, spec)
		return filename, url, urloptions
	    for stagespec in test.staged_inputs():
		self.staged_inputs.append(adjust_staged(stagespec))
	    for stagespec in test.staged_outputs():
		self.staged_outputs.append(adjust_staged(stagespec))
	    runtime_environments.update(test.runtime_environments())
	fh.write('echo "Present files before termination:"\n'
		 'ls -l\n'
		 'echo "Job finished `date -Is`, status = $status."\n'
		 'exit $status\n')
	fh.close()

	# Create JSDL file.
	if self.opts.job_description:
	    jsdl_file = self.opts.job_description
	else:
	    jsdl_file = os.path.join(workdir, self.JSDL_FILENAME)
	    jobdesc = jobutils.JobDescription(
		    script_path = job_script_file,
		    application_name = 'ARCCE-probe',
		    logdir = self.opts.enable_gmlog and 'log',
		    job_name = self.opts.termination_service,
		    output = 'stdout.txt',
		    error = 'stderr.txt',
		    staged_inputs = self.staged_inputs,
		    staged_outputs = self.staged_outputs,
		    runtime_environments = runtime_environments,
		    wall_time_limit = self.opts.wall_time_limit,
		    memory_limit = self.opts.memory_limit,
		    queue_name = self.opts.queue)
	    self.write_jsdl(jsdl_file, jobdesc)

	# Submit the job.
	if self.opts.ce:
	    connection_url = self.opts.ce
	elif self.config.has_option('arcce.connection_urls', self.opts.host):
	    connection_url = self.config.get('arcce.connection_urls',
					     self.opts.host)
	# COMPAT 2011-11-22.
	elif self.config.has_option('arc-ce.connection_urls', self.opts.host):
	    self.log.warn('The section name arc-ce.connection_urls is '
			  'deprecated, please use arcce.connection_urls.')
	    connection_url = self.config.get('arc-ce.connection_urls',
					     self.opts.host)
	else:
	    if self.opts.port:
		connection_url = self.opts.host + ':' + str(self.opts.port)
	    else:
		connection_url = self.opts.host
	rc, output = \
	    self.run_arc_cmd('arcsub',
		'-c', connection_url,
		'-o', jobid_file,
		'-t', self.opts.job_submit_timeout,
		jsdl_file)
	try:
	    fh = open(jobid_file)
	    job_id = fh.readline().strip()
	    fh.close()
	except StandardError:
	    job_id = None
	if not job_id:
	    for ln in output.strip().split('\n'):
		self.log.error('arcsub: %s'%ln)
	    if rc != 0:
		self.cleanup_job_files(self.opts.host, self.opts.job_tag)
		self.log.error('arcsub %s.' % arcutils.explain_wait_status(rc))
		self._report(nagutils.CRITICAL, 'Job submission failed.')
		return
	    else:
		self.log.info('The job ID should have been saved to %s.'
			      % jobid_file)
		self._report(nagutils.CRITICAL, 'Failed to submit job.')
		return
	if rc == 0:
	    self._report(nagutils.OK, 'Job submitted.')
	else:
	    for ln in output.strip().split('\n'):
		self.log.error('arcsub: %s'%ln)
	    self.log.error('Received a JID despite the error, proceeding.')
	    self._report(nagutils.WARNING,
		    'Job seems to be submitted but arcsub %s.'
		    % arcutils.explain_wait_status(rc))

	t_now = int(time.time())
	jobinfo = jobutils.JobInfo(
		submission_time = t_now,
		host = self.opts.host,
		job_tag = self.opts.job_tag,
		termination_service = self.opts.termination_service,
		progress_service = self.opts.progress_service,
		job_id = job_id,
		job_state = arcutils.J_NOT_SEEN,
		job_state_time = int(time.time()),
		check_time = t_now,
		stored_urls = [url for _0, url, _1 in self.staged_outputs
				   if url],
		tests = self.opts.tests)
	self.save_active_job(jobinfo, self.opts.host, self.opts.job_tag)