/usr/share/pyshared/radosgw_agent/lock.py is in radosgw-agent 1.1-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | import logging
import threading
import time
from radosgw_agent import client
log = logging.getLogger(__name__)
class LockBroken(Exception):
    """Base class for errors reporting that a shard lock was not sustained."""
class LockRenewFailed(LockBroken):
    """Raised when an attempt to renew a held shard lock failed."""
class LockExpired(LockBroken):
    """Raised when a shard lock was not renewed within its timeout."""
class Lock(threading.Thread):
    """A lock on a shard log that automatically refreshes itself.

    It may be used to lock different shards throughout its lifetime.
    To lock a new shard, call set_shard() with the desired shard_num
    and then acquire(). Once the thread has been started, run() renews
    the lock every half ``timeout`` seconds for as long as a shard is
    set.

    To release the lock, call release_and_clear(). This will raise a
    LockBroken (or one of its subclasses) if the lock was not held
    continuously for the whole period.

    NOTE(review): run() never returns and this class does not mark the
    thread as a daemon, so the code that starts it presumably sets
    ``daemon`` itself -- confirm against the caller.
    """

    def __init__(self, conn, type_, locker_id, timeout, zone_id):
        """Store the parameters used for every lock/unlock request.

        conn, type_, zone_id and locker_id are passed through unchanged
        to client.lock_shard() and client.unlock_shard(); timeout is the
        lock duration in seconds and also drives the renewal interval.
        """
        super(Lock, self).__init__()
        self.conn = conn
        self.type = type_
        self.timeout = timeout
        # Re-entrant so that run() can hold the mutex across its
        # "is a shard set?" check and the acquire() call it then makes.
        self.lock = threading.RLock()
        self.locker_id = locker_id
        self.zone_id = zone_id
        # Shard currently being locked, or None when idle.
        self.shard_num = None
        # time.time() taken just before the last successful lock request.
        self.last_locked = None
        # Set by run() when a renewal raised client.HttpError.
        self.failed = False

    def set_shard(self, shard_num):
        """Select the shard to lock and reset the per-shard state.

        The previous shard must already have been cleared with
        release_and_clear() or unset_shard().
        """
        log.debug('set_shard to %d', shard_num)
        with self.lock:
            assert self.shard_num is None, \
                'attempted to acquire new lock without releasing old one'
            self.failed = False
            self.last_locked = None
            self.shard_num = shard_num

    def unset_shard(self):
        """Forget the current shard without sending an unlock request."""
        log.debug('unset shard')
        with self.lock:
            self.shard_num = None

    def acquire(self):
        """Renew an existing lock, or acquire a new one.

        The shard to lock must have been chosen with set_shard() first.

        client.NotFound may be raised if the log contains no entries.
        """
        log.debug('acquire lock')
        with self.lock:
            # Sample the clock before the request so that last_locked
            # never overstates how long the lock remains valid.
            now = time.time()
            client.lock_shard(self.conn, self.type, self.shard_num,
                              self.zone_id, self.timeout, self.locker_id)
            self.last_locked = now

    def release_and_clear(self):
        """Release the lock currently being held.

        Prevent it from being automatically renewed, and check if there
        were any errors renewing the current lock or if it expired.

        Raises LockExpired if the lock was not renewed within the
        timeout, LockRenewFailed if a renewal attempt failed, and
        LockBroken if no lock is held at all (never acquired, or
        already released). A failure of the unlock request itself is
        only logged.
        """
        log.debug('release and clear lock')
        with self.lock:
            shard_num = self.shard_num
            # Clear first so run() stops renewing even if we raise below.
            self.shard_num = None

            if self.last_locked is None:
                # Nothing was ever locked for this shard; report it
                # through the LockBroken hierarchy rather than letting
                # the subtraction below blow up with a TypeError.
                if self.failed:
                    raise LockRenewFailed()
                raise LockBroken('lock is not held')

            diff = time.time() - self.last_locked
            if diff > self.timeout:
                msg = 'lock was not renewed in over %0.2f seconds' % diff
                raise LockExpired(msg)
            if self.failed:
                raise LockRenewFailed()
            try:
                client.unlock_shard(self.conn, self.type, shard_num,
                                    self.zone_id, self.locker_id)
            except client.HttpError as e:
                # Best effort: the lock is left to time out by itself.
                log.warning('failed to unlock shard %d in zone %s: %s',
                            shard_num, self.zone_id, e)
            self.last_locked = None

    def run(self):
        """Thread body: renew the lock on the current shard forever."""
        while True:
            # Check and renew inside one critical section so that a
            # concurrent release_and_clear()/unset_shard() cannot clear
            # the shard between the check and the lock request, and so
            # that ``failed`` is recorded against the right shard.
            with self.lock:
                shard_num = self.shard_num
                if shard_num is not None:
                    try:
                        self.acquire()
                    except client.HttpError as e:
                        log.error('locking shard %d in zone %s failed: %s',
                                  shard_num, self.zone_id, e)
                        self.failed = True
            # Sleep outside the mutex; renew twice per timeout period.
            time.sleep(0.5 * self.timeout)
|