/usr/share/pyshared/sqlobject/postgres/pgconnection.py is in python-sqlobject 0.12.4-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 | from sqlobject.dbconnection import DBAPI
import re
from sqlobject import col
from sqlobject import sqlbuilder
from sqlobject.converters import registerConverter
class PostgresConnection(DBAPI):
supportTransactions = True
dbName = 'postgres'
schemes = [dbName, 'postgresql']
def __init__(self, dsn=None, host=None, port=None, db=None,
user=None, password=None, backend='psycopg', unicodeCols=False,
**kw):
backends = backend
for backend in backends.split(','):
backend = backend.strip()
if not backend:
continue
try:
if backend == 'psycopg2':
import psycopg2 as psycopg
elif backend == 'psycopg1':
import psycopg
elif backend == 'psycopg':
try:
import psycopg2 as psycopg
except ImportError:
import psycopg
elif backend == 'pygresql':
import pgdb
self.module = pgdb
else:
raise ValueError('Unknown PostgreSQL backend "%s", expected psycopg2, psycopg1 or pygresql' % backend)
except ImportError:
pass
else:
break
else:
raise ImportError('Cannot find a PostgreSQL backend, tried %s' % backends)
if backend.startswith('psycopg'):
self.module = psycopg
# Register a converter for psycopg Binary type.
registerConverter(type(psycopg.Binary('')),
PsycoBinaryConverter)
self.user = user
self.host = host
self.port = port
self.db = db
self.password = password
self.dsn_dict = dsn_dict = {}
if host:
dsn_dict["host"] = host
if port:
if backend == 'pygresql':
dsn_dict["host"] = "%s:%d" % (host, port)
else:
if psycopg.__version__.split('.')[0] == '1':
dsn_dict["port"] = str(port)
else:
dsn_dict["port"] = port
if db:
dsn_dict["database"] = db
if user:
dsn_dict["user"] = user
if password:
dsn_dict["password"] = password
self.use_dsn = dsn is not None
if dsn is None:
if backend == 'pygresql':
dsn = ''
if host:
dsn += host
dsn += ':'
if db:
dsn += db
dsn += ':'
if user:
dsn += user
dsn += ':'
if password:
dsn += password
else:
dsn = []
if db:
dsn.append('dbname=%s' % db)
if user:
dsn.append('user=%s' % user)
if password:
dsn.append('password=%s' % password)
if host:
dsn.append('host=%s' % host)
if port:
dsn.append('port=%d' % port)
dsn = ' '.join(dsn)
self.dsn = dsn
self.unicodeCols = unicodeCols
self.schema = kw.pop('schema', None)
if "charset" in kw:
self.dbEncoding = kw.pop("charset")
else:
self.dbEncoding = None
DBAPI.__init__(self, **kw)
def connectionFromURI(cls, uri):
user, password, host, port, path, args = cls._parseURI(uri)
path = path.strip('/')
if (host is None) and path.count('/'): # Non-default unix socket
path_parts = path.split('/')
host = '/' + '/'.join(path_parts[:-1])
path = path_parts[-1]
return cls(host=host, port=port, db=path, user=user, password=password, **args)
connectionFromURI = classmethod(connectionFromURI)
def _setAutoCommit(self, conn, auto):
# psycopg2 does not have an autocommit method.
if hasattr(conn, 'autocommit'):
conn.autocommit(auto)
def makeConnection(self):
try:
if self.use_dsn:
conn = self.module.connect(self.dsn)
else:
conn = self.module.connect(**self.dsn_dict)
except self.module.OperationalError, e:
raise self.module.OperationalError("%s; used connection string %r" % (e, self.dsn))
if self.autoCommit:
# psycopg2 does not have an autocommit method.
if hasattr(conn, 'autocommit'):
conn.autocommit(1)
c = conn.cursor()
if self.schema:
c.execute("SET search_path TO " + self.schema)
dbEncoding = self.dbEncoding
if dbEncoding:
c.execute("SET client_encoding TO %s" % dbEncoding)
return conn
def _queryInsertID(self, conn, soInstance, id, names, values):
table = soInstance.sqlmeta.table
idName = soInstance.sqlmeta.idName
sequenceName = soInstance.sqlmeta.idSequence or \
'%s_%s_seq' % (table, idName)
c = conn.cursor()
if id is None:
c.execute("SELECT NEXTVAL('%s')" % sequenceName)
id = c.fetchone()[0]
names = [idName] + names
values = [id] + values
q = self._insertSQL(table, names, values)
if self.debug:
self.printDebug(conn, q, 'QueryIns')
c.execute(q)
if self.debugOutput:
self.printDebug(conn, id, 'QueryIns', 'result')
return id
def _queryAddLimitOffset(cls, query, start, end):
if not start:
return "%s LIMIT %i" % (query, end)
if not end:
return "%s OFFSET %i" % (query, start)
return "%s LIMIT %i OFFSET %i" % (query, end-start, start)
_queryAddLimitOffset = classmethod(_queryAddLimitOffset)
def createColumn(self, soClass, col):
return col.postgresCreateSQL()
def createReferenceConstraint(self, soClass, col):
return col.postgresCreateReferenceConstraint()
def createIndexSQL(self, soClass, index):
return index.postgresCreateIndexSQL(soClass)
def createIDColumn(self, soClass):
key_type = {int: "SERIAL", str: "TEXT"}[soClass.sqlmeta.idType]
return '%s %s PRIMARY KEY' % (soClass.sqlmeta.idName, key_type)
def dropTable(self, tableName, cascade=False):
self.query("DROP TABLE %s %s" % (tableName,
cascade and 'CASCADE' or ''))
def joinSQLType(self, join):
return 'INT NOT NULL'
def tableExists(self, tableName):
result = self.queryOne("SELECT COUNT(relname) FROM pg_class WHERE relname = %s"
% self.sqlrepr(tableName))
return result[0]
def addColumn(self, tableName, column):
self.query('ALTER TABLE %s ADD COLUMN %s' %
(tableName,
column.postgresCreateSQL()))
def delColumn(self, sqlmeta, column):
self.query('ALTER TABLE %s DROP COLUMN %s' % (sqlmeta.table, column.dbName))
def columnsFromSchema(self, tableName, soClass):
keyQuery = """
SELECT pg_catalog.pg_get_constraintdef(oid) as condef
FROM pg_catalog.pg_constraint r
WHERE r.conrelid = %s::regclass AND r.contype = 'f'"""
colQuery = """
SELECT a.attname,
pg_catalog.format_type(a.atttypid, a.atttypmod), a.attnotnull,
(SELECT substring(d.adsrc for 128) FROM pg_catalog.pg_attrdef d
WHERE d.adrelid=a.attrelid AND d.adnum = a.attnum)
FROM pg_catalog.pg_attribute a
WHERE a.attrelid =%s::regclass
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum"""
primaryKeyQuery = """
SELECT pg_index.indisprimary,
pg_catalog.pg_get_indexdef(pg_index.indexrelid)
FROM pg_catalog.pg_class c, pg_catalog.pg_class c2,
pg_catalog.pg_index AS pg_index
WHERE c.relname = %s
AND c.oid = pg_index.indrelid
AND pg_index.indexrelid = c2.oid
AND pg_index.indisprimary
"""
keyData = self.queryAll(keyQuery % self.sqlrepr(tableName))
keyRE = re.compile(r"\((.+)\) REFERENCES (.+)\(")
keymap = {}
for (condef,) in keyData:
match = keyRE.search(condef)
if match:
field, reftable = match.groups()
keymap[field] = reftable.capitalize()
primaryData = self.queryAll(primaryKeyQuery % self.sqlrepr(tableName))
primaryRE = re.compile(r'CREATE .*? USING .* \((.+?)\)')
primaryKey = None
for isPrimary, indexDef in primaryData:
match = primaryRE.search(indexDef)
assert match, "Unparseable contraint definition: %r" % indexDef
assert primaryKey is None, "Already found primary key (%r), then found: %r" % (primaryKey, indexDef)
primaryKey = match.group(1)
assert primaryKey, "No primary key found in table %r" % tableName
if primaryKey.startswith('"'):
assert primaryKey.endswith('"')
primaryKey = primaryKey[1:-1]
colData = self.queryAll(colQuery % self.sqlrepr(tableName))
results = []
if self.unicodeCols:
client_encoding = self.queryOne("SHOW client_encoding")[0]
for field, t, notnull, defaultstr in colData:
if field == primaryKey:
continue
if keymap.has_key(field):
colClass = col.ForeignKey
kw = {'foreignKey': soClass.sqlmeta.style.dbTableToPythonClass(keymap[field])}
name = soClass.sqlmeta.style.dbColumnToPythonAttr(field)
if name.endswith('ID'):
name = name[:-2]
kw['name'] = name
else:
colClass, kw = self.guessClass(t)
if self.unicodeCols and colClass is col.StringCol:
colClass = col.UnicodeCol
kw['dbEncoding'] = client_encoding
kw['name'] = soClass.sqlmeta.style.dbColumnToPythonAttr(field)
kw['dbName'] = field
kw['notNone'] = notnull
if defaultstr is not None:
kw['default'] = self.defaultFromSchema(colClass, defaultstr)
elif not notnull:
kw['default'] = None
results.append(colClass(**kw))
return results
def guessClass(self, t):
if t.count('point'): # poINT before INT
return col.StringCol, {}
elif t.count('int'):
return col.IntCol, {}
elif t.count('varying') or t.count('varchar'):
if '(' in t:
return col.StringCol, {'length': int(t[t.index('(')+1:-1])}
else: # varchar without length in Postgres means any length
return col.StringCol, {}
elif t.startswith('character('):
return col.StringCol, {'length': int(t[t.index('(')+1:-1]),
'varchar': False}
elif t.count('float') or t.count('real') or t.count('double'):
return col.FloatCol, {}
elif t == 'text':
return col.StringCol, {}
elif t.startswith('timestamp'):
return col.DateTimeCol, {}
elif t.startswith('datetime'):
return col.DateTimeCol, {}
elif t.startswith('date'):
return col.DateCol, {}
elif t.startswith('bool'):
return col.BoolCol, {}
elif t.startswith('bytea'):
return col.BLOBCol, {}
else:
return col.Col, {}
def defaultFromSchema(self, colClass, defaultstr):
"""
If the default can be converted to a python constant, convert it.
Otherwise return is as a sqlbuilder constant.
"""
if colClass == col.BoolCol:
if defaultstr == 'false':
return False
elif defaultstr == 'true':
return True
return getattr(sqlbuilder.const, defaultstr)
def _createOrDropDatabase(self, op="CREATE"):
# We have to connect to *some* database, so we'll connect to
# template1, which is a common open database.
# @@: This doesn't use self.use_dsn or self.dsn_dict
if self.backend == 'pygresql':
dsn = '%s:template1:%s:%s' % (
self.host or '', self.user or '', self.password or '')
else:
dsn = 'dbname=template1'
if self.user:
dsn += ' user=%s' % self.user
if self.password:
dsn += ' password=%s' % self.password
if self.host:
dsn += ' host=%s' % self.host
conn = self.module.connect(dsn)
cur = conn.cursor()
# We must close the transaction with a commit so that
# the CREATE DATABASE can work (which can't be in a transaction):
cur.execute('COMMIT')
cur.execute('%s DATABASE %s' % (op, self.db))
cur.close()
conn.close()
def createEmptyDatabase(self):
self._createOrDropDatabase()
def dropDatabase(self):
self._createOrDropDatabase(op="DROP")
# Converter for psycopg Binary type.
def PsycoBinaryConverter(value, db):
assert db == 'postgres'
return str(value)
|