/usr/lib/python3/dist-packages/pymysql/tests/base.py is in python3-pymysql 0.8.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | import gc
import json
import os
import re
import warnings
import unittest2
import pymysql
from .._compat import CPYTHON
class PyMySQLTestCase(unittest2.TestCase):
# You can specify your test environment creating a file named
# "databases.json" or editing the `databases` variable below.
fname = os.path.join(os.path.dirname(__file__), "databases.json")
if os.path.exists(fname):
with open(fname) as f:
databases = json.load(f)
else:
databases = [
{"host":"localhost","user":"root",
"passwd":"","db":"test_pymysql", "use_unicode": True, 'local_infile': True},
{"host":"localhost","user":"root","passwd":"","db":"test_pymysql2"}]
def mysql_server_is(self, conn, version_tuple):
"""Return True if the given connection is on the version given or
greater.
e.g.::
if self.mysql_server_is(conn, (5, 6, 4)):
# do something for MySQL 5.6.4 and above
"""
server_version = conn.get_server_info()
server_version_tuple = tuple(
(int(dig) if dig is not None else 0)
for dig in
re.match(r'(\d+)\.(\d+)\.(\d+)', server_version).group(1, 2, 3)
)
return server_version_tuple >= version_tuple
def setUp(self):
self.connections = []
for params in self.databases:
self.connections.append(pymysql.connect(**params))
self.addCleanup(self._teardown_connections)
def _teardown_connections(self):
for connection in self.connections:
connection.close()
def safe_create_table(self, connection, tablename, ddl, cleanup=True):
"""create a table.
Ensures any existing version of that table is first dropped.
Also adds a cleanup rule to drop the table after the test
completes.
"""
cursor = connection.cursor()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
cursor.execute("drop table if exists `%s`" % (tablename,))
cursor.execute(ddl)
cursor.close()
if cleanup:
self.addCleanup(self.drop_table, connection, tablename)
def drop_table(self, connection, tablename):
cursor = connection.cursor()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
cursor.execute("drop table if exists `%s`" % (tablename,))
cursor.close()
def safe_gc_collect(self):
"""Ensure cycles are collected via gc.
Runs additional times on non-CPython platforms.
"""
gc.collect()
if not CPYTHON:
gc.collect()
|