This file is indexed.

/usr/share/pyshared/numm/io.py is in python-numm 0.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import os

import gobject
import gst

# Enable GObject thread support once, at import time.
# NOTE(review): presumably needed because the bus sync handler and the
# appsink/appsrc signal callbacks below are invoked from GStreamer's own
# threads -- confirm against the gobject/gst-python documentation.
gobject.threads_init()

class WakePipe(object):
    """Self-pipe that lets one thread block until another thread wakes it.

    Every wake() writes exactly one byte and every sleep() consumes exactly
    one byte, so wake-ups are counted rather than coalesced.
    """

    def __init__(self):
        (self._read_fd, self._write_fd) = os.pipe()

    def sleep(self):
        """Block until one byte can be read from the pipe and return it."""
        return os.read(self._read_fd, 1)

    def wake(self):
        """Queue one wake-up by writing a single NUL byte to the pipe."""
        # os.write needs a bytes object on Python 3; on Python 2 the b''
        # literal is the very same str type, so behaviour there is unchanged.
        os.write(self._write_fd, b'\0')

    def close(self):
        """Close both ends of the pipe."""
        try:
            os.close(self._read_fd)
        finally:
            # Release the write end even if closing the read end raised.
            os.close(self._write_fd)

class RunPipeline(object):
    """Run a GStreamer pipeline until it reports end-of-stream or an error.

    A bus sync handler records error and EOS messages and signals a
    WakePipe, so the caller can block in wait() (or while iterating) until
    something has happened.

    Attributes:
        pipeline: the pipeline object passed to the constructor.
        errors: list of error messages received on the bus so far.
        eos: True once an end-of-stream message has been received.
        wake: the WakePipe used to signal the waiting thread.
    """

    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.errors = []
        self.eos = False
        self.wake = WakePipe()

        bus = pipeline.get_bus()
        bus.set_sync_handler(self._message)

    def _message(self, bus, msg):
        """Bus sync handler: record errors / EOS and wake the waiter.

        All messages are passed on unchanged (gst.BUS_PASS).
        """
        if msg.type == gst.MESSAGE_ERROR:
            self.errors.append(msg)
            self.wake.wake()
        elif msg.type == gst.MESSAGE_EOS:
            self.eos = True
            self.wake.wake()

        return gst.BUS_PASS

    def start(self):
        """Ask the pipeline to go to STATE_PLAYING."""
        self.pipeline.set_state(gst.STATE_PLAYING)

    def finish(self):
        """Bring the pipeline to STATE_NULL and release the wake pipe.

        Raises:
            RuntimeError: if set_state(STATE_NULL) does not report success.
        """
        try:
            ok = self.pipeline.set_state(gst.STATE_NULL)
            # set_sync_handler call is necessary for the pipeline to be gc'd.
            # it must come after STATE_NULL or gst frequently segfaults.
            self.pipeline.get_bus().set_sync_handler(None)

            if ok != gst.STATE_CHANGE_SUCCESS:
                raise RuntimeError(
                    "could not set pipeline to STATE_NULL (result: %r)"
                    % (ok,))

            state = self.pipeline.get_state()
            assert state[1] == gst.STATE_NULL, state
        finally:
            # Always release the pipe's file descriptors; previously they
            # leaked whenever the shutdown above raised.
            self.wake.close()

    def wait(self):
        """Block until the next wake-up is signalled on the wake pipe."""
        self.wake.sleep()

    def __iter__(self):
        """Start the pipeline and yield once per wake-up until EOS or error.

        finish() is always called when iteration stops, including when the
        generator is closed early.  After a normal end of iteration a
        RuntimeError wrapping the first recorded error message is raised
        if any error was seen.
        """
        try:
            self.start()

            while not (self.eos or self.errors):
                self.wake.sleep()

                yield
        finally:
            self.finish()

        if self.errors:
            raise RuntimeError(self.errors[0])

    def run(self):
        """Iterate the pipeline to completion, discarding the wake-ups."""
        for _ in self:
            pass

