This file is indexed.

/usr/share/pyshared/pgq/maint.py is in skytools 2.1.12-6.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
"""PgQ maintenance functions."""

import skytools, time

def get_pgq_api_version(curs):
    q = "select count(1) from pg_proc p, pg_namespace n"\
        " where n.oid = p.pronamespace and n.nspname='pgq'"\
        "   and p.proname='version';"
    curs.execute(q)
    if not curs.fetchone()[0]:
        return '1.0.0'

    curs.execute("select pgq.version()")
    return curs.fetchone()[0]

def version_ge(curs, want_ver):
    """Check is db version of pgq is greater than want_ver."""
    db_ver = get_pgq_api_version(curs)
    want_tuple = map(int, want_ver.split('.'))
    db_tuple = map(int, db_ver.split('.'))
    if db_tuple[0] != want_tuple[0]:
        raise Exception('Wrong major version')
    if db_tuple[1] >= want_tuple[1]:
        return 1
    return 0

class MaintenanceJob(skytools.DBScript):
    """Periodic maintenance."""
    def __init__(self, ticker, args):
        skytools.DBScript.__init__(self, 'pgqadm', args)
        self.ticker = ticker
        self.last_time = 0 # start immidiately
        self.last_ticks = 0
        self.clean_ticks = 1
        self.maint_delay = 5*60

    def startup(self):
        # disable regular DBScript startup()
        pass

    def reload(self):
        skytools.DBScript.reload(self)

        # force loop_delay
        self.loop_delay = 5

        # compat var
        self.maint_delay = 60 * self.cf.getfloat('maint_delay_min', -1)
        if self.maint_delay < 0:
            self.maint_delay = self.cf.getfloat('maint_delay', 5*60)
        self.maint_delay = self.cf.getfloat('maint_delay', self.maint_delay)

    def work(self):
        t = time.time()
        if self.last_time + self.maint_delay > t:
            return

        self.do_maintenance()

        self.last_time = t
        duration = time.time() - t
        self.stat_add('maint_duration', duration)

    def do_maintenance(self):
        """Helper function for running maintenance."""

        db = self.get_database('db', autocommit=1)
        cx = db.cursor()

        if skytools.exists_function(cx, "pgq.maint_rotate_tables_step1", 1):
            # rotate each queue in own TX
            q = "select queue_name from pgq.get_queue_info()"
            cx.execute(q)
            for row in cx.fetchall():
                cx.execute("select pgq.maint_rotate_tables_step1(%s)", [row[0]])
                res = cx.fetchone()[0]
                if res:
                    self.log.info('Rotating %s' % row[0])
        else:
            cx.execute("select pgq.maint_rotate_tables_step1();")

        # finish rotation
        cx.execute("select pgq.maint_rotate_tables_step2();")

        # move retry events to main queue in small blocks
        rcount = 0
        while 1:
            cx.execute('select pgq.maint_retry_events();')
            res = cx.fetchone()[0]
            rcount += res
            if res == 0:
                break
        if rcount:
            self.log.info('Got %d events for retry' % rcount)

        # vacuum tables that are needed
        cx.execute('set maintenance_work_mem = 32768')
        cx.execute('select * from pgq.maint_tables_to_vacuum()')
        for row in cx.fetchall():
            cx.execute('vacuum %s;' % row[0])