This file is indexed.

/usr/share/pyshared/numm/sound.py is in python-numm 0.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
Conversion between audio files and numpy arrays.

Audio data is represented by arrays of shape (n_frames, n_channels),
with a sample rate of 44.1kHz by default.
"""

import gst
import numpy

import numm.io
from numm.async import NummBuffer

def _make_sound_pipeline(path, sample_rate):
    pipeline = gst.parse_launch(
        '''
        filesrc name=filesrc !
        decodebin !
        audioconvert !
        audioresample !
        audio/x-raw-int,rate=%s,width=16,depth=16,signed=true,channels=2 !
        appsink name=appsink
        ''' % (sample_rate))

    filesrc = pipeline.get_by_name('filesrc')
    filesrc.props.location = path

    appsink = pipeline.get_by_name('appsink')
    return (pipeline, appsink)

class SoundReader(numm.io.Reader):
    def __init__(self, path, cb, sample_rate=44100, start=0, n_frames=-1):
        # XXX: These caps ought to be kept in sync with the ones in numm.run.
        (pipeline, appsink) = _make_sound_pipeline(path, sample_rate)
        start_time = int(start * gst.SECOND / float(sample_rate))
        numm.io.Reader.__init__(self, pipeline, appsink, cb, start_time)

        self.sample_rate = sample_rate
        self.n_frames = n_frames

        self.frame_idx = 0
        self.seek_done = False
        self.n_channels = None


    def _process_buffer(self, buffer):

        if self.n_channels is None:
            self.n_channels = buffer.caps[0]['channels']

        l = len(buffer) / (2 * self.n_channels)
        np = numpy.fromstring(buffer, numpy.int16).reshape(l, self.n_channels).view(NummBuffer)
        np.timestamp = buffer.timestamp

        if self.n_frames > 0 and self.frame_idx + np.shape[0] >= self.n_frames:
            np = np[:self.n_frames - self.frame_idx]

        if np.shape[0] == 0:
            self.eos = True
            return None

        self.frame_idx += np.shape[0]
        return np

def _raw_sound_chunks(path, **kw):
    chunks = []
    reader = SoundReader(path, chunks.append, **kw)

    for _ in reader:
        while chunks:
            yield chunks.pop(0)

    while chunks:
        yield chunks.pop(0)

def sound_chunks(path, chunk_size=None, **kw):
    leftover = numpy.zeros((0,2), dtype=numpy.int16)

    for chunk in _raw_sound_chunks(path, **kw):
        if chunk_size is not None:
            leftover = numpy.concatenate([leftover, chunk])

            while len(leftover) >= chunk_size:
                b = leftover[:chunk_size].view(NummBuffer)
                # XXX: Should adjust timestamp.
                b.timestamp = chunk.timestamp
                yield b
                leftover = leftover[chunk_size:]
        else:
            yield chunk

    if len(leftover) > 0:
        extrashape = list(leftover.shape)
        extrashape[0] = chunk_size - extrashape[0]
        extra = numpy.zeros(extrashape, dtype=numpy.int16)
        b = numpy.concatenate([leftover, extra]).view(NummBuffer)
        b.timestamp = chunk.timestamp
        yield b

_extension_map = {"wav": "wavenc !",
                  "mp3": "lamemp3enc ! id3v2mux !",
                  "ogg": "audioconvert ! vorbisenc ! oggmux !",
                  "flac": "flacenc !",
                  "aac": "faac !"
                  }

def _write_sound(np, filepath, opts={}):
    defaults = {
        'sample_rate': 44100,
        'channels': len(np.shape), # assume stereo if >1 dimension
        'format': filepath.split('.')[-1]
        }
    options = dict(defaults)
    options.update(opts)

    options["apipe"] = _extension_map[options["format"]]

    pipeline = gst.parse_launch(
        '''
        appsrc name=appsrc !
        audio/x-raw-int,rate=%(sample_rate)s,width=16,depth=16,channels=%(channels)d,signed=true,endianness=1234 !
        %(apipe)s
        filesink name=filesink
        ''' % (options))

    def get_chunk(position, length):
        return (length, np[position:position+length])

    appsrc = pipeline.get_by_name('appsrc')

    filesink = pipeline.get_by_name('filesink')
    filesink.props.location = filepath

    numm.io._run_appsrc_pipeline(pipeline, appsrc, get_chunk)

def sound2np(filepath, start=0, n_frames=-1, sample_rate=44100):
    """
    Load audio data from a file.
    """

    chunks = []
    reader = SoundReader(filepath, chunks.append, start=start, n_frames=n_frames, sample_rate=sample_rate)
    reader.run()

    if chunks:
        return numpy.concatenate(chunks)
    else:
        return numpy.ndarray((0, 2))

def np2sound(np, filepath, opts={}):
    """
    Save audio data to a file.

    Currently, audio is always saved as WAV data.
    """

    _write_sound(np, filepath, opts)