class Reader(RunPipeline):
    """Pull buffers from an appsink and pass processed results to a callback.

    Every buffer pulled from the appsink is handed to
    ``self._process_buffer(buf)``; any non-None result is forwarded to *cb*.
    ``_process_buffer`` is not defined in this class, so it has to be
    supplied by a subclass.

    Args:
        pipeline: pipeline containing *appsink*.
        appsink: the appsink element to read from.
        cb: callable invoked with each non-None processed value.
        start_time: position passed to seek_simple (gst.FORMAT_TIME) before
            playback starts; 0 means no seek is performed.
    """

    def __init__(self, pipeline, appsink, cb, start_time=0):
        RunPipeline.__init__(self, pipeline)
        self.appsink = appsink
        self.cb = cb

        self.start_time = start_time
        appsink.props.emit_signals = True
        appsink.props.max_buffers = 10
        appsink.props.sync = False
        self.handler_id = appsink.connect('new-buffer', self._new_buffer)

    def _new_buffer(self, _appsink):
        """'new-buffer' handler: pull one buffer, process it, call cb."""
        buf = self.appsink.emit('pull-buffer')
        # The waiter is woken before the buffer is processed.
        self.wake.wake()

        if buf is None:
            return

        v = self._process_buffer(buf)

        if v is not None:
            self.cb(v)

    def finish(self):
        """Disconnect the 'new-buffer' handler, then shut the pipeline down."""
        self.appsink.disconnect(self.handler_id)
        RunPipeline.finish(self)

    def run(self):
        """Optionally seek to start_time, then run the pipeline to completion.

        Raises:
            RuntimeError: if the seek fails, or as raised by
                RunPipeline.run / finish.
        """
        if self.start_time != 0:
            seeked = False
            try:
                self.pipeline.set_state(gst.STATE_PAUSED)

                state = self.pipeline.get_state()
                assert state[1] == gst.STATE_PAUSED, state

                seek_success = self.pipeline.seek_simple(
                        gst.FORMAT_TIME,
                        gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
                        self.start_time)
                if not seek_success:
                    raise RuntimeError("seek failed")
                seeked = True
            finally:
                if not seeked:
                    # RunPipeline.run() is never reached on this path, so
                    # nothing else would stop the pipeline, disconnect the
                    # handler or close the wake pipe.
                    self.finish()
        RunPipeline.run(self)


class Writer(RunPipeline):
    """Feed queued frames to an appsrc, one per 'need-data' request.

    The pipeline is started from the constructor.  Frames are consumed in
    the order they were written; a 'need-data' request that finds the queue
    empty after being woken ends the stream.
    """

    def __init__(self, pipeline, appsrc):
        RunPipeline.__init__(self, pipeline)
        self.queue = []
        self.appsrc = appsrc
        appsrc.props.emit_signals = True
        appsrc.connect('need-data', self._need_data)

        self.start()

    def _need_data(self, src, length):
        """'need-data' handler: wait for a wake-up, then push or end."""
        # Blocks until write() or close() has signalled the wake pipe.
        self.wake.sleep()

        if not self.queue:
            self.appsrc.emit('end-of-stream')
            return

        # write() inserts at the front, so the oldest frame is at the end.
        frame = self.queue.pop()
        self.appsrc.emit('push-buffer', gst.Buffer(frame.data))

    def write(self, np):
        """Queue *np* (an object exposing ``.data``) and signal the handler."""
        self.queue.insert(0, np)
        self.wake.wake()

    def close(self):
        """Signal the handler without queuing a frame."""
        self.wake.wake()

def _run_appsrc_pipeline(pipeline, appsrc, get_chunk):
    """Run *pipeline* to completion, feeding *appsrc* from *get_chunk*.

    ``get_chunk(position, length)`` is called for every 'need-data' signal
    and must return a ``(delta_position, chunk)`` pair.  The stream is ended
    when it raises IndexError or returns a chunk of length zero; otherwise
    ``chunk.data`` is pushed and the position advances by delta_position.
    """
    # XXX: deprecated (need to refactor audio for streaming write)

    # Single-element list: a mutable cell the nested handler can update.
    cursor = [0]

    def on_need_data(src, length):
        exhausted = False
        try:
            advance, chunk = get_chunk(cursor[0], length)
        except IndexError:
            exhausted = True

        if exhausted or len(chunk) == 0:
            src.emit('end-of-stream')
            return

        src.emit('push-buffer', gst.Buffer(chunk.data))
        cursor[0] += advance

    appsrc.props.emit_signals = True
    appsrc.connect('need-data', on_need_data)
    RunPipeline(pipeline).run()