/usr/bin/pgqadm is in skytools 2.1.12-6.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
#!/usr/bin/python
"""PgQ ticker and maintenance.
"""
import sys
import skytools
from pgq.ticker import SmartTicker
from pgq.status import PGQStatus
#from pgq.admin import PGQAdmin
"""TODO:
pgqadm ini check
"""
# Usage text handed to the option parser by PGQAdmin.init_optparse() (after
# .strip()); the %prog placeholder is left for the parser to fill in.
# The register/unregister lines previously repeated the description of
# "install"; they now describe what the commands actually do.
command_usage = """
%prog [options] INI CMD [subcmd args]
commands:
  ticker                      start ticking & maintenance process
  status                      show overview of queue health
  install                     install code into db
  create QNAME                create queue
  drop QNAME                  drop queue
  register QNAME CONS         register consumer on queue
  unregister QNAME CONS       unregister consumer from queue
  config QNAME [VAR=VAL]      show or change queue config
"""
# Columns of pgq.queue that the "config" command may show or change, mapped
# to a type tag.  show_config() treats 'interval' columns specially (they are
# selected as epoch seconds); change_config() uses the keys as a whitelist.
config_allowed_list = dict([
    ('queue_ticker_max_count', 'int'),
    ('queue_ticker_max_lag', 'interval'),
    ('queue_ticker_idle_period', 'interval'),
    ('queue_rotation_period', 'interval'),
])
class PGQAdmin(skytools.DBScript):
    """Command-line front end for PgQ administration.

    The command that follows the INI file on the command line is dispatched
    either to a separate script object (SmartTicker for "ticker", PGQStatus
    for "status") or to one of this class's own methods, which is then
    installed as self.work and run through skytools.DBScript.start().
    """

    # Minimum len(self.args) each built-in command needs, plus the message
    # printed when fewer were given.  Per the usage text self.args is
    # [INI, CMD, subcmd args...], so the first sub-argument is self.args[2].
    _required_args = {
        'create': (3, "need queue name"),
        'drop': (3, "need queue name"),
        'register': (4, "need queue name and consumer name"),
        'unregister': (4, "need queue name and consumer name"),
    }

    def __init__(self, args):
        """Parse the command line and select what start() will run.

        args -- command-line arguments without the program name.

        Exits with status 1 when the command is missing or unknown, or when
        a built-in command lacks its required sub-arguments.
        """
        skytools.DBScript.__init__(self, 'pgqadm', args)
        self.set_single_loop(1)

        if len(self.args) < 2:
            print("need command")
            sys.exit(1)

        # Commands implemented by methods of this class.
        int_cmds = {
            'create': self.create_queue,
            'drop': self.drop_queue,
            'register': self.register,
            'unregister': self.unregister,
            'install': self.installer,
            'config': self.change_config,
        }

        cmd = self.args[1]
        if cmd == "ticker":
            script = SmartTicker(args)
        elif cmd == "status":
            script = PGQStatus(args)
        elif cmd in int_cmds:
            # Fail early with a readable message instead of an IndexError
            # from inside the handler.
            need, msg = self._required_args.get(cmd, (0, None))
            if len(self.args) < need:
                print(msg)
                sys.exit(1)
            script = None
            self.work = int_cmds[cmd]
        else:
            print("unknown command")
            sys.exit(1)

        # Use a pidfile name distinct from the one in the config.
        if self.pidfile:
            self.pidfile += ".admin"

        self.run_script = script

    def start(self):
        """Run the delegated script if one was selected, else this script."""
        if self.run_script:
            self.run_script.start()
        else:
            skytools.DBScript.start(self)

    def init_optparse(self, parser=None):
        """Return the base option parser with the pgqadm usage text set."""
        p = skytools.DBScript.init_optparse(self, parser)
        p.set_usage(command_usage.strip())
        return p

    def installer(self):
        """Install the plpgsql language, txid function and pgq schema."""
        objs = [
            skytools.DBLanguage("plpgsql"),
            skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
            skytools.DBSchema("pgq", sql_file="pgq.sql"),
        ]

        db = self.get_database('db')
        curs = db.cursor()
        skytools.db_install(curs, objs, self.log)
        db.commit()

    def create_queue(self):
        """Create the queue named by the first sub-argument."""
        qname = self.args[2]
        self.log.info('Creating queue: %s' % qname)
        self.exec_sql("select pgq.create_queue(%s)", [qname])

    def drop_queue(self):
        """Drop the queue named by the first sub-argument."""
        qname = self.args[2]
        self.log.info('Dropping queue: %s' % qname)
        self.exec_sql("select pgq.drop_queue(%s)", [qname])

    def register(self):
        """Register consumer (second sub-argument) on queue (first)."""
        qname = self.args[2]
        cons = self.args[3]
        self.log.info('Registering consumer %s on queue %s' % (cons, qname))
        self.exec_sql("select pgq.register_consumer(%s, %s)", [qname, cons])

    def unregister(self):
        """Unregister consumer (second sub-argument) from queue (first)."""
        qname = self.args[2]
        cons = self.args[3]
        self.log.info('Unregistering consumer %s from queue %s' % (cons, qname))
        self.exec_sql("select pgq.unregister_consumer(%s, %s)", [qname, cons])

    def change_config(self):
        """Show or change queue configuration.

        With no queue name, shows the config of every queue; with only a
        queue name, shows that queue; with VAR=VAL arguments, updates the
        named columns of that queue's row in pgq.queue.

        Raises Exception for an argument that is not VAR=VAL or names a
        variable outside config_allowed_list.
        """
        if len(self.args) < 3:
            for qname in self.get_queue_list():
                self.show_config(qname)
            return

        qname = self.args[2]
        if len(self.args) == 3:
            self.show_config(qname)
            return

        alist = []
        for el in self.args[3:]:
            if '=' not in el:
                raise Exception('bad config argument, expected VAR=VAL: ' + el)
            # Split on the first '=' only, so values may contain '='.
            k, v = el.split('=', 1)
            if k not in config_allowed_list:
                # Accept the short name without the "queue_" prefix too.
                qk = "queue_" + k
                if qk not in config_allowed_list:
                    raise Exception('unknown config var: ' + k)
                k = qk
            # k is whitelisted above; the value is quoted as a literal.
            alist.append("%s=%s" % (k, skytools.quote_literal(v)))

        assignments = ", ".join(alist)
        self.log.info('Change queue %s config to: %s' % (qname, assignments))
        sql = "update pgq.queue set %s where queue_name = %s" % (
            assignments, skytools.quote_literal(qname))
        self.exec_sql(sql, [])

    def exec_sql(self, q, args):
        """Log query q at debug level, execute it with args and commit."""
        self.log.debug(q)
        db = self.get_database('db')
        curs = db.cursor()
        curs.execute(q, args)
        db.commit()

    def show_config(self, qname):
        """Print the configurable settings of queue qname.

        Prints "no such queue: NAME" when pgq.queue has no matching row.
        """
        fields = []
        for f, kind in config_allowed_list.items():
            if kind == 'interval':
                # Intervals are fetched as a number of seconds, as text.
                fields.append("extract('epoch' from %s)::text as %s" % (f, f))
            else:
                fields.append(f)
        klist = ", ".join(fields)
        q = "select " + klist + " from pgq.queue where queue_name = %s"

        db = self.get_database('db')
        curs = db.cursor()
        curs.execute(q, [qname])
        res = curs.dictfetchone()
        db.commit()

        if res is None:
            print("no such queue: %s" % qname)
            return

        print(qname)
        for k in config_allowed_list:
            # Display names drop the "queue_" prefix.
            n = k
            if k[:6] == "queue_":
                n = k[6:]
            print(" %s\t=%7s" % (n, res[k]))

    def get_queue_list(self):
        """Return the names of all queues in pgq.queue, sorted by name."""
        db = self.get_database('db')
        curs = db.cursor()
        curs.execute("select queue_name from pgq.queue order by 1")
        rows = curs.fetchall()
        db.commit()
        return [r[0] for r in rows]
if __name__ == '__main__':
    # Script entry point: hand everything after the program name to
    # PGQAdmin, then run whichever command it selected.
    cli_args = sys.argv[1:]
    script = PGQAdmin(cli_args)
    script.start()
|