This file is indexed.

/usr/share/pyshared/mrjob/examples/mr_log_sampler.py is in python-mrjob 0.3.3.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

# Copyright 2011 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MapReduce job to sample n lines from a file. The mapper iterates over each
line and yields them to the reducer, combined with a random seed, so that
Hadoop will resort the lines. Then, the reducer yields the first n lines.
"""
__author__ = 'Benjamin Goldenberg <benjamin@yelp.com>'

import random
import sys

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, ReprProtocol

SAMPLING_FUDGE_FACTOR = 1.2


class MRLogSampler(MRJob):
    # We use RawValueProtocol for input to be format agnostic
    # and avoid any type of parsing errors
    INPUT_PROTOCOL = RawValueProtocol

    # We use RawValueProtocol for output so we can output raw lines
    # instead of (k, v) pairs
    OUTPUT_PROTOCOL = RawValueProtocol

    # Encode the intermediate records using repr() instead of JSON, so the
    # record doesn't get Unicode-encoded
    INTERNAL_PROTOCOL = ReprProtocol

    def configure_options(self):
        super(MRLogSampler, self).configure_options()
        self.add_passthrough_option(
            '--sample-size',
            type=int,
            help='Number of entries to sample.'
        )
        self.add_passthrough_option(
            '--expected-length',
            type=int,
            help=("Number of entries you expect in the log. If not specified,"
                  " we'll pass every line to the reducer.")
        )

    def load_options(self, args):
        super(MRLogSampler, self).load_options(args)

        if self.options.sample_size is None:
            self.option_parser.error('You must specify the --sample-size')
        else:
            self.sample_size = self.options.sample_size

        # If we have an expected length, we can estimate the sampling
        # probability for the mapper, so that the reducer doesn't have to
        # process all records. Otherwise, pass everything through to the
        # reducer.
        if self.options.expected_length is None:
            self.sampling_probability = 1.
        else:
            # We should be able to bound this probability by using the binomial
            # distribution, but I haven't figured it out yet. So, let's just
            # fudge it.
            self.sampling_probability = (float(self.sample_size) *
                                         SAMPLING_FUDGE_FACTOR /
                                         self.options.expected_length)
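            # Illustrative arithmetic (option values hypothetical): with
            # --sample-size 100 and --expected-length 1000000, the
            # probability is 100 * 1.2 / 1000000 = 0.00012, so roughly
            # 120 lines reach the single reducer on average.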

    def mapper(self, _, line):
        """
        For each log line, with probability self.sampling_probability,
        yield a None key, and (random seed, line) as the value, so that
        the values get sorted randomly and fed into a single reducer.

        Args:
            line - raw log line

        Yields:
            key - None
            value - (random seed, line)
        """
        if random.random() < self.sampling_probability:
            seed = '%20i' % random.randint(0, sys.maxint)
            yield None, (seed, line)

    def reducer(self, _, values):
        """
        Now that the values have a random number attached,
        they'll come in in random order, so we yield the
        first n lines, and return early.

        Args:
            values - generator of (random_seed, line) pairs

        Yields:
            key - None
            value - random sample of log lines
        """
        for line_num, (seed, line) in enumerate(values):
            yield None, line

            # enumerate() is 0-indexed, so add 1
            if line_num + 1 >= self.sample_size:
                break


if __name__ == '__main__':
    MRLogSampler.run()
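
For reference, an mrjob job like this is normally invoked as a script, e.g.
"python mr_log_sampler.py --sample-size 100 --expected-length 1000000 access.log"
(option values and input path are illustrative). It can also be driven from
another Python script via mrjob's runner API; a minimal sketch, assuming the
module above is importable as mr_log_sampler and a hypothetical local input
file named access.log:

# sample_driver.py -- hypothetical driver script, not part of the package
import sys

from mr_log_sampler import MRLogSampler

if __name__ == '__main__':
    # Build the job with illustrative options and a local input file.
    job = MRLogSampler(args=['--sample-size', '100',
                             '--expected-length', '1000000',
                             'access.log'])
    with job.make_runner() as runner:
        runner.run()
        # With OUTPUT_PROTOCOL = RawValueProtocol, each output line is one
        # sampled raw log line.
        for line in runner.stream_output():
            sys.stdout.write(line)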