/usr/lib/python2.7/dist-packages/Bcfg2/Reporting/Transport/LocalFilesystem.py is in bcfg2-server 1.4.0~pre2+git141-g6d40dace6358-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | """
The local transport. Stats are pickled and written to
<repo>/store/<hostname>-timestamp
Leans on FileMonitor to detect changes
"""
import os
import select
import time
import traceback
import Bcfg2.Options
import Bcfg2.Server.FileMonitor
from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
from Bcfg2.Compat import cPickle
class LocalFilesystem(TransportBase):
options = TransportBase.options + [Bcfg2.Options.Common.filemonitor]
def __init__(self):
super(LocalFilesystem, self).__init__()
self.work_path = "%s/work" % self.data
self.debug_log("LocalFilesystem: work path %s" % self.work_path)
self.fmon = None
self._phony_collector = None
#setup our local paths or die
if not os.path.exists(self.work_path):
try:
os.makedirs(self.work_path)
except:
self.logger.error("%s: Unable to create storage: %s" %
(self.__class__.__name__,
traceback.format_exc().splitlines()[-1]))
raise TransportError
def set_debug(self, debug):
rv = TransportBase.set_debug(self, debug)
if self.fmon is not None:
self.fmon.set_debug(debug)
return rv
def start_monitor(self, collector):
"""Start the file monitor. Most of this comes from BaseCore"""
try:
self.fmon = Bcfg2.Server.FileMonitor.get_fam()
except IOError:
msg = "Failed to instantiate fam driver %s" % \
Bcfg2.Options.setup.filemonitor
self.logger.error(msg, exc_info=1)
raise TransportError(msg)
if self.debug_flag:
self.fmon.set_debug(self.debug_flag)
self.fmon.start()
self.fmon.AddMonitor(self.work_path, self)
def store(self, hostname, metadata, stats):
"""Store the file to disk"""
try:
payload = cPickle.dumps(dict(hostname=hostname,
metadata=metadata,
stats=stats))
except: # pylint: disable=W0702
msg = "%s: Failed to build interaction object: %s" % \
(self.__class__.__name__,
traceback.format_exc().splitlines()[-1])
self.logger.error(msg)
raise TransportError(msg)
fname = "%s-%s" % (hostname, time.time())
save_file = os.path.join(self.work_path, fname)
tmp_file = os.path.join(self.work_path, "." + fname)
if os.path.exists(save_file):
self.logger.error("%s: Oops.. duplicate statistic in directory." %
self.__class__.__name__)
raise TransportError
# using a tmpfile to hopefully avoid the file monitor from grabbing too
# soon
saved = open(tmp_file, 'wb')
try:
saved.write(payload)
except IOError:
self.logger.error("Failed to store interaction for %s: %s" %
(hostname, traceback.format_exc().splitlines()[-1]))
os.unlink(tmp_file)
saved.close()
os.rename(tmp_file, save_file)
def fetch(self):
"""Fetch the next object"""
event = None
fmonfd = self.fmon.fileno()
if self.fmon.pending():
event = self.fmon.get_event()
elif fmonfd:
select.select([fmonfd], [], [], self.timeout)
if self.fmon.pending():
event = self.fmon.get_event()
else:
# pseudo.. if nothings pending sleep and loop
time.sleep(self.timeout)
if not event or event.filename == self.work_path:
return None
#deviate from the normal routines here we only want one event
etype = event.code2str()
self.debug_log("Recieved event %s for %s" % (etype, event.filename))
if os.path.basename(event.filename)[0] == '.':
return None
if etype in ('created', 'exists'):
self.debug_log("Handling event %s" % event.filename)
payload = os.path.join(self.work_path, event.filename)
try:
payloadfd = open(payload, "rb")
interaction = cPickle.load(payloadfd)
payloadfd.close()
os.unlink(payload)
return interaction
except IOError:
self.logger.error("Failed to read payload: %s" %
traceback.format_exc().splitlines()[-1])
except cPickle.UnpicklingError:
self.logger.error("Failed to unpickle payload: %s" %
traceback.format_exc().splitlines()[-1])
payloadfd.close()
raise TransportError
return None
def shutdown(self):
"""Called at program exit"""
if self.fmon:
self.fmon.shutdown()
if self._phony_collector:
self._phony_collector.shutdown()
def rpc(self, method, *args, **kwargs):
"""
Here this is more of a dummy. Rather then start a layer
which doesn't exist or muck with files, start the collector
This will all change when other layers are added
"""
try:
if not self._phony_collector:
self._phony_collector = ReportingCollector()
except ReportingError:
raise TransportError
except:
self.logger.error("Failed to load collector: %s" %
traceback.format_exc().splitlines()[-1])
raise TransportError
if not method in self._phony_collector.storage.__class__.__rmi__ or \
not hasattr(self._phony_collector.storage, method):
self.logger.error("Unknown method %s called on storage engine %s" %
(method, self._phony_collector.storage.__class__.__name__))
raise TransportError
try:
cls_method = getattr(self._phony_collector.storage, method)
return cls_method(*args, **kwargs)
except:
self.logger.error("RPC method %s failed: %s" %
(method, traceback.format_exc().splitlines()[-1]))
raise TransportError
|