# /usr/lib/python2.7/dist-packages/carbon/routers.py (graphite-carbon 1.0.2-1)
# File owner: root:root, mode 0o644.
import imp
from carbon.hashing import ConsistentHashRing
from carbon.util import PluginRegistrar
class DatapointRouter(object):
  "Abstract base class for datapoint routing logic implementations"
  __metaclass__ = PluginRegistrar
  plugins = {}

  def addDestination(self, destination):
    "destination is a (host, port, instance) triple"
    # BUG FIX: was `raise NotImplemented()` -- NotImplemented is a
    # non-callable singleton, so calling it raised TypeError instead of
    # the intended abstract-method exception.
    raise NotImplementedError()

  def removeDestination(self, destination):
    "destination is a (host, port, instance) triple"
    raise NotImplementedError()

  def getDestinations(self, key):
    """Generate the destinations where the given routing key should map to. Only
    destinations which are configured (addDestination has been called for it)
    may be generated by this method."""
    raise NotImplementedError()
class RelayRulesRouter(DatapointRouter):
  """Routes datapoints according to a relay-rules configuration file."""
  plugin_name = 'rules'

  def __init__(self, settings):
    # We need to import relayrules here to avoid circular dependencies.
    from carbon.relayrules import loadRelayRules
    self.rules_path = settings["relay-rules"]
    self.rules = loadRelayRules(self.rules_path)
    self.destinations = set()

  def addDestination(self, destination):
    """Register a (host, port, instance) triple as an active destination."""
    self.destinations.add(destination)

  def removeDestination(self, destination):
    """Forget a (host, port, instance) triple; unknown triples are ignored."""
    self.destinations.discard(destination)

  def getDestinations(self, key):
    """Yield each configured destination selected by a matching rule,
    stopping after the first matching rule that forbids further matching."""
    for rule in self.rules:
      if not rule.matches(key):
        continue
      for candidate in rule.destinations:
        # Only emit destinations that have actually been configured.
        if candidate in self.destinations:
          yield candidate
      if not rule.continue_matching:
        return
class ConsistentHashingRouter(DatapointRouter):
  """Routes each metric to REPLICATION_FACTOR destinations chosen from a
  consistent hash ring, optionally forcing replicas onto distinct servers."""
  plugin_name = 'consistent-hashing'

  def __init__(self, settings):
    replication_factor = settings.REPLICATION_FACTOR
    diverse_replicas = settings.DIVERSE_REPLICAS
    self.replication_factor = int(replication_factor)
    self.diverse_replicas = diverse_replicas
    self.instance_ports = {}  # { (server, instance) : port }
    self.ring = ConsistentHashRing([])

  def addDestination(self, destination):
    "destination is a (host, port, instance) triple"
    (server, port, instance) = destination
    if (server, instance) in self.instance_ports:
      raise Exception("destination instance (%s, %s) already configured" % (server, instance))
    self.instance_ports[(server, instance)] = port
    self.ring.add_node((server, instance))

  def removeDestination(self, destination):
    "destination is a (host, port, instance) triple"
    (server, port, instance) = destination
    if (server, instance) not in self.instance_ports:
      raise Exception("destination instance (%s, %s) not configured" % (server, instance))
    del self.instance_ports[(server, instance)]
    self.ring.remove_node((server, instance))

  def getDestinations(self, metric):
    """Yield up to replication_factor (server, port, instance) triples for
    the given metric.  With DIVERSE_REPLICAS enabled, further instances on
    a server that already received a replica are skipped, so replicas land
    on distinct servers."""
    key = self.getKey(metric)
    if self.diverse_replicas:
      used_servers = set()
      for (server, instance) in self.ring.get_nodes(key):
        if server in used_servers:
          continue
        used_servers.add(server)
        port = self.instance_ports[(server, instance)]
        yield (server, port, instance)
        if len(used_servers) >= self.replication_factor:
          return
    else:
      for (count, node) in enumerate(self.ring.get_nodes(key)):
        if count == self.replication_factor:
          return
        (server, instance) = node
        port = self.instance_ports[(server, instance)]
        yield (server, port, instance)

  def getKey(self, metric):
    # Default routing key is the metric name itself; replaceable at
    # runtime via setKeyFunction / setKeyFunctionFromModule.
    return metric

  def setKeyFunction(self, func):
    """Replace getKey with an arbitrary callable taking a metric name."""
    self.getKey = func

  def setKeyFunctionFromModule(self, keyfunc_spec):
    """Load a key function from a 'path/to/module.py:func_name' spec."""
    module_path, func_name = keyfunc_spec.rsplit(':', 1)
    module_file = open(module_path, 'U')
    try:
      description = ('.py', 'U', imp.PY_SOURCE)
      module = imp.load_module('keyfunc_module', module_file, module_path, description)
    finally:
      # BUG FIX: imp.load_module does not close the file it is handed,
      # so the handle was previously leaked.
      module_file.close()
    keyfunc = getattr(module, func_name)
    self.setKeyFunction(keyfunc)
