/usr/lib/python2.7/dist-packages/Bcfg2/Reporting/Transport/RedisTransport.py is in bcfg2-server 1.4.0~pre2+git141-g6d40dace6358-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 | """
The Redis transport. Stats are pickled and written to
a redis queue
"""
import time
import signal
import platform
import traceback
import threading
import Bcfg2.Options
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
from Bcfg2.Compat import cPickle
try:
import redis
HAS_REDIS = True
except ImportError:
HAS_REDIS = False
class RedisMessage(object):
"""An rpc message"""
def __init__(self, channel, method, args=[], kwargs=dict()):
self.channel = channel
self.method = method
self.args = args
self.kwargs = kwargs
class RedisTransport(TransportBase):
""" Redis Transport Class """
STATS_KEY = 'bcfg2_statistics'
COMMAND_KEY = 'bcfg2_command'
options = TransportBase.options + [
Bcfg2.Options.Option(
cf=('reporting', 'redis_host'), dest="reporting_redis_host",
default='127.0.0.1', help='Reporting Redis host'),
Bcfg2.Options.Option(
cf=('reporting', 'redis_port'), dest="reporting_redis_port",
default=6379, type=int, help='Reporting Redis port'),
Bcfg2.Options.Option(
cf=('reporting', 'redis_db'), dest="reporting_redis_db",
default=0, type=int, help='Reporting Redis DB')]
def __init__(self):
super(RedisTransport, self).__init__()
self._commands = None
self.logger.error("Warning: RedisTransport is experimental")
if not HAS_REDIS:
self.logger.error("redis python module is not available")
raise TransportError
self._redis = redis.Redis(
host=Bcfg2.Options.setup.reporting_redis_host,
port=Bcfg2.Options.setup.reporting_redis_port,
db=Bcfg2.Options.setup.reporting_redis_db)
def start_monitor(self, collector):
"""Start the monitor. Eventaully start the command thread"""
self._commands = threading.Thread(target=self.monitor_thread,
args=(self._redis, collector))
self._commands.start()
def store(self, hostname, metadata, stats):
"""Store the file to disk"""
try:
payload = cPickle.dumps(dict(hostname=hostname,
metadata=metadata,
stats=stats))
except: # pylint: disable=W0702
msg = "%s: Failed to build interaction object: %s" % \
(self.__class__.__name__,
traceback.format_exc().splitlines()[-1])
self.logger.error(msg)
raise TransportError(msg)
try:
self._redis.rpush(RedisTransport.STATS_KEY, payload)
except redis.RedisError:
self.logger.error("Failed to store interaction for %s: %s" %
(hostname, traceback.format_exc().splitlines()[-1]))
def fetch(self):
"""Fetch the next object"""
try:
payload = self._redis.blpop(RedisTransport.STATS_KEY, timeout=5)
if payload:
return cPickle.loads(payload[1])
except redis.RedisError:
self.logger.error("Failed to fetch an interaction: %s" %
(traceback.format_exc().splitlines()[-1]))
except cPickle.UnpicklingError:
self.logger.error("Failed to unpickle payload: %s" %
traceback.format_exc().splitlines()[-1])
raise TransportError
return None
def shutdown(self):
"""Called at program exit"""
self._redis = None
def rpc(self, method, *args, **kwargs):
"""
Send a command to the queue. Timeout after 10 seconds
"""
pubsub = self._redis.pubsub()
channel = "%s%s" % (platform.node(), int(time.time()))
pubsub.subscribe(channel)
self._redis.rpush(RedisTransport.COMMAND_KEY,
cPickle.dumps(RedisMessage(channel, method, args, kwargs)))
resp = pubsub.listen()
signal.signal(signal.SIGALRM, self.shutdown)
signal.alarm(10)
resp.next() # clear subscribe message
response = resp.next()
pubsub.unsubscribe()
try:
return cPickle.loads(response['data'])
except: # pylint: disable=W0702
msg = "%s: Failed to receive response: %s" % \
(self.__class__.__name__,
traceback.format_exc().splitlines()[-1])
self.logger.error(msg)
return None
def monitor_thread(self, rclient, collector):
"""Watch the COMMAND_KEY queue for rpc commands"""
self.logger.info("Command thread started")
while not collector.terminate.isSet():
try:
payload = rclient.blpop(RedisTransport.COMMAND_KEY, timeout=5)
if not payload:
continue
message = cPickle.loads(payload[1])
if not isinstance(message, RedisMessage):
self.logger.error("Message \"%s\" is not a RedisMessage" %
message)
if not message.method in collector.storage.__class__.__rmi__ or\
not hasattr(collector.storage, message.method):
self.logger.error(
"Unknown method %s called on storage engine %s" %
(message.method, collector.storage.__class__.__name__))
raise TransportError
try:
cls_method = getattr(collector.storage, message.method)
response = cls_method(*message.args, **message.kwargs)
response = cPickle.dumps(response)
except:
self.logger.error("RPC method %s failed: %s" %
(message.method, traceback.format_exc().splitlines()[-1]))
raise TransportError
rclient.publish(message.channel, response)
except redis.RedisError:
self.logger.error("Failed to fetch an interaction: %s" %
(traceback.format_exc().splitlines()[-1]))
except cPickle.UnpicklingError:
self.logger.error("Failed to unpickle payload: %s" %
traceback.format_exc().splitlines()[-1])
except TransportError:
pass
except: # pylint: disable=W0702
self.logger.error("Unhandled exception in command thread: %s" %
traceback.format_exc().splitlines()[-1])
self.logger.info("Command thread shutdown")
|