/usr/lib/python3/dist-packages/kazoo/retry.py is in python3-kazoo 2.2.1-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import logging
import random
import time
from kazoo.exceptions import (
ConnectionClosedError,
ConnectionLoss,
KazooException,
OperationTimeoutError,
SessionExpiredError,
)
log = logging.getLogger(__name__)
class ForceRetryError(Exception):
    """Signal from recipe logic that the current operation should be retried.

    :class:`KazooRetry` lists this exception among its retry-able
    exceptions.
    """
class RetryFailedError(KazooException):
    """Raised when a retried operation is finally given up on.

    :class:`KazooRetry` raises it once the maximum number of attempts
    has been used, or when waiting for the next attempt would run past
    the retry deadline.
    """
# NOTE: within this module the name shadows Python 3's builtin
# ``InterruptedError``.
class InterruptedError(RetryFailedError):
    """Raised when the interrupt function forcibly stops a retry."""
class KazooRetry(object):
    """Helper for retrying a method in the face of retry-able
    exceptions

    An instance is callable: ``retry(func, *args, **kwargs)`` keeps
    calling ``func`` until it returns, sleeping between attempts with
    an exponentially growing delay plus random jitter.

    """
    # Exceptions that always trigger another attempt.
    RETRY_EXCEPTIONS = (
        ConnectionLoss,
        OperationTimeoutError,
        ForceRetryError
    )

    # Exceptions that only trigger another attempt when the instance
    # was created with ``ignore_expire=True``.
    EXPIRED_EXCEPTIONS = (
        SessionExpiredError,
    )

    def __init__(self, max_tries=1, delay=0.1, backoff=2, max_jitter=0.8,
                 max_delay=3600, ignore_expire=True, sleep_func=time.sleep,
                 deadline=None, interrupt=None):
        """Create a :class:`KazooRetry` instance for retrying function
        calls

        :param max_tries: How many times to retry the command. -1 means
                          infinite tries.
        :param delay: Initial delay between retry attempts.
        :param backoff: Backoff multiplier between retry attempts.
                        Defaults to 2 for exponential backoff.
        :param max_jitter: Additional max jitter period to wait between
                           retry attempts to avoid slamming the server.
                           Stored internally as a whole number of
                           hundredths of a second.
        :param max_delay: Maximum delay in seconds that the backoff
                          may grow the delay to (jitter is added on
                          top). Defaults to one hour.
        :param ignore_expire:
            Whether a session expiration should be ignored and treated
            as a retry-able command.
        :param sleep_func:
            Callable invoked with the number of seconds to wait
            between attempts. Defaults to :func:`time.sleep`.
        :param deadline:
            Optional time budget in seconds, measured from the first
            attempt of each call. When the next wait would end at or
            after this point, :exc:`RetryFailedError` is raised
            instead of sleeping. ``None`` disables the check.
        :param interrupt:
            Function that will be called with no args that may return
            True if the retry should be ceased immediately. This will
            be called no more than every 0.1 seconds during a wait
            between retries.

        """
        self.max_tries = max_tries
        self.delay = delay
        self.backoff = backoff
        # Kept as integer hundredths of a second so random.randint can
        # be used to draw the jitter.
        self.max_jitter = int(max_jitter * 100)
        self.max_delay = float(max_delay)
        self._attempts = 0
        self._cur_delay = delay
        self.deadline = deadline
        self._cur_stoptime = None
        self.sleep_func = sleep_func
        self.retry_exceptions = self.RETRY_EXCEPTIONS
        self.interrupt = interrupt
        if ignore_expire:
            # Tuple concatenation builds a new per-instance tuple; the
            # class-level RETRY_EXCEPTIONS is left untouched.
            self.retry_exceptions += self.EXPIRED_EXCEPTIONS

    def reset(self):
        """Reset the attempt counter, current delay and deadline"""
        self._attempts = 0
        self._cur_delay = self.delay
        self._cur_stoptime = None

    def copy(self):
        """Return a clone of this retry manager

        The clone starts with fresh attempt/delay state and shares the
        same settings, including the set of retry-able exceptions.

        """
        obj = KazooRetry(max_tries=self.max_tries,
                         delay=self.delay,
                         backoff=self.backoff,
                         max_jitter=self.max_jitter / 100.0,
                         max_delay=self.max_delay,
                         sleep_func=self.sleep_func,
                         deadline=self.deadline,
                         interrupt=self.interrupt)
        # Carry the stored jitter over verbatim: converting it back to
        # seconds and through int(x * 100) again can truncate it (for
        # example 29 -> 0.29 -> 28.999999999999996 -> 28).
        obj.max_jitter = self.max_jitter
        obj.retry_exceptions = self.retry_exceptions
        return obj

    def __call__(self, func, *args, **kwargs):
        """Call a function with arguments until it completes without
        throwing a Kazoo exception

        :param func: Function to call
        :param args: Positional arguments to call the function with
        :param kwargs: Keyword arguments to call the function with
        :returns: Whatever ``func`` returns on its first successful
                  call.
        :raises ConnectionClosedError: Re-raised immediately, never
                                       retried.
        :raises RetryFailedError: When the maximum number of attempts
                                  has been used or the deadline would
                                  be exceeded by the next wait.
        :raises InterruptedError: When the interrupt function returns
                                  a true value during a wait.

        The function will be called until it doesn't throw one of the
        retryable exceptions (ConnectionLoss, OperationTimeout, or
        ForceRetryError), and optionally retrying on session
        expiration. Any other exception propagates to the caller.

        """
        self.reset()

        while True:
            try:
                # Start the deadline clock on the first attempt only.
                if self.deadline is not None and self._cur_stoptime is None:
                    self._cur_stoptime = time.time() + self.deadline
                return func(*args, **kwargs)
            except ConnectionClosedError:
                raise
            except self.retry_exceptions:
                # Note: max_tries == -1 means infinite tries.
                if self._attempts == self.max_tries:
                    raise RetryFailedError("Too many retry attempts")
                self._attempts += 1
                sleeptime = self._cur_delay + (
                    random.randint(0, self.max_jitter) / 100.0)

                # Give up now rather than sleep past the deadline.
                if self._cur_stoptime is not None and \
                   time.time() + sleeptime >= self._cur_stoptime:
                    raise RetryFailedError("Exceeded retry deadline")

                if self.interrupt:
                    while sleeptime > 0:
                        # Break the time period down and sleep for no
                        # longer than 0.1 before calling the interrupt
                        if sleeptime < 0.1:
                            self.sleep_func(sleeptime)
                            sleeptime -= sleeptime
                        else:
                            self.sleep_func(0.1)
                            sleeptime -= 0.1
                        if self.interrupt():
                            raise InterruptedError()
                else:
                    self.sleep_func(sleeptime)
                # Grow the delay for the next attempt, capped at
                # max_delay.
                self._cur_delay = min(self._cur_delay * self.backoff,
                                      self.max_delay)
|