/usr/share/pyshared/pgq/maint.py is in skytools 2.1.12-6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""PgQ maintenance functions."""
import skytools, time
def get_pgq_api_version(curs):
    """Return the PgQ API version string reported by the database.

    curs is a DB-API style cursor.  The system catalog is checked first
    for a function named 'version' in the 'pgq' schema; when none is
    found the fixed string '1.0.0' is returned, otherwise the result of
    pgq.version() is returned.
    """
    probe_sql = (
        "select count(1) from pg_proc p, pg_namespace n"
        " where n.oid = p.pronamespace and n.nspname='pgq'"
        " and p.proname='version';"
    )
    curs.execute(probe_sql)
    version_func_count = curs.fetchone()[0]

    if version_func_count:
        # The function exists, so ask the database directly.
        curs.execute("select pgq.version()")
        return curs.fetchone()[0]

    # No pgq.version() available: report the fallback version.
    return '1.0.0'
def version_ge(curs, want_ver):
    """Check whether the database's PgQ version is at least want_ver.

    curs is a cursor passed on to get_pgq_api_version(); want_ver is a
    dotted version string such as '2.1.0'.

    Only the first two components are compared: the major numbers must
    be equal and the database minor number must be greater than or equal
    to the wanted one.  Any further components (patch level) are parsed
    but do not take part in the comparison.

    Returns 1 when the database version is sufficient, 0 otherwise.
    Raises Exception('Wrong major version') when the major numbers differ.
    """
    db_ver = get_pgq_api_version(curs)

    # Build real lists: on Python 3 map() returns an iterator, which
    # cannot be indexed, so the comparisons below would raise TypeError.
    want_parts = [int(part) for part in want_ver.split('.')]
    db_parts = [int(part) for part in db_ver.split('.')]

    if db_parts[0] != want_parts[0]:
        raise Exception('Wrong major version')
    if db_parts[1] >= want_parts[1]:
        return 1
    return 0
class MaintenanceJob(skytools.DBScript):
    """Periodic PgQ maintenance job.

    Each pass rotates queue tables, moves retry events back to the main
    queue and vacuums the tables listed by pgq.maint_tables_to_vacuum().
    """

    def __init__(self, ticker, args):
        """Register as the 'pgqadm' script and set up scheduling state."""
        skytools.DBScript.__init__(self, 'pgqadm', args)

        self.ticker = ticker
        # Zero means the very first work() call runs maintenance at once.
        self.last_time = 0
        self.last_ticks = 0
        self.clean_ticks = 1
        # Seconds between maintenance passes (compared with time.time()).
        self.maint_delay = 5*60

    def startup(self):
        """Do nothing; this replaces the regular DBScript startup()."""

    def reload(self):
        """Reload configuration and recompute the maintenance interval."""
        skytools.DBScript.reload(self)

        # The loop delay is always forced to 5, whatever was loaded above.
        self.loop_delay = 5

        # 'maint_delay_min' is the compat setting; it is multiplied by 60
        # (presumably minutes -> seconds).  A negative result means it was
        # not set, and 'maint_delay' overrides it whenever it is configured.
        get_float = self.cf.getfloat
        self.maint_delay = 60 * get_float('maint_delay_min', -1)
        if self.maint_delay < 0:
            self.maint_delay = get_float('maint_delay', 5*60)
        self.maint_delay = get_float('maint_delay', self.maint_delay)

    def work(self):
        """Run a maintenance pass once maint_delay has elapsed since the last one."""
        started = time.time()
        next_due = self.last_time + self.maint_delay
        if next_due > started:
            return

        self.do_maintenance()

        # The interval is measured from the start of the pass.
        self.last_time = started
        self.stat_add('maint_duration', time.time() - started)

    def do_maintenance(self):
        """Helper that performs one complete maintenance pass."""
        conn = self.get_database('db', autocommit=1)
        curs = conn.cursor()

        # Presumably checks for the one-argument form of the step1 function.
        per_queue = skytools.exists_function(
            curs, "pgq.maint_rotate_tables_step1", 1)
        if not per_queue:
            # Only the argument-less form exists: rotate in one call.
            curs.execute("select pgq.maint_rotate_tables_step1();")
        else:
            # Rotate queue by queue, one statement per queue.
            curs.execute("select queue_name from pgq.get_queue_info()")
            for queue_row in curs.fetchall():
                queue_name = queue_row[0]
                curs.execute("select pgq.maint_rotate_tables_step1(%s)",
                             [queue_name])
                if curs.fetchone()[0]:
                    self.log.info('Rotating %s' % queue_name)

        # Second step finishes the rotation.
        curs.execute("select pgq.maint_rotate_tables_step2();")

        # Move retry events to the main queue in small blocks; keep
        # calling until a call reports zero events.
        total_retried = 0
        while True:
            curs.execute('select pgq.maint_retry_events();')
            moved = curs.fetchone()[0]
            total_retried += moved
            if moved == 0:
                break
        if total_retried:
            self.log.info('Got %d events for retry' % total_retried)

        # Vacuum whatever tables the database says need it.
        curs.execute('set maintenance_work_mem = 32768')
        curs.execute('select * from pgq.maint_tables_to_vacuum()')
        for table_row in curs.fetchall():
            curs.execute('vacuum %s;' % table_row[0])
|