This file is indexed.

/usr/lib/python2.7/dist-packages/gst-0.10/gst/extend/leveller.py is in python-gst0.10 0.10.22-3ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# GStreamer python bindings
# Copyright (C) 2005 Thomas Vander Stichele <thomas at apestaart dot org>

# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
# 
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA

import os
import sys
import math

import gobject
import pygst
pygst.require('0.10')
import gst

import utils
from pygobject import gsignal

import sources
from sources import EOS, ERROR, UNKNOWN_TYPE, WRONG_TYPE

class Leveller(gst.Pipeline):
    """
    I am a pipeline that calculates RMS values and mix-in/out points.
    I will signal 'done' when I'm done scanning the file, with return value
    EOS, ERROR, UNKNOWN_TYPE, or WRONG_TYPE from gst.extend.sources
    """

    gsignal('done', str)

    def __init__(self, filename, threshold=-9.0):
        gst.Pipeline.__init__(self)

        self._filename = filename

        self._thresholddB = threshold
        self._threshold = math.pow(10, self._thresholddB / 10.0)

        self._source = sources.AudioSource(filename)
        self._source.connect('done', self._done_cb)

        self._level = gst.element_factory_make("level")

        self._fakesink = gst.element_factory_make("fakesink")

        self.add(self._source, self._level, self._fakesink)
        self._source.connect("pad-added", self._sourcePadAddedCb)
        self._level.link(self._fakesink)

        # temporary values for each timepoint
        self._rmsdB = {} # hash of channel, rmsdB value
        self._peakdB = 0.0 # highest value over all channels for this time

        # results over the whole file
        self._meansquaresums = [] # list of time -> mean square sum value
        self._peaksdB = [] # list of time -> peak value

        self._lasttime = 0

        # will be set when done
        self.mixin = 0
        self.mixout = 0
        self.length = 0
        self.rms = 0.0
        self.rmsdB = 0.0

    def _sourcePadAddedCb(self, source, pad):
        self._source.link(self._level)

    def do_handle_message(self, message):
        self.debug("got message %r" % message)
        if (message.type == gst.MESSAGE_ELEMENT) and (message.src == self._level):
            struc = message.structure
            endtime = struc["endtime"]
            rmss = struc["rms"]
            peaks = struc["peak"]
            decays = struc["decay"]
            infos = zip(rmss, peaks, decays)
            channid = 0
            for rms,peak,decay in infos:
                self._level_cb(message.src, endtime, channid, rms, peak, decay)
                channid += 1
        elif message.type == gst.MESSAGE_EOS:
            self._eos_cb(message.src)
        # chaining up 
        gst.Pipeline.do_handle_message(self, message)

    def _level_cb(self, element, time, channel, rmsdB, peakdB, decaydB):
        # rms is being signalled in dB
        # FIXME: maybe level should have a way of signalling actual values
        # signals are received in order, so I should get each channel one
        # by one
        if time > self._lasttime and self._lasttime > 0:
            # we have a new time point, so calculate stuff for the old block
            meansquaresum = 0.0
            for i in self._rmsdB.keys():
                meansquaresum += math.pow(10, self._rmsdB[i] / 10.0)
            # average over channels
            meansquaresum /= len(self._rmsdB.keys())
            try:
                rmsdBstr = str(10 * math.log10(meansquaresum))
            except OverflowError:
                rmsdBstr = "(-inf)"
            gst.log("meansquaresum %f (%s dB)" % (meansquaresum, rmsdBstr))

            # update values
            self._peaksdB.append((self._lasttime, peakdB))
            self._meansquaresums.append((self._lasttime, meansquaresum))
            self._rmsdB = {}
            self._peakdB = 0.0

        # store the current values for later processing
        gst.log("time %s, channel %d, rmsdB %f" % (gst.TIME_ARGS(time), channel, rmsdB))
        self._lasttime = time
        self._rmsdB[channel] = rmsdB
        if peakdB > self._peakdB:
            self._peakdB = peakdB

    def _done_cb(self, source, reason):
        gst.debug("done, reason %s" % reason)
        # we ignore eos because we want the whole pipeline to eos
        if reason == EOS:
            return
        self.emit('done', reason)

    def _eos_cb(self, source):
        gst.debug("eos, start calcing")

        # get the highest peak RMS for this track
        highestdB = self._peaksdB[0][1]

        for (time, peakdB) in self._peaksdB:
            if peakdB > highestdB:
                highestdB = peakdB
        gst.debug("highest peak(dB): %f" % highestdB)

        # get the length
        (self.length, peakdB) = self._peaksdB[-1]
        
        # find the mix in point
        for (time, peakdB) in self._peaksdB:
            gst.log("time %s, peakdB %f" % (gst.TIME_ARGS(time), peakdB))
            if peakdB > self._thresholddB + highestdB:
                gst.debug("found mix-in point of %f dB at %s" % (
                    peakdB, gst.TIME_ARGS(time)))
                self.mixin = time
                break

        # reverse and find out point
        self._peaksdB.reverse()
        found = None
        for (time, peakdB) in self._peaksdB:
            if found:
                self.mixout = time
                gst.debug("found mix-out point of %f dB right before %s" % (
                    found, gst.TIME_ARGS(time)))
                break
                
            if peakdB > self._thresholddB + highestdB:
                found = peakdB

        # now calculate RMS between these two points
        weightedsquaresums = 0.0
        lasttime = self.mixin
        for (time, meansquaresum) in self._meansquaresums:
            if time <= self.mixin:
                continue

            delta = time - lasttime
            weightedsquaresums += meansquaresum * delta
            gst.log("added MSS %f over time %s at time %s, now %f" % (
                meansquaresum, gst.TIME_ARGS(delta),
                gst.TIME_ARGS(time), weightedsquaresums))

            lasttime = time
            
            if time > self.mixout:
                break

        # calculate
        try:
            ms = weightedsquaresums / (self.mixout - self.mixin)
        except ZeroDivisionError:
            # this is possible when, for example, the whole sound file is
            # empty
            gst.warning('ZeroDivisionError on %s, mixin %s, mixout %s' % (
                self._filename, gst.TIME_ARGS(self.mixin),
                gst.TIME_ARGS(self.mixout)))
            self.emit('done', WRONG_TYPE)
            return

        self.rms = math.sqrt(ms)
        self.rmsdB = 10 * math.log10(ms)

        self.emit('done', EOS)

    def start(self):
        gst.debug("Setting to PLAYING")
        self.set_state(gst.STATE_PLAYING)
        gst.debug("Set to PLAYING")

    # FIXME: we might want to do this ourselves automatically ?
    def stop(self):
        """
        Stop the leveller, freeing all resources.
        Call after the leveller emitted 'done' to clean up.
        """
        gst.debug("Setting to NULL")
        self.set_state(gst.STATE_NULL)
        gst.debug("Set to NULL")
        utils.gc_collect('Leveller.stop()')

    def clean(self):
        # clean ourselves up completely
        self.stop()
        # let's be ghetto and clean out our bin manually
        self.remove(self._source)
        self.remove(self._level)
        self.remove(self._fakesink)
        gst.debug("Emptied myself")
        self._source.clean()
        utils.gc_collect('Leveller.clean() cleaned up source')
        self._source = None
        self._fakesink = None
        self._level = None
        utils.gc_collect('Leveller.clean() done')