class AggregatedConsistentHashingRouter(DatapointRouter):
  """Hashes metrics by the aggregate form(s) they resolve to, so a metric
  and its aggregates route to the same set of destinations."""
  plugin_name = 'aggregated-consistent-hashing'

  def __init__(self, settings):
    # Imported here so the aggregator rules code is only pulled in when
    # this router is actually used.
    from carbon.aggregator.rules import RuleManager
    aggregation_rules_path = settings["aggregation-rules"]
    if aggregation_rules_path:
      RuleManager.read_from(aggregation_rules_path)
    self.hash_router = ConsistentHashingRouter(settings)
    self.agg_rules_manager = RuleManager

  def addDestination(self, destination):
    """Delegate to the underlying consistent-hashing router."""
    self.hash_router.addDestination(destination)

  def removeDestination(self, destination):
    """Delegate to the underlying consistent-hashing router."""
    self.hash_router.removeDestination(destination)

  def getDestinations(self, key):
    # Resolve the metric to every aggregate form produced by the rules.
    resolved_metrics = [
      aggregate for aggregate in (
        rule.get_aggregate_metric(key)
        for rule in self.agg_rules_manager.rules
      )
      if aggregate is not None
    ]

    # No rule applies: the metric passes through aggregation untouched,
    # so hash on the raw name.
    if not resolved_metrics:
      resolved_metrics = [key]

    # Yield the union of hashing destinations over all resolved forms.
    seen = set()
    for resolved_metric in resolved_metrics:
      for destination in self.hash_router.getDestinations(resolved_metric):
        seen.add(destination)
    for destination in seen:
      yield destination
try:
  import mmh3
except ImportError:
  # mmh3 is optional; without it the fast-hashing routers are simply
  # not registered.
  pass
else:
  class FastHashRing(object):
    """A very fast hash 'ring'.

    Instead of trying to avoid rebalancing data when changing
    the list of nodes, we try to make routing as fast as we
    can. It's good enough because the current rebalancing
    tools' performance depends on the total number of metrics
    and not the number of metrics to rebalance.
    """

    def __init__(self):
      self.nodes = set()
      self.sorted_nodes = []

    def _hash(self, key):
      return mmh3.hash(key)

    def _update_nodes(self):
      # Keep nodes ordered by hash so get_nodes() walks the "ring" from
      # a deterministic starting point.
      self.sorted_nodes = sorted(
        [(self._hash(str(n)), n) for n in self.nodes],
        key=lambda v: v[0]
      )

    def add_node(self, node):
      self.nodes.add(node)
      self._update_nodes()

    def remove_node(self, node):
      self.nodes.discard(node)
      self._update_nodes()

    def get_nodes(self, key):
      """Yield every node, starting at the ring position chosen by key."""
      # BUG FIX: with no nodes configured, `% len(self.nodes)` raised
      # ZeroDivisionError; an empty ring now yields nothing.
      if not self.nodes:
        return
      seed = self._hash(key) % len(self.nodes)
      for n in xrange(seed, seed + len(self.nodes)):
        yield self.sorted_nodes[n % len(self.sorted_nodes)][1]

  class FastHashingRouter(ConsistentHashingRouter):
    """Same as ConsistentHashingRouter but using FastHashRing."""
    plugin_name = 'fast-hashing'

    def __init__(self, settings):
      super(FastHashingRouter, self).__init__(settings)
      self.ring = FastHashRing()

  class FastAggregatedHashingRouter(AggregatedConsistentHashingRouter):
    """Same as AggregatedConsistentHashingRouter but using FastHashRing."""
    plugin_name = 'fast-aggregated-hashing'

    def __init__(self, settings):
      super(FastAggregatedHashingRouter, self).__init__(settings)
      self.hash_router.ring = FastHashRing()