This file is indexed.

/usr/lib/python2.7/dist-packages/zeep/cache.py is in python-zeep 2.5.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import base64
import datetime
import errno
import logging
import os
import threading
from contextlib import contextmanager

import appdirs
import pytz
import six

# The sqlite3 module is not available on Google App Engine, so we handle the
# ImportError here and set the sqlite3 name to None. SqliteCache checks for
# None in its constructor and raises a RuntimeError in that case.
# See https://github.com/mvantellingen/python-zeep/issues/243
try:
    import sqlite3
except ImportError:
    sqlite3 = None

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class Base(object):
    """Interface shared by the cache backends in this module.

    Subclasses are expected to override both methods; the implementations
    here only signal that the operation is missing.
    """

    def add(self, url, content):
        """Store ``content`` under the key ``url``.

        :raises NotImplementedError: always, unless overridden.
        """
        # NotImplemented is a comparison sentinel and is not callable, so
        # ``raise NotImplemented()`` would surface as a confusing TypeError.
        raise NotImplementedError()

    def get(self, url):
        """Return the content stored for ``url``.

        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()


class InMemoryCache(Base):
    """Dictionary-backed cache whose entries can expire after a timeout."""
    # Class attribute: a single dict shared by every instance of this class.
    _cache = {}

    def __init__(self, timeout=3600):
        """Remember ``timeout``, the maximum entry age passed to
        ``_is_expired`` on lookup."""
        self._timeout = timeout

    def add(self, url, content):
        """Store ``content`` for ``url`` together with the current UTC time."""
        logger.debug("Caching contents of %s", url)
        stored_at = datetime.datetime.utcnow()
        self._cache[url] = (stored_at, content)

    def get(self, url):
        """Return the cached content for ``url``.

        ``None`` is returned when there is no entry for ``url`` or when the
        entry is reported as expired by ``_is_expired``.
        """
        try:
            stored_at, cached = self._cache[url]
        except KeyError:
            logger.debug("Cache MISS for %s", url)
            return None

        if _is_expired(stored_at, self._timeout):
            # The stale entry is left in place; a later add() overwrites it.
            logger.debug("Cache MISS for %s", url)
            return None

        logger.debug("Cache HIT for %s", url)
        return cached


class SqliteCache(Base):
    """Cache contents via an sqlite database on the filesystem.

    Every operation opens a fresh connection to the database file while
    holding ``self._lock`` and closes it again when the operation is done.
    """
    _version = '1'

    def __init__(self, path=None, timeout=3600):
        """Set up the cache and create the ``request`` table if needed.

        :param path: location of the sqlite database file. When falsy, the
            result of ``_get_default_cache_path()`` is used.
        :param timeout: maximum age of an entry in seconds; ``None`` means
            entries never expire.
        :raises RuntimeError: if the ``sqlite3`` module is unavailable.
        :raises ValueError: if ``path`` is ``':memory:'``.
        """

        if sqlite3 is None:
            raise RuntimeError("sqlite3 module is required for the SqliteCache")

        # No way we can support this when we want to achieve thread safety
        if path == ':memory:':
            raise ValueError(
                "The SqliteCache doesn't support :memory: since it is not " +
                "thread-safe. Please use zeep.cache.InMemoryCache()")

        self._lock = threading.RLock()
        self._timeout = timeout
        self._db_path = path if path else _get_default_cache_path()

        # Initialize db
        with self.db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                    CREATE TABLE IF NOT EXISTS request
                    (created timestamp, url text, content text)
                """)
            conn.commit()

    @contextmanager
    def db_connection(self):
        """Yield a new sqlite connection while holding the instance lock.

        The connection is closed when the ``with`` block exits, including
        when the block raises, so a failing query cannot leak it.
        """
        with self._lock:
            # PARSE_DECLTYPES lets sqlite3 convert the ``created timestamp``
            # column according to its declared type.
            connection = sqlite3.connect(
                self._db_path, detect_types=sqlite3.PARSE_DECLTYPES)
            try:
                yield connection
            finally:
                connection.close()

    def add(self, url, content):
        """Store ``content`` for ``url``, replacing any existing row."""
        logger.debug("Caching contents of %s", url)
        data = self._encode_data(content)

        with self.db_connection() as conn:
            cursor = conn.cursor()
            # Remove the previous row first so there is one row per url.
            cursor.execute("DELETE FROM request WHERE url = ?", (url,))
            cursor.execute(
                "INSERT INTO request (created, url, content) VALUES (?, ?, ?)",
                (datetime.datetime.utcnow(), url, data))
            conn.commit()

    def get(self, url):
        """Return the cached content for ``url``.

        ``None`` is returned when no row exists, when the row is expired, or
        when the stored data does not carry the expected version prefix.
        """
        with self.db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT created, content FROM request WHERE url=?", (url, ))
            rows = cursor.fetchall()

        if rows:
            created, data = rows[0]
            if not _is_expired(created, self._timeout):
                logger.debug("Cache HIT for %s", url)
                return self._decode_data(data)
        logger.debug("Cache MISS for %s", url)
        return None

    def _encode_data(self, data):
        """Return ``data`` base64-encoded and prefixed with the version tag.

        On Python 2 the result is wrapped in ``buffer``.
        """
        data = base64.b64encode(data)
        if six.PY2:
            return buffer(self._version_string + data)  # noqa
        return self._version_string + data

    def _decode_data(self, data):
        """Reverse ``_encode_data``.

        Returns ``None`` when ``data`` does not start with the current
        version tag.
        """
        if six.PY2:
            data = str(data)
        if data.startswith(self._version_string):
            return base64.b64decode(data[len(self._version_string):])
        return None

    @property
    def _version_string(self):
        """Byte string ``$ZEEP:<version>$`` that prefixes stored content."""
        prefix = u'$ZEEP:%s$' % self._version
        return bytes(prefix.encode('ascii'))


def _is_expired(value, timeout):
    """Return boolean if the value is expired"""
    if timeout is None:
        return False

    now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
    max_age = value.replace(tzinfo=pytz.utc)
    max_age += datetime.timedelta(seconds=timeout)
    return now > max_age


def _get_default_cache_path():
    """Return the path of ``cache.db`` inside the user cache dir for zeep.

    The directory is created when missing. An ``OSError`` from creating it
    is re-raised unless it reports that the directory already exists.
    """
    cache_dir = appdirs.user_cache_dir('zeep', False)
    try:
        os.makedirs(cache_dir)
    except OSError as exc:
        already_present = (
            exc.errno == errno.EEXIST and os.path.isdir(cache_dir))
        if not already_present:
            raise
    return os.path.join(cache_dir, 'cache.db')