/usr/lib/python2.7/dist-packages/carbon/aggregator/buffers.py is in graphite-carbon 0.9.15-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | import time
from twisted.internet.task import LoopingCall
from carbon.conf import settings
from carbon import log
class BufferManager:
def __init__(self):
self.buffers = {}
def __len__(self):
return len(self.buffers)
def get_buffer(self, metric_path):
if metric_path not in self.buffers:
log.aggregator("Allocating new metric buffer for %s" % metric_path)
self.buffers[metric_path] = MetricBuffer(metric_path)
return self.buffers[metric_path]
def clear(self):
for buffer in self.buffers.values():
buffer.close()
self.buffers.clear()
class MetricBuffer:
__slots__ = ('metric_path', 'interval_buffers', 'compute_task', 'configured',
'aggregation_frequency', 'aggregation_func')
def __init__(self, metric_path):
self.metric_path = metric_path
self.interval_buffers = {}
self.compute_task = None
self.configured = False
self.aggregation_frequency = None
self.aggregation_func = None
def input(self, datapoint):
(timestamp, value) = datapoint
interval = timestamp - (timestamp % self.aggregation_frequency)
if interval in self.interval_buffers:
buffer = self.interval_buffers[interval]
else:
buffer = self.interval_buffers[interval] = IntervalBuffer(interval)
buffer.input(datapoint)
def configure_aggregation(self, frequency, func):
self.aggregation_frequency = int(frequency)
self.aggregation_func = func
self.compute_task = LoopingCall(self.compute_value)
self.compute_task.start(settings['WRITE_BACK_FREQUENCY'] or frequency, now=False)
self.configured = True
def compute_value(self):
now = int( time.time() )
current_interval = now - (now % self.aggregation_frequency)
age_threshold = current_interval - (settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency)
for buffer in self.interval_buffers.values():
if buffer.active:
value = self.aggregation_func(buffer.values)
datapoint = (buffer.interval, value)
events.metricGenerated(self.metric_path, datapoint)
instrumentation.increment('aggregateDatapointsSent')
buffer.mark_inactive()
if buffer.interval < age_threshold:
del self.interval_buffers[buffer.interval]
if not self.interval_buffers:
self.close()
self.configured = False
del BufferManager.buffers[self.metric_path]
def close(self):
if self.compute_task and self.compute_task.running:
self.compute_task.stop()
@property
def size(self):
return sum([len(buf.values) for buf in self.interval_buffers.values()])
class IntervalBuffer:
__slots__ = ('interval', 'values', 'active')
def __init__(self, interval):
self.interval = interval
self.values = []
self.active = True
def input(self, datapoint):
self.values.append( datapoint[1] )
self.active = True
def mark_inactive(self):
self.active = False
# Shared importable singleton
BufferManager = BufferManager()
# Avoid import circularity
from carbon import events, state, instrumentation
|