/usr/share/pyshared/mrjob/examples/mr_log_sampler.py is in python-mrjob 0.3.3.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2011 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MapReduce job to sample n lines from a file.

The mapper iterates over each line and yields it to the reducer, combined
with a random seed, so that Hadoop will resort the lines. Then, the reducer
yields the first n lines it receives.
"""
__author__ = 'Benjamin Goldenberg <benjamin@yelp.com>'

import random
import sys

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, ReprProtocol

# Over-sampling multiplier applied to the mapper's estimated sampling
# probability, so the reducer is likely to see at least sample_size lines
# despite the randomness of the mapper's per-line coin flips.
SAMPLING_FUDGE_FACTOR = 1.2
class MRLogSampler(MRJob):
    """Randomly sample ``--sample-size`` lines from arbitrary line-based input.

    The mapper emits (with an estimated probability) each line under a None
    key, paired with a random seed; the reducer then yields the first
    ``sample_size`` values it receives and stops.
    """

    # We use RawValueProtocol for input to be format agnostic
    # and avoid any type of parsing errors
    INPUT_PROTOCOL = RawValueProtocol

    # We use RawValueProtocol for output so we can output raw lines
    # instead of (k, v) pairs
    OUTPUT_PROTOCOL = RawValueProtocol

    # Encode the intermediate records using repr() instead of JSON, so the
    # record doesn't get Unicode-encoded
    INTERNAL_PROTOCOL = ReprProtocol

    def configure_options(self):
        """Add the --sample-size and --expected-length passthrough options."""
        super(MRLogSampler, self).configure_options()
        self.add_passthrough_option(
            '--sample-size',
            type=int,
            help='Number of entries to sample.'
        )
        self.add_passthrough_option(
            '--expected-length',
            type=int,
            help=("Number of entries you expect in the log. If not specified,"
                  " we'll pass every line to the reducer.")
        )

    def load_options(self, args):
        """Validate options and derive the mapper's sampling probability.

        Sets:
            self.sample_size - number of lines to emit from the reducer
            self.sampling_probability - chance the mapper forwards a line
        """
        super(MRLogSampler, self).load_options(args)
        if self.options.sample_size is None:
            self.option_parser.error('You must specify the --sample-size')
        else:
            self.sample_size = self.options.sample_size
        # If we have an expected length, we can estimate the sampling
        # probability for the mapper, so that the reducer doesn't have to
        # process all records. Otherwise, pass everything thru to the reducer.
        if self.options.expected_length is None:
            self.sampling_probability = 1.
        else:
            # We should be able to bound this probability by using the binomial
            # distribution, but I haven't figured it out yet. So, let's just
            # fudge it.
            self.sampling_probability = (float(self.sample_size) *
                                         SAMPLING_FUDGE_FACTOR /
                                         self.options.expected_length)

    def mapper(self, _, line):
        """
        For each log line, with probability self.sampling_probability,
        yield a None key, and (random seed, line) as the value, so that
        the values get sorted randomly and fed into a single reducer.

        Args:
            line - raw log line

        Yields:
            key - None
            value - (random seed, line)
        """
        if random.random() < self.sampling_probability:
            # sys.maxsize (available since Python 2.6) instead of the
            # Python-2-only sys.maxint, so this runs on Python 3 as well.
            seed = '%20i' % random.randint(0, sys.maxsize)
            yield None, (seed, line)

    def reducer(self, _, values):
        """
        Now that the values have a random number attached,
        they'll come in in random order, so we yield the
        first n lines, and return early.

        Args:
            values - generator of (random_seed, line) pairs

        Yields:
            key - None
            value - random sample of log lines
        """
        for line_num, (seed, line) in enumerate(values):
            # Check the bound *before* yielding: previously the check came
            # after the yield, so --sample-size 0 incorrectly emitted a line.
            if line_num >= self.sample_size:
                break
            yield None, line
if __name__ == '__main__':
    # Let mrjob parse the command line and run the appropriate step
    # (mapper, reducer, or the full job) for this invocation.
    MRLogSampler.run()