/usr/lib/python2.7/dist-packages/zeep/cache.py is in python-zeep 2.5.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | import base64
import datetime
import errno
import logging
import os
import threading
from contextlib import contextmanager
import appdirs
import pytz
import six
# The sqlite3 is not available on Google App Engine so we handle the
# ImportError here and set the sqlite3 var to None.
# See https://github.com/mvantellingen/python-zeep/issues/243
try:
import sqlite3
except ImportError:
sqlite3 = None
logger = logging.getLogger(__name__)
class Base(object):
def add(self, url, content):
raise NotImplemented()
def get(self, url):
raise NotImplemented()
class InMemoryCache(Base):
"""Simple in-memory caching using dict lookup with support for timeouts"""
_cache = {} # global cache, thread-safe by default
def __init__(self, timeout=3600):
self._timeout = timeout
def add(self, url, content):
logger.debug("Caching contents of %s", url)
self._cache[url] = (datetime.datetime.utcnow(), content)
def get(self, url):
try:
created, content = self._cache[url]
except KeyError:
pass
else:
if not _is_expired(created, self._timeout):
logger.debug("Cache HIT for %s", url)
return content
logger.debug("Cache MISS for %s", url)
return None
class SqliteCache(Base):
"""Cache contents via an sqlite database on the filesystem"""
_version = '1'
def __init__(self, path=None, timeout=3600):
if sqlite3 is None:
raise RuntimeError("sqlite3 module is required for the SqliteCache")
# No way we can support this when we want to achieve thread safety
if path == ':memory:':
raise ValueError(
"The SqliteCache doesn't support :memory: since it is not " +
"thread-safe. Please use zeep.cache.InMemoryCache()")
self._lock = threading.RLock()
self._timeout = timeout
self._db_path = path if path else _get_default_cache_path()
# Initialize db
with self.db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS request
(created timestamp, url text, content text)
""")
conn.commit()
@contextmanager
def db_connection(self):
with self._lock:
connection = sqlite3.connect(
self._db_path, detect_types=sqlite3.PARSE_DECLTYPES)
yield connection
connection.close()
def add(self, url, content):
logger.debug("Caching contents of %s", url)
data = self._encode_data(content)
with self.db_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM request WHERE url = ?", (url,))
cursor.execute(
"INSERT INTO request (created, url, content) VALUES (?, ?, ?)",
(datetime.datetime.utcnow(), url, data))
conn.commit()
def get(self, url):
with self.db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT created, content FROM request WHERE url=?", (url, ))
rows = cursor.fetchall()
if rows:
created, data = rows[0]
if not _is_expired(created, self._timeout):
logger.debug("Cache HIT for %s", url)
return self._decode_data(data)
logger.debug("Cache MISS for %s", url)
def _encode_data(self, data):
data = base64.b64encode(data)
if six.PY2:
return buffer(self._version_string + data) # noqa
return self._version_string + data
def _decode_data(self, data):
if six.PY2:
data = str(data)
if data.startswith(self._version_string):
return base64.b64decode(data[len(self._version_string):])
@property
def _version_string(self):
prefix = u'$ZEEP:%s$' % self._version
return bytes(prefix.encode('ascii'))
def _is_expired(value, timeout):
"""Return boolean if the value is expired"""
if timeout is None:
return False
now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
max_age = value.replace(tzinfo=pytz.utc)
max_age += datetime.timedelta(seconds=timeout)
return now > max_age
def _get_default_cache_path():
path = appdirs.user_cache_dir('zeep', False)
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
return os.path.join(path, 'cache.db')
|