This file is indexed.

/usr/share/pyshared/couchdbkit/consumer/cgevent.py is in python-couchdbkit 0.6.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# -*- coding: utf-8 -*-
#
# This file is part of couchdbkit released under the MIT license.
# See the NOTICE for more information.

from __future__ import with_statement

import traceback

import gevent
from gevent import event
from gevent import monkey

from .base import check_callable
from .sync import SyncConsumer
from ..utils import json


class ChangeConsumer(object):
    """Base class that reads a ``_changes`` feed inside a gevent greenlet.

    Subclasses implement :meth:`consume` to read the response and hand the
    changes to the stored callback (``self.process_change``).
    """

    def __init__(self, db, callback, **params):
        """Remember the feed settings and create the stop event.

        :param db: object whose ``res.get`` is used to request ``_changes``.
        :param callback: stored as ``process_change`` for subclasses to call.
        :param params: keyword arguments forwarded to ``db.res.get``.
        """
        self.process_change = callback
        self.params = params
        self.db = db
        self.stop_event = event.Event()

    def stop(self):
        """Set the stop event, releasing anyone blocked in :meth:`wait`."""
        self.stop_event.set()

    def wait(self):
        """Spawn the feed greenlet and block until the stop event is set."""
        gevent.spawn(self._run)
        self.stop_event.wait()

    def wait_async(self):
        """Spawn the feed greenlet without blocking.

        :returns: the object returned by ``gevent.spawn`` so callers may
            join or kill it; callers that ignore the result are unaffected.
        """
        return gevent.spawn(self._run)

    def _run(self):
        """Request the feed once, consume it, and always set the stop event.

        :returns: whatever :meth:`consume` returns, or ``None`` when the
            request or the consumption failed.
        """
        try:
            resp = self.db.res.get("_changes", **self.params)
            return self.consume(resp)
        except (SystemExit, KeyboardInterrupt):
            # Swallowed on purpose, as before; the 5 second pause is kept
            # from the previous implementation.
            gevent.sleep(5)
        except Exception:
            # Best effort: report the failure, pause, then let waiters go.
            traceback.print_exc()
            gevent.sleep(5)
        finally:
            # Previously the success path returned without setting the
            # event, which left wait() blocked forever once the feed ended.
            self.stop_event.set()

    def consume(self, resp):
        """Read ``resp`` and dispatch its changes; implemented by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

class ContinuousChangeConsumer(ChangeConsumer):
    """Consumer that reads the response body one line at a time."""

    def consume(self, resp):
        """Pass every non-empty line of the body to ``self.process_change``.

        The line terminator (``"\\r\\n"`` or ``"\\n"``) is removed before the
        callback is invoked; lines that are empty after stripping are
        skipped. Reading stops when ``readline`` returns an empty value.

        :param resp: response object exposing ``body_stream()``, which is
            used as a context manager yielding an object with ``readline``.
        """
        with resp.body_stream() as body:
            while True:
                line = body.readline()
                if not line:
                    break
                if line.endswith("\r\n"):
                    line = line[:-2]
                elif line.endswith("\n"):
                    # Only strip a terminator that is really there: a final
                    # line without a newline used to lose its last character.
                    line = line[:-1]
                if not line:
                    continue
                self.process_change(line)

class LongPollChangeConsumer(ChangeConsumer):
    """Consumer that reads the whole response body and reports it once."""

    def consume(self, resp):
        """Read the full body, decode it as JSON if possible, and dispatch it.

        The body is read until ``read`` returns an empty value. If the
        joined text is not valid JSON it is passed to
        ``self.process_change`` as the raw string instead.

        :param resp: response object exposing ``body_stream()``, which is
            used as a context manager yielding an object with ``read``.
        """
        with resp.body_stream() as stream:
            chunks = []
            chunk = stream.read()
            while chunk:
                chunks.append(chunk)
                chunk = stream.read()
            payload = "".join(chunks)
            try:
                payload = json.loads(payload)
            except ValueError:
                # Not JSON: fall through and hand over the raw text.
                pass
            self.process_change(payload)


class GeventConsumer(SyncConsumer):
    """Consumer whose callback-taking methods run the feed through gevent."""

    def __init__(self, db):
        """Call ``monkey.patch_socket()`` and initialise the parent with ``db``."""
        monkey.patch_socket()
        super(GeventConsumer, self).__init__(db)

    def _fetch(self, cb, **params):
        """Request ``_changes`` with ``params`` and pass its ``json_body`` to ``cb``."""
        response = self.db.res.get("_changes", **params)
        cb(response.json_body)

    def fetch(self, cb=None, **params):
        """Fetch changes, in a spawned greenlet when ``cb`` is given.

        :returns: the result of ``gevent.spawn`` when ``cb`` is given,
            otherwise the result of the parent's ``wait_once``.
        """
        # NOTE(review): without a callback this delegates to the parent's
        # wait_once rather than a parent fetch -- confirm this is intended.
        if cb is not None:
            return gevent.spawn(self._fetch, cb, **params)
        return super(GeventConsumer, self).wait_once(**params)

    def wait_once(self, cb=None, **params):
        """Wait for changes using a ``longpoll`` feed.

        Without ``cb`` this defers to the parent's ``wait_once``. With
        ``cb`` it blocks in ``LongPollChangeConsumer.wait()``.
        """
        if cb is None:
            return super(GeventConsumer, self).wait_once(**params)

        # presumably rejects non-callables; defined in .base -- verify there
        check_callable(cb)
        params["feed"] = "longpoll"
        LongPollChangeConsumer(self.db, callback=cb, **params).wait()

    def wait(self, cb, **params):
        """Block on a ``continuous`` feed, passing each change to ``cb``."""
        check_callable(cb)
        params["feed"] = "continuous"
        ContinuousChangeConsumer(self.db, callback=cb, **params).wait()

    def wait_once_async(self, cb, **params):
        """Start a ``longpoll`` consumer without blocking.

        :returns: whatever the consumer's ``wait_async()`` returns.
        """
        check_callable(cb)
        params["feed"] = "longpoll"
        poller = LongPollChangeConsumer(self.db, callback=cb, **params)
        return poller.wait_async()

    def wait_async(self, cb, **params):
        """Start a ``continuous`` consumer without blocking.

        :returns: whatever the consumer's ``wait_async()`` returns.
        """
        check_callable(cb)
        params["feed"] = "continuous"
        listener = ContinuousChangeConsumer(self.db, callback=cb, **params)
        return listener.wait_async()