/usr/lib/python2.7/dist-packages/Bcfg2/Reporting/Transport/DirectStore.py is in bcfg2-server 1.4.0~pre2+git141-g6d40dace6358-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
""" Reporting Transport that stores statistics data directly in the
storage backend """
import os
import sys
import time
import threading
import Bcfg2.Options
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
from Bcfg2.Compat import Queue, Full, Empty, cPickle
class DirectStore(TransportBase, threading.Thread):
    """ Reporting transport that imports statistics directly into the
    storage backend from a background thread.

    Interactions passed to :func:`store` are placed on a bounded
    in-memory queue.  The thread started by the constructor drains the
    queue and hands each interaction to the storage backend.  Anything
    still queued when the thread stops is pickled to a ``.saved`` file
    and re-queued the next time the thread starts. """

    # the base transport options plus the storage backend selection
    options = TransportBase.options + [Bcfg2.Options.Common.reporting_storage]

    def __init__(self):
        """ Set up the storage backend and the queue, then start the
        worker thread. """
        TransportBase.__init__(self)
        threading.Thread.__init__(self)
        # NOTE(review): self.data is not assigned in this class;
        # presumably TransportBase provides it -- confirm
        self.save_file = os.path.join(self.data, ".saved")
        self.storage = Bcfg2.Options.setup.reporting_storage()
        self.storage.validate()
        # bounded so that a stalled backend cannot grow memory without
        # limit; store() drops data once the bound is reached
        self.queue = Queue(100000)
        self.terminate = threading.Event()
        self.debug_log("Reporting: Starting %s thread" %
                       self.__class__.__name__)
        # the thread starts from the constructor, so run() may already
        # be executing by the time __init__ returns
        self.start()

    def shutdown(self):
        """ Ask the worker thread to stop.  This only sets the
        terminate flag; it does not wait for the thread to finish. """
        self.terminate.set()

    def store(self, hostname, metadata, stats):
        """ Queue one client interaction for import.

        The three arguments are bundled into a dict with the keys
        ``hostname``, ``metadata`` and ``stats``.  If the queue is full
        the interaction is dropped and a warning is logged; nothing is
        raised to the caller. """
        try:
            self.queue.put_nowait(dict(hostname=hostname,
                                       metadata=metadata,
                                       stats=stats))
        except Full:
            self.logger.warning("Reporting: Queue is full, "
                                "dropping statistics")

    def run(self):
        """ Thread body: re-queue any saved data, then import queued
        interactions until :func:`shutdown` is called.  Interactions
        still queued at exit are written to the save file. """
        if not self._load():
            self.logger.warning("Reporting: Failed to load saved data, "
                                "DirectStore thread exiting")
            return
        while not self.terminate.is_set() and self.queue is not None:
            try:
                # block with a timeout so the terminate flag is
                # re-checked even when no data arrives
                # NOTE(review): self.timeout is not assigned in this
                # class; presumably inherited -- confirm
                interaction = self.queue.get(block=True,
                                             timeout=self.timeout)
                start = time.time()
                self.storage.import_interaction(interaction)
                self.logger.info("Imported data for %s in %s seconds" %
                                 (interaction.get('hostname', '<unknown>'),
                                  time.time() - start))
            except Empty:
                self.debug_log("Reporting: Queue is empty")
                continue
            except:  # pylint: disable=W0702
                # deliberately broad: one bad interaction must not
                # stop the worker thread
                err = sys.exc_info()[1]
                self.logger.error("Reporting: Could not import interaction: %s"
                                  % err)
                continue
        self.debug_log("Reporting: Stopping %s thread" %
                       self.__class__.__name__)
        if self.queue is not None and not self.queue.empty():
            self._save()

    def fetch(self):
        """ no collector is necessary with this backend """
        pass

    def start_monitor(self, collector):
        """ no collector is necessary with this backend """
        pass

    def rpc(self, method, *args, **kwargs):
        """ Call ``method`` on the storage backend with the given
        arguments and return its result.

        Any failure, including a missing method, is logged and
        re-raised as :class:`TransportError`. """
        try:
            return getattr(self.storage, method)(*args, **kwargs)
        except:  # pylint: disable=W0702
            msg = "Reporting: RPC method %s failed: %s" % (method,
                                                           sys.exc_info()[1])
            self.logger.error(msg)
            raise TransportError(msg)

    def _save(self):
        """ Save any pending data to a file """
        self.debug_log("Reporting: Saving pending data to %s" %
                       self.save_file)
        saved_data = []
        try:
            while not self.queue.empty():
                saved_data.append(self.queue.get_nowait())
        except Empty:
            pass
        # NOTE: the queue has been drained at this point, so if the
        # write below fails the pending interactions are lost
        try:
            # pickle data is binary; a text-mode file breaks dump() on
            # Python 3.  The context manager closes the file even if
            # dump() raises.
            with open(self.save_file, 'wb') as savefile:
                cPickle.dump(saved_data, savefile)
            self.logger.info("Saved pending Reporting data")
        except (IOError, TypeError, cPickle.PicklingError):
            err = sys.exc_info()[1]
            self.logger.warning("Failed to save pending data: %s" % err)

    def _load(self):
        """ Load any saved data from a file

        Returns True if there was nothing to load or the saved data
        was re-queued, and False if the file could not be read or
        shutdown was requested while loading. """
        if not os.path.exists(self.save_file):
            self.debug_log("Reporting: No saved data to load")
            return True
        saved_data = []
        try:
            # NOTE(security): unpickling executes whatever the file
            # describes; the save file must only be writable by the
            # user running the server.
            with open(self.save_file, 'rb') as savefile:
                saved_data = cPickle.load(savefile)
        except (IOError, EOFError, AttributeError, ImportError, IndexError,
                ValueError, cPickle.UnpicklingError):
            # a truncated or corrupt file can raise more than
            # UnpicklingError; treat all of them as a failed load
            # rather than letting the thread die with a traceback
            err = sys.exc_info()[1]
            self.logger.warning("Failed to load saved data: %s" % err)
            return False
        for interaction in saved_data:
            # check that shutdown wasnt called early
            if self.terminate.is_set():
                self.logger.warning("Reporting: Shutdown called while loading "
                                    " saved data")
                return False
            try:
                self.queue.put_nowait(interaction)
            except Full:
                # remaining saved interactions are discarded, since
                # the save file is removed below
                self.logger.warning("Reporting: Queue is full, failed to "
                                    "load saved interaction data")
                break
        try:
            os.unlink(self.save_file)
        except OSError:
            self.logger.error("Reporting: Failed to unlink save file: %s" %
                              self.save_file)
        self.logger.info("Reporting: Loaded saved interaction data")
        return True
|