/var/lib/gnumed/server/pycommon/gmPsql.py is in gnumed-server 21.15-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# A Python class to replace the PSQL command-line interpreter
# NOTE: this is not a full replacement for the interpreter, merely
# enough functionality to run gnumed installation scripts
#
# Copyright (C) 2003, 2004 - 2010 GNUmed developers
# Licence: GPL v2 or later
#===================================================================
__author__ = "Ian Haywood"
__license__ = "GPL v2 or later (details at http://www.gnu.org)"
# stdlib
import sys
import os
import re
import logging
import io
# module-level logger used by all logging calls in this file
_log = logging.getLogger('gm.bootstrapper')
# running ID that Psql.fmt_msg() attaches to a message it cannot format;
# it is incremented after each such message
unformattable_error_id = 12345
#===================================================================
class Psql:
    r"""Minimal stand-in for the psql command-line interpreter.

    Reads a text file of semicolon-terminated SQL commands and executes
    them one by one on a DB-API connection. Only the backslash commands
    ``\set`` and ``\unset`` are honoured; every other backslash command
    is logged and ignored.

    The statement splitter only tracks single-quoted strings, ``--``
    comments and parentheses. Nothing else (for example dollar-quoted
    bodies or ``/* */`` comments) is recognised.
    """

    # word spellings accepted for ON_ERROR_STOP in addition to integers
    _FLAG_WORDS = {
        'on': 1, 'true': 1, 'yes': 1,
        'off': 0, 'false': 0, 'no': 0,
    }

    def __init__(self, conn):
        """
        conn : the database connection to run commands on, must be a
               DBAPI compliant interface
        """
        self.conn = conn
        self.vars = {'ON_ERROR_STOP': None}

    #---------------------------------------------------------------
    def match(self, pattern):
        """Match <pattern> against the start of the current line.

        Returns 1 and stores the captured groups in self.groups on a
        match, returns 0 (leaving self.groups untouched) otherwise.
        """
        found = re.match(pattern, self.line)
        if found is None:
            return 0
        self.groups = found.groups()
        return 1

    #---------------------------------------------------------------
    def fmt_msg(self, aMsg):
        """Prefix <aMsg> with "filename:line: " and strip CR/LF from it.

        If the message cannot be formatted a placeholder carrying a
        running ID is returned and the raw message is printed on the
        console under that ID.
        """
        global unformattable_error_id
        # NOTE(review): self.lineno is already 1-based inside run(), so
        # lineno-1 reports the line before the current one -- confirm
        # whether that is intended before changing it.
        try:
            tmp = u"%s:%d: %s" % (self.filename, self.lineno-1, aMsg)
            tmp = tmp.replace(u'\r', u'').replace(u'\n', u'')
        except UnicodeDecodeError:
            tmp = u"%s:%d: <cannot unicode(msg), printing on console with ID [#%d]>" % (self.filename, self.lineno-1, unformattable_error_id)
            try:
                print('ERROR: GNUmed bootstrap #%d:' % unformattable_error_id)
                print(aMsg)
            except Exception:
                # console output is best effort only, never fail because of it
                _log.debug('cannot print unformattable message on console', exc_info = True)
            unformattable_error_id += 1
        return tmp

    #---------------------------------------------------------------
    def _parse_flag(self, value):
        """Convert the textual value of a boolean psql variable to an int.

        Integers are passed through int(), the words on/off, true/false
        and yes/no (any case) map to 1/0. Anything else raises
        ValueError, as a non-integer value always did.
        """
        try:
            return int(value)
        except ValueError:
            pass
        flag = self._FLAG_WORDS.get(value.lower())
        if flag is None:
            raise ValueError('invalid boolean value for psql variable: %r' % value)
        return flag

    #---------------------------------------------------------------
    def _session_authorization(self, curs):
        """Return the current session authorization as reported via <curs>."""
        curs.execute(u'show session authorization')
        return curs.fetchall()[0][0]

    #---------------------------------------------------------------
    def _log_failure(self, cmd, error):
        """Log a failed command; must be called from inside the except block."""
        _log.exception(cmd)
        if re.match(r"^NOTICE:.*", str(error)):
            _log.warning(self.fmt_msg(error))
        else:
            _log.error(self.fmt_msg(error))
        if not hasattr(error, 'diag'):
            return
        for prop in dir(error.diag):
            if prop.startswith(u'__'):
                continue
            val = getattr(error.diag, prop)
            if val is None:
                continue
            _log.error(u'PG diags %s: %s', prop, val)

    #---------------------------------------------------------------
    def _execute(self, curs, cmd):
        """Run one SQL command, return True on success and False on failure."""
        try:
            curs.execute(cmd)
        except Exception as error:
            self._log_failure(cmd, error)
            return False
        try:
            data = curs.fetchall()
        except Exception:
            # Commands without a result set make fetchall() raise; the
            # driver's exception class is not importable here, so catch
            # broadly (StandardError does not exist on Python 3).
            return True
        _log.debug(u'cursor data: %s', data)
        return True

    #---------------------------------------------------------------
    def run (self, filename):
        """
        filename: a file, containing semicolon-separated SQL commands

        Returns 0 after processing the whole file, 1 if the file cannot
        be opened or a command failed while ON_ERROR_STOP was set.
        """
        _log.debug('processing [%s]', filename)
        curs = self.conn.cursor()
        start_auth = self._session_authorization(curs)
        curs.close()
        _log.debug(u'session auth: %s', start_auth)

        # just try to open the file rather than asking os.access() first
        try:
            sql_file = io.open(filename, mode = 'rt', encoding = 'utf8')
        except (IOError, OSError):
            _log.error(u"cannot open file [%s]", filename)
            return 1
        # read everything up front so the file is closed on every return path
        with sql_file:
            lines = sql_file.readlines()

        self.lineno = 0
        self.filename = filename
        in_string = False
        bracketlevel = 0
        curr_cmd = ''
        curs = self.conn.cursor()

        for self.line in lines:
            self.lineno += 1
            if not self.line.strip():
                continue

            # \set
            if self.match(r"^\\set (\S+) (\S+)"):
                _log.debug(u'"\\set" found: %s', self.groups)
                name, value = self.groups
                if name == 'ON_ERROR_STOP':
                    # adjusting from string to int so that "1" -> 1 -> True
                    value = self._parse_flag(value)
                self.vars[name] = value
                continue

            # \unset
            if self.match (r"^\\unset (\S+)"):
                self.vars[self.groups[0]] = None
                continue

            # other '\' commands
            if self.match (r"^\\(.*)") and not in_string:
                # most other \ commands are for controlling output formats, don't make
                # much sense in an installation script, so we gently ignore them
                _log.warning(self.fmt_msg(u"psql command \"\\%s\" being ignored " % self.groups[0]))
                continue

            # non-'\' commands: walk the line with one character of lookahead,
            # the padding blank lets the last real character be processed, too
            this_char = self.line[0]
            for next_char in self.line[1:] + ' ':
                # start/end of string detected
                if this_char == "'":
                    in_string = not in_string

                # "--"-style comment: drop the rest of the line but keep a
                # line break so the tokens around the comment do not fuse
                if this_char == '-' and next_char == '-' and not in_string:
                    curr_cmd += '\n'
                    break

                # detect bracketing
                if not in_string:
                    if this_char == '(':
                        bracketlevel += 1
                    elif this_char == ')':
                        bracketlevel -= 1

                # a ";" outside of strings and brackets ends the command
                end_of_cmd = (this_char == ';') and (not in_string) and (bracketlevel == 0)
                if not end_of_cmd:
                    curr_cmd += this_char
                else:
                    if curr_cmd.strip():
                        succeeded = self._execute(curs, curr_cmd)
                        if (not succeeded) and self.vars['ON_ERROR_STOP']:
                            self.conn.commit()
                            curs.close()
                            return 1
                    # each command gets its own transaction and cursor
                    self.conn.commit()
                    curs.close()
                    curs = self.conn.cursor()
                    curr_cmd = ''

                this_char = next_char
            # end of loop over chars
        # end of loop over lines

        if curr_cmd.strip():
            # text after the last ";" is never sent to the database
            _log.warning(u'unterminated trailing command in [%s] was not executed', filename)

        self.conn.commit()
        end_auth = self._session_authorization(curs)
        curs.close()
        _log.debug(u'session auth after sql file processing: %s', end_auth)
        if start_auth != end_auth:
            _log.error('session auth changed before/after processing sql file')
        return 0
#===================================================================
# testing code
# Only does anything when the module is executed directly with "test" as
# its first command line argument; otherwise it exits right away.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.exit()
    if sys.argv[1] != 'test':
        sys.exit()
    # NOTE(review): the import below is commented out and "PgSQL" is not
    # defined anywhere else in the code shown, so the connect() call would
    # raise NameError -- confirm whether this harness is still meant to work.
    #from pyPgSQL import PgSQL
    conn = PgSQL.connect(user='gm-dbo', database = 'gnumed')
    psql = Psql(conn)
    # NOTE(review): sys.argv[1] is always the literal "test" at this point,
    # so a file named "test" gets processed -- presumably sys.argv[2] was
    # intended; verify.
    psql.run(sys.argv[1])
    conn.close()
|