This file is indexed.

/usr/share/pyshared/mrjob/tools/emr/mrboss.py is in python-mrjob 0.3.3.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Copyright 2009-2010 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run a command on the master and all slaves. Store stdout and stderr for
results in OUTPUT_DIR.

Usage::

    python -m mrjob.tools.emr.mrboss JOB_FLOW_ID [options] "command string"

Options::

  -c CONF_PATH, --conf-path=CONF_PATH
  --ec2-key-pair-file=EC2_KEY_PAIR_FILE
                        Path to file containing SSH key for EMR
  -h, --help            show this help message and exit
  --no-conf             Don't load mrjob.conf even if it's available
  -o, --output-dir      Specify an output directory (default: JOB_FLOW_ID)
  -q, --quiet           Don't print anything to stderr
  -v, --verbose         print more messages to stderr
"""
from __future__ import with_statement

from optparse import OptionParser
import os
import shlex
import sys

from mrjob.emr import EMRJobRunner
from mrjob.job import MRJob
from mrjob.ssh import ssh_run_with_recursion
from mrjob.util import scrape_options_into_new_groups


def main():
    usage = 'usage: %prog JOB_FLOW_ID OUTPUT_DIR [options] "command string"'
    description = ('Run a command on the master and all slaves of an EMR job'
                   ' flow. Store stdout and stderr for results in OUTPUT_DIR.')

    option_parser = OptionParser(usage=usage, description=description)

    assignments = {
        option_parser: ('conf_path', 'quiet', 'verbose',
                        'ec2_key_pair_file')
    }

    option_parser.add_option('-o', '--output-dir', dest='output_dir',
                             default=None,
                             help="Specify an output directory (default:"
                             " JOB_FLOW_ID)")

    mr_job = MRJob()
    scrape_options_into_new_groups(mr_job.all_option_groups(), assignments)

    options, args = option_parser.parse_args()

    MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    runner_kwargs = options.__dict__.copy()
    for unused_arg in ('output_dir', 'quiet', 'verbose'):
        del runner_kwargs[unused_arg]

    if len(args) < 2:
        option_parser.print_help()
        sys.exit(1)

    job_flow_id, cmd_string = args[:2]
    cmd_args = shlex.split(cmd_string)

    output_dir = os.path.abspath(options.output_dir or job_flow_id)

    with EMRJobRunner(emr_job_flow_id=job_flow_id, **runner_kwargs) as runner:
        runner._enable_slave_ssh_access()
        run_on_all_nodes(runner, output_dir, cmd_args)


def run_on_all_nodes(runner, output_dir, cmd_args, print_stderr=True):
    """Given an :py:class:`EMRJobRunner`, run the command specified by
    *cmd_args* on all nodes in the job flow and save the stdout and stderr of
    each run to subdirectories of *output_dir*.

    You should probably have run :py:meth:`_enable_slave_ssh_access()` on the
    runner before calling this function.
    """

    master_addr = runner._address_of_master()
    addresses = [master_addr]
    if runner._opts['num_ec2_instances'] > 1:
        addresses += ['%s!%s' % (master_addr, slave_addr)
                      for slave_addr in runner._addresses_of_slaves()]

    for addr in addresses:
        stdout, stderr = ssh_run_with_recursion(
            runner._opts['ssh_bin'],
            addr,
            runner._opts['ec2_key_pair_file'],
            runner._ssh_key_name,
            cmd_args,
        )

        if print_stderr:
            print '---'
            print 'Command completed on %s.' % addr
            print stderr,

        if '!' in addr:
            base_dir = os.path.join(output_dir, 'slave ' + addr.split('!')[1])
        else:
            base_dir = os.path.join(output_dir, 'master')

        if not os.path.exists(base_dir):
            os.makedirs(base_dir)

        with open(os.path.join(base_dir, 'stdout'), 'w') as f:
            f.write(stdout)

        with open(os.path.join(base_dir, 'stderr'), 'w') as f:
            f.write(stderr)


if __name__ == '__main__':
    main()