/usr/share/pyshared/storm/twisted/transact.py is in python-storm 0.19-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | import time
import random
import transaction
from functools import wraps
from zope.component import getUtility
from storm.zope.interfaces import IZStorm
from storm.exceptions import IntegrityError, DisconnectionError
from twisted.internet.threads import deferToThreadPool
class Transactor(object):
"""Run in a thread code that needs to interact with the database.
This class makes sure that code interacting with the database is run
in a separate thread and that the associated transaction is aborted or
committed in the same thread.
@param threadpool: The C{ThreadPool} to get threads from.
@param _transaction: The C{TransactionManager} to use, for test cases only.
@ivar retries: Maximum number of retries upon retriable exceptions. The
default is to retry a function up to 2 times upon possibly transient
or spurious errors like L{IntegrityError} and L{DisconnectionError}.
@see: C{twisted.python.threadpool.ThreadPool}
"""
retries = 2
def __init__(self, threadpool, _transaction=None):
self._threadpool = threadpool
if _transaction is None:
_transaction = transaction
self._transaction = _transaction
self._retriable_errors = (DisconnectionError, IntegrityError)
try:
from psycopg2.extensions import TransactionRollbackError
self._retriable_errors += (TransactionRollbackError,)
except ImportError:
pass
def run(self, function, *args, **kwargs):
"""Run C{function} in a thread.
The function is run in a thread by a function wrapper, which
commits the transaction if the function runs successfully. If it
raises an exception the transaction is aborted.
@param function: The function to run.
@param args: Positional arguments to pass to C{function}.
@param kwargs: Keyword arguments to pass to C{function}.
@return: A C{Deferred} that will fire after the function has been run.
"""
# Inline the reactor import here for sake of safeness, in case a
# custom reactor needs to be installed
from twisted.internet import reactor
return deferToThreadPool(
reactor, self._threadpool, self._wrap, function, *args, **kwargs)
def _wrap(self, function, *args, **kwargs):
retries = 0
while True:
try:
result = function(*args, **kwargs)
self._transaction.commit()
except self._retriable_errors, error:
if isinstance(error, DisconnectionError):
# If we got a disconnection, calling rollback may not be
# enough because psycopg2 doesn't necessarily use the
# connection, so we call a dummy query to be sure that all
# the stores are correct.
zstorm = getUtility(IZStorm)
for name, store in zstorm.iterstores():
try:
store.execute("SELECT 1")
except DisconnectionError:
pass
self._transaction.abort()
if retries < self.retries:
retries += 1
time.sleep(random.uniform(1, 2 ** retries))
continue
else:
raise
except:
self._transaction.abort()
raise
else:
return result
def transact(method):
"""Decorate L{method} so that it is invoked via L{Transactor.run}.
@param method: The method to decorate.
@return: A decorated method.
@note: The return value of the decorated method should *not* contain
any reference to Storm objects, because they were retrieved in
a different thread and cannot be used outside it.
Example:
from twisted.python.threadpool import ThreadPool
from storm.twisted.transact import Transactor, transact
class Foo(object):
def __init__(self, transactor):
self.transactor = transactor
@transact
def bar(self):
# code that uses Storm
threadpool = ThreadPool(0, 10)
threadpool.start()
transactor = Transactor(threadpool)
foo = Foo(transactor)
deferred = foo.bar()
deferred.addCallback(...)
"""
@wraps(method)
def wrapper(self, *args, **kwargs):
return self.transactor.run(method, self, *args, **kwargs)
return wrapper
|