/usr/lib/python3/dist-packages/pgq/localconsumer.py is in python3-pgq 3.3.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 | """
Consumer that stores last applied position in local file.
For cases where the consumer cannot use single database for remote tracking.
To be subclassed, then override .process_local_batch() or .process_local_event()
methods.
"""
from __future__ import division, absolute_import, print_function
import sys
import os
import errno
import skytools
from pgq.baseconsumer import BaseConsumer
__all__ = ['LocalConsumer']
class LocalConsumer(BaseConsumer):
"""Consumer that applies batches sequentially in second database.
Requirements:
- Whole batch in one TX.
- Must not use retry queue.
Features:
- Can detect if several batches are already applied to dest db.
- If some ticks are lost, allows to seek back on queue.
Whether it succeeds, depends on pgq configuration.
Config options::
## Parameters for LocalConsumer ##
# file location where last applied tick is tracked
local_tracking_file = ~/state/%(job_name)s.tick
"""
def reload(self):
super(LocalConsumer, self).reload()
self.local_tracking_file = self.cf.getfile('local_tracking_file')
if not os.path.exists(os.path.dirname(self.local_tracking_file)):
raise skytools.UsageError("path does not exist: %s" % self.local_tracking_file)
def init_optparse(self, parser=None):
p = super(LocalConsumer, self).init_optparse(parser)
p.add_option("--rewind", action="store_true",
help="change queue position according to local tick")
p.add_option("--reset", action="store_true",
help="reset local tick based on queue position")
return p
def startup(self):
if self.options.rewind:
self.rewind()
sys.exit(0)
if self.options.reset:
self.dst_reset()
sys.exit(0)
super(LocalConsumer, self).startup()
self.check_queue()
def check_queue(self):
queue_tick = -1
local_tick = self.load_local_tick()
db = self.get_database(self.db_name)
curs = db.cursor()
q = "select last_tick from pgq.get_consumer_info(%s, %s)"
curs.execute(q, [self.queue_name, self.consumer_name])
rows = curs.fetchall()
if len(rows) == 1:
queue_tick = rows[0]['last_tick']
db.commit()
if queue_tick < 0:
if local_tick >= 0:
self.log.info("Registering consumer at tick %d", local_tick)
q = "select * from pgq.register_consumer_at(%s, %s, %s)"
curs.execute(q, [self.queue_name, self.consumer_name, local_tick])
else:
self.log.info("Registering consumer at queue top")
q = "select * from pgq.register_consumer(%s, %s)"
curs.execute(q, [self.queue_name, self.consumer_name])
elif local_tick < 0:
self.log.info("Local tick missing, storing queue tick %d", queue_tick)
self.save_local_tick(queue_tick)
elif local_tick > queue_tick:
self.log.warning("Tracking out of sync: queue=%d local=%d. Repositioning on queue. [Database failure?]",
queue_tick, local_tick)
q = "select * from pgq.register_consumer_at(%s, %s, %s)"
curs.execute(q, [self.queue_name, self.consumer_name, local_tick])
elif local_tick < queue_tick:
self.log.warning("Tracking out of sync: queue=%d local=%d. Rewinding queue. [Lost file data?]",
queue_tick, local_tick)
q = "select * from pgq.register_consumer_at(%s, %s, %s)"
curs.execute(q, [self.queue_name, self.consumer_name, local_tick])
else:
self.log.info("Ticks match: Queue=%d Local=%d", queue_tick, local_tick)
def work(self):
if self.work_state < 0:
self.check_queue()
return super(LocalConsumer, self).work()
def process_batch(self, db, batch_id, event_list):
"""Process all events in batch.
"""
# check if done
if self.is_batch_done():
return
# actual work
self.process_local_batch(db, batch_id, event_list)
# finish work
self.set_batch_done()
def process_local_batch(self, db, batch_id, event_list):
"""Overridable method to process whole batch."""
for ev in event_list:
self.process_local_event(db, batch_id, ev)
def process_local_event(self, db, batch_id, ev):
"""Overridable method to process one event at a time."""
raise Exception('process_local_event not implemented')
def is_batch_done(self):
"""Helper function to keep track of last successful batch in external database.
"""
local_tick = self.load_local_tick()
cur_tick = self.batch_info['tick_id']
prev_tick = self.batch_info['prev_tick_id']
if local_tick < 0:
# seems this consumer has not run yet?
return False
if prev_tick == local_tick:
# on track
return False
if cur_tick == local_tick:
# current batch is already applied, skip it
return True
# anything else means problems
raise Exception('Lost position: batch %d..%d, dst has %d' % (
prev_tick, cur_tick, local_tick))
def set_batch_done(self):
"""Helper function to set last successful batch in external database.
"""
tick_id = self.batch_info['tick_id']
self.save_local_tick(tick_id)
def register_consumer(self):
new = super(LocalConsumer, self).register_consumer()
if new: # fixme
self.dst_reset()
def unregister_consumer(self):
"""If unregistering, also clean completed tick table on dest."""
super(LocalConsumer, self).unregister_consumer()
self.dst_reset()
def rewind(self):
dst_tick = self.load_local_tick()
if dst_tick >= 0:
src_db = self.get_database(self.db_name)
src_curs = src_db.cursor()
self.log.info("Rewinding queue to local tick %d", dst_tick)
q = "select pgq.register_consumer_at(%s, %s, %s)"
src_curs.execute(q, [self.queue_name, self.consumer_name, dst_tick])
src_db.commit()
else:
self.log.error('Cannot rewind, no tick found in local file')
def dst_reset(self):
self.log.info("Removing local tracking file")
try:
os.remove(self.local_tracking_file)
except:
pass
def load_local_tick(self):
"""Reads stored tick or -1."""
try:
f = open(self.local_tracking_file, 'r')
buf = f.read()
f.close()
data = buf.strip()
if data:
tick_id = int(data)
else:
tick_id = -1
return tick_id
except IOError as ex:
if ex.errno == errno.ENOENT:
return -1
raise
def save_local_tick(self, tick_id):
"""Store tick in local file."""
data = str(tick_id)
skytools.write_atomic(self.local_tracking_file, data)
|