/usr/lib/python3/dist-packages/beaker/ext/mongodb.py is in python3-beaker 1.9.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | import datetime
import os
import threading
import time
import pickle
try:
import pymongo
import pymongo.errors
import bson
except ImportError:
pymongo = None
bson = None
from beaker.container import NamespaceManager
from beaker.synchronization import SynchronizerImpl
from beaker.util import SyncDict, machine_identifier
from beaker.crypto.util import sha1
from beaker._compat import string_type, PY2
class MongoNamespaceManager(NamespaceManager):
"""Provides the :class:`.NamespaceManager` API over MongoDB."""
MAX_KEY_LENGTH = 1024
clients = SyncDict()
def __init__(self, namespace, url, **kw):
super(MongoNamespaceManager, self).__init__(namespace)
self.lock_dir = None # MongoDB uses mongo itself for locking.
if pymongo is None:
raise RuntimeError('pymongo3 is not available')
if isinstance(url, string_type):
self.client = MongoNamespaceManager.clients.get(url, pymongo.MongoClient, url)
else:
self.client = url
self.db = self.client.get_default_database()
def _format_key(self, key):
if not isinstance(key, str):
key = key.decode('ascii')
if len(key) > (self.MAX_KEY_LENGTH - len(self.namespace) - 1):
if not PY2:
key = key.encode('utf-8')
key = sha1(key).hexdigest()
return '%s:%s' % (self.namespace, key)
def get_creation_lock(self, key):
return MongoSynchronizer(self._format_key(key), self.client)
def __getitem__(self, key):
self._clear_expired()
entry = self.db.backer_cache.find_one({'_id': self._format_key(key)})
if entry is None:
raise KeyError(key)
return pickle.loads(entry['value'])
def __contains__(self, key):
self._clear_expired()
entry = self.db.backer_cache.find_one({'_id': self._format_key(key)})
return entry is not None
def has_key(self, key):
return key in self
def set_value(self, key, value, expiretime=None):
self._clear_expired()
expiration = None
if expiretime is not None:
expiration = time.time() + expiretime
value = pickle.dumps(value)
self.db.backer_cache.update_one({'_id': self._format_key(key)},
{'$set': {'value': bson.Binary(value),
'expiration': expiration}},
upsert=True)
def __setitem__(self, key, value):
self.set_value(key, value)
def __delitem__(self, key):
self._clear_expired()
self.db.backer_cache.delete_many({'_id': self._format_key(key)})
def do_remove(self):
self.db.backer_cache.delete_many({'_id': {'$regex': '^%s' % self.namespace}})
def keys(self):
return [e['key'].split(':', 1)[-1] for e in self.db.backer_cache.find_all(
{'_id': {'$regex': '^%s' % self.namespace}}
)]
def _clear_expired(self):
now = time.time()
self.db.backer_cache.delete_many({'_id': {'$regex': '^%s' % self.namespace},
'expiration': {'$ne': None, '$lte': now}})
class MongoSynchronizer(SynchronizerImpl):
# If a cache entry generation function can take a lot,
# but 15 minutes is more than a reasonable time.
LOCK_EXPIRATION = 900
MACHINE_ID = machine_identifier()
def __init__(self, identifier, url):
super(MongoSynchronizer, self).__init__()
self.identifier = identifier
if isinstance(url, string_type):
self.client = MongoNamespaceManager.clients.get(url, pymongo.MongoClient, url)
else:
self.client = url
self.db = self.client.get_default_database()
def _clear_expired_locks(self):
now = datetime.datetime.utcnow()
expired = now - datetime.timedelta(seconds=self.LOCK_EXPIRATION)
self.db.beaker_locks.delete_many({'_id': self.identifier, 'timestamp': {'$lte': expired}})
return now
def _get_owner_id(self):
return '%s-%s-%s' % (self.MACHINE_ID, os.getpid(), threading.current_thread().ident)
def do_release_read_lock(self):
self.db.beaker_locks.update_one({'_id': self.identifier, 'readers': self._get_owner_id()},
{'$pull': {'readers': self._get_owner_id()}})
def do_acquire_read_lock(self, wait):
now = self._clear_expired_locks()
while True:
try:
self.db.beaker_locks.update_one({'_id': self.identifier, 'owner': None},
{'$set': {'timestamp': now},
'$push': {'readers': self._get_owner_id()}},
upsert=True)
return True
except pymongo.errors.DuplicateKeyError:
if not wait:
return False
time.sleep(0.2)
def do_release_write_lock(self):
self.db.beaker_locks.delete_one({'_id': self.identifier, 'owner': self._get_owner_id()})
def do_acquire_write_lock(self, wait):
now = self._clear_expired_locks()
while True:
try:
self.db.beaker_locks.update_one({'_id': self.identifier, 'owner': None,
'readers': []},
{'$set': {'owner': self._get_owner_id(),
'timestamp': now}},
upsert=True)
return True
except pymongo.errors.DuplicateKeyError:
if not wait:
return False
time.sleep(0.2)
|