This file is indexed.

/usr/lib/python3/dist-packages/pgq/consumer.py is in python3-pgq 3.3.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""PgQ consumer framework for Python.

"""

from __future__ import division, absolute_import, print_function

from pgq.baseconsumer import BaseConsumer, BaseBatchWalker
from pgq.event import Event

__all__ = ['Consumer']


# Event status codes
# EV_UNTAGGED: never assigned by the code in this file; _flush_retry() raises
# for any status that is neither EV_RETRY nor EV_DONE.
EV_UNTAGGED = -1
# EV_RETRY: set by tag_retry(); such events are passed to pgq.event_retry()
# when the batch is finished.
EV_RETRY = 0
# EV_DONE: initial status of every RetriableEvent and the result of tag_done().
EV_DONE = 1


class RetriableEvent(Event):
    """Event that remembers whether it should be retried.

    A new event starts out as EV_DONE, so only events the consumer
    explicitly marks with tag_retry() end up with status EV_RETRY.
    """
    __slots__ = ('_status', )

    def __init__(self, queue_name, row):
        super(RetriableEvent, self).__init__(queue_name, row)
        # Done by default; tag_retry() is the only method here that changes it.
        self._status = EV_DONE

    def get_status(self):
        """Return the current status code (EV_DONE or EV_RETRY)."""
        return self._status

    def tag_retry(self, retry_time=60):
        """Mark the event for retry and store the requested retry_time."""
        self.retry_time = retry_time
        self._status = EV_RETRY

    def tag_done(self):
        """Mark the event as processed (status EV_DONE)."""
        self._status = EV_DONE


class RetriableWalkerEvent(RetriableEvent):
    """Redirects status flags to RetriableBatchWalker.

    That way event data can be gc'd immediately and
    tag_done() events don't need to be remembered.

    All three status methods delegate to the walker passed to the
    constructor; the event itself only keeps a reference to it.
    """
    __slots__ = ('_walker', )

    def __init__(self, walker, queue_name, row):
        """Create the event.

        walker is the object whose tag_event_done(), tag_event_retry()
        and get_status() methods receive this event's status calls;
        queue_name and row are passed through to RetriableEvent.
        """
        super(RetriableWalkerEvent, self).__init__(queue_name, row)
        self._walker = walker

    def tag_done(self):
        """Tell the walker that this event is done."""
        self._walker.tag_event_done(self)

    def get_status(self):
        """Return the status code the walker holds for this event."""
        # The result must be returned: without it callers always got None
        # instead of EV_DONE / EV_RETRY.
        return self._walker.get_status(self)

    def tag_retry(self, retry_time=60):
        """Tell the walker to record a retry with the given retry_time."""
        self._walker.tag_event_retry(self, retry_time)


class RetriableBatchWalker(BaseBatchWalker):
    """Batch walker that produces RetriableWalkerEvent objects.

    Retry requests are kept in ``status_map`` (event id ->
    ``(EV_RETRY, retry_time)``) instead of on the events themselves;
    an event without an entry is reported as EV_DONE.
    """

    def __init__(self, curs, batch_id, queue_name, fetch_size=300, consumer_filter=None):
        super(RetriableBatchWalker, self).__init__(
            curs, batch_id, queue_name, fetch_size, consumer_filter)
        # Holds entries only for events currently tagged for retry.
        self.status_map = {}

    def _make_event(self, queue_name, row):
        """Create an event that forwards its status calls to this walker."""
        return RetriableWalkerEvent(self, queue_name, row)

    def tag_event_done(self, event):
        """Drop any retry entry recorded for ``event``."""
        self.status_map.pop(event.id, None)

    def tag_event_retry(self, event, retry_time):
        """Record that ``event`` should be retried with ``retry_time``."""
        entry = (EV_RETRY, retry_time)
        self.status_map[event.id] = entry

    def get_status(self, event):
        """Return the recorded status code for ``event``, EV_DONE if none."""
        try:
            entry = self.status_map[event.id]
        except KeyError:
            return EV_DONE
        return entry[0]

    def iter_status(self):
        """Yield ``(event_id, (status, retry_time))`` for each recorded entry."""
        for ev_id, stat in self.status_map.items():
            yield ev_id, stat


class Consumer(BaseConsumer):
    """Regular consumer base class with retry support.

    Events tagged for retry during processing are handed to
    ``pgq.event_retry()`` when the batch is finished, before the
    BaseConsumer batch-finishing step runs.
    """

    _batch_walker_class = RetriableBatchWalker

    def _make_event(self, queue_name, row):
        """Wrap a row into a RetriableEvent."""
        return RetriableEvent(queue_name, row)

    def _flush_retry(self, curs, batch_id, ev_list):
        """Issue a retry for every event tagged EV_RETRY.

        With ``pgq_lazy_fetch`` set, ``ev_list`` is expected to provide
        ``iter_status()``; otherwise it is iterated directly and each
        event's ``_status`` is inspected.  Raises Exception for any
        status other than EV_RETRY or EV_DONE.
        """
        retry_count = 0

        if not self.pgq_lazy_fetch:
            for event in ev_list:
                status = event._status
                if status == EV_DONE:
                    continue
                if status != EV_RETRY:
                    raise Exception("Untagged event: (id=%d, type=%s, data=%s, ex1=%s" % (
                                    event.id, event.type, event.data, event.extra1))
                self._tag_retry(curs, batch_id, event.id, event.retry_time)
                retry_count += 1
        else:
            for event_id, stat in ev_list.iter_status():
                status = stat[0]
                if status == EV_DONE:
                    continue
                if status != EV_RETRY:
                    raise Exception("Untagged event: id=%d" % event_id)
                self._tag_retry(curs, batch_id, event_id, stat[1])
                retry_count += 1

        # Only touch the statistics when something was actually retried.
        if retry_count:
            self.stat_increase('retry-events', retry_count)

    def _finish_batch(self, curs, batch_id, ev_list):
        """Flush pending retries, then let BaseConsumer finish the batch."""
        self._flush_retry(curs, batch_id, ev_list)
        super(Consumer, self)._finish_batch(curs, batch_id, ev_list)

    def _tag_retry(self, cx, batch_id, ev_id, retry_time):
        """Run ``pgq.event_retry()`` for one event on cursor ``cx``. (internal)"""
        params = [batch_id, ev_id, retry_time]
        cx.execute("select pgq.event_retry(%s, %s, %s)", params)