/usr/lib/python3/dist-packages/pgq/consumer.py is in python3-pgq 3.3.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | """PgQ consumer framework for Python.
"""
from __future__ import division, absolute_import, print_function
from pgq.baseconsumer import BaseConsumer, BaseBatchWalker
from pgq.event import Event
__all__ = ['Consumer']
# Event status codes
EV_UNTAGGED = -1
EV_RETRY = 0
EV_DONE = 1
class RetriableEvent(Event):
"""Event which can be retried
Consumer is supposed to tag them after processing.
"""
__slots__ = ('_status', )
def __init__(self, queue_name, row):
super(RetriableEvent, self).__init__(queue_name, row)
self._status = EV_DONE
def tag_done(self):
self._status = EV_DONE
def get_status(self):
return self._status
def tag_retry(self, retry_time=60):
self._status = EV_RETRY
self.retry_time = retry_time
class RetriableWalkerEvent(RetriableEvent):
"""Redirects status flags to RetriableBatchWalker.
That way event data can be gc'd immediately and
tag_done() events don't need to be remembered.
"""
__slots__ = ('_walker', )
def __init__(self, walker, queue_name, row):
super(RetriableWalkerEvent, self).__init__(queue_name, row)
self._walker = walker
def tag_done(self):
self._walker.tag_event_done(self)
def get_status(self):
self._walker.get_status(self)
def tag_retry(self, retry_time=60):
self._walker.tag_event_retry(self, retry_time)
class RetriableBatchWalker(BaseBatchWalker):
"""BatchWalker that returns RetriableEvents
"""
def __init__(self, curs, batch_id, queue_name, fetch_size=300, consumer_filter=None):
super(RetriableBatchWalker, self).__init__(curs, batch_id, queue_name, fetch_size, consumer_filter)
self.status_map = {}
def _make_event(self, queue_name, row):
return RetriableWalkerEvent(self, queue_name, row)
def tag_event_done(self, event):
if event.id in self.status_map:
del self.status_map[event.id]
def tag_event_retry(self, event, retry_time):
self.status_map[event.id] = (EV_RETRY, retry_time)
def get_status(self, event):
return self.status_map.get(event.id, (EV_DONE, 0))[0]
def iter_status(self):
for res in self.status_map.items():
yield res
class Consumer(BaseConsumer):
"""Normal consumer base class.
Can retry events
"""
_batch_walker_class = RetriableBatchWalker
def _make_event(self, queue_name, row):
return RetriableEvent(queue_name, row)
def _flush_retry(self, curs, batch_id, ev_list):
"""Tag retry events."""
retry = 0
if self.pgq_lazy_fetch:
for ev_id, stat in ev_list.iter_status():
if stat[0] == EV_RETRY:
self._tag_retry(curs, batch_id, ev_id, stat[1])
retry += 1
elif stat[0] != EV_DONE:
raise Exception("Untagged event: id=%d" % ev_id)
else:
for ev in ev_list:
if ev._status == EV_RETRY:
self._tag_retry(curs, batch_id, ev.id, ev.retry_time)
retry += 1
elif ev._status != EV_DONE:
raise Exception("Untagged event: (id=%d, type=%s, data=%s, ex1=%s" % (
ev.id, ev.type, ev.data, ev.extra1))
# report weird events
if retry:
self.stat_increase('retry-events', retry)
def _finish_batch(self, curs, batch_id, ev_list):
"""Tag events and notify that the batch is done."""
self._flush_retry(curs, batch_id, ev_list)
super(Consumer, self)._finish_batch(curs, batch_id, ev_list)
def _tag_retry(self, cx, batch_id, ev_id, retry_time):
"""Tag event for retry. (internal)"""
cx.execute("select pgq.event_retry(%s, %s, %s)",
[batch_id, ev_id, retry_time])
|