gobject.type_register(Leveller)

if __name__ == "__main__":
    main = gobject.MainLoop()

    try:
        leveller = Leveller(sys.argv[1])
    except IndexError:
        sys.stderr.write("Please give a file to calculate level of\n")
        sys.exit(1)

    print "Starting"
    bus = leveller.get_bus()
    bus.add_signal_watch()
    dontstop = True

    leveller.set_state(gst.STATE_PLAYING)
    
    while dontstop:
        message = bus.poll(gst.MESSAGE_ANY, gst.SECOND)
        if message:
            gst.debug("got message from poll:%s/%r" % (message.type, message))
        else:
            gst.debug("got NOTHING from poll")
        if message:
            if message.type == gst.MESSAGE_EOS:
                print "in: %s, out: %s, length: %s" % (gst.TIME_ARGS(leveller.mixin),
                                                       gst.TIME_ARGS(leveller.mixout),
                                                       gst.TIME_ARGS(leveller.length))
                print "rms: %f, %f dB" % (leveller.rms, leveller.rmsdB)
                dontstop = False
            elif message.type == gst.MESSAGE_ERROR:
                error,debug = message.parse_error()
                print "ERROR[%s] %s" % (error.domain, error.message)
                dontstop = False

    leveller.stop()
    leveller.clean()

    gst.debug('deleting leveller, verify objects are freed')
    utils.gc_collect('quit main loop')
    del leveller
    utils.gc_collect('deleted leveller')
    gst.debug('stopping forever')