/usr/lib/python2.7/dist-packages/graphite/remote_storage.py is in graphite-web 0.9.12+debian-6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | import socket
import time
import httplib
from urllib import urlencode
from django.core.cache import cache
from django.conf import settings
from graphite.render.hashing import compactHash
from graphite.util import unpickle
class RemoteStore(object):
    """Handle on one remote host that can be queried for metrics.

    The store remembers when it last failed; it reports itself as
    unavailable until more than ``retryDelay`` seconds have passed since
    that failure.
    """

    # time.time() value recorded by the most recent fail(); 0.0 until then.
    lastFailure = 0.0
    # Seconds to wait after a failure, read once from settings at class creation.
    retryDelay = settings.REMOTE_STORE_RETRY_DELAY

    def __init__(self, host):
        """Remember the ``host`` string used to open connections."""
        self.host = host

    @property
    def available(self):
        """True when more than ``retryDelay`` seconds separate now from ``lastFailure``."""
        elapsed = time.time() - self.lastFailure
        return elapsed > self.retryDelay

    def find(self, query):
        """Create a FindRequest for ``query``, send it, and return the request object."""
        pending = FindRequest(self, query)
        pending.send()
        return pending

    def fail(self):
        """Record the current time as the moment of the latest failure."""
        self.lastFailure = time.time()
class FindRequest:
    """A single ``/metrics/find/`` query against a remote store.

    Results are looked up in (and written back to) the Django cache under a
    key derived from the store host and the query. ``send()`` issues the
    HTTP request; ``get_results()`` reads the response and turns it into a
    list of RemoteNode objects.
    """

    # When true, request/response failures are recorded on the store via
    # store.fail() and reported as an empty result instead of being raised.
    suppressErrors = True

    def __init__(self, store, query):
        """Bind the request to ``store`` and ``query``; nothing is sent yet."""
        self.store = store
        self.query = query
        self.connection = None
        self.cacheKey = compactHash('find:%s:%s' % (self.store.host, query))
        self.cachedResults = None

    def send(self):
        """Issue the find request unless a non-empty cached result exists.

        On a cache hit ``self.cachedResults`` is populated and no connection
        is opened. On a request failure the store is marked as failed; the
        exception is re-raised only when ``suppressErrors`` is false.
        """
        self.cachedResults = cache.get(self.cacheKey)
        if self.cachedResults:
            return

        self.connection = HTTPConnectionWithTimeout(self.store.host)
        self.connection.timeout = settings.REMOTE_STORE_FIND_TIMEOUT

        query_params = [
            ('local', '1'),
            ('format', 'pickle'),
            ('query', self.query),
        ]
        query_string = urlencode(query_params)

        try:
            self.connection.request('GET', '/metrics/find/?' + query_string)
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # are never swallowed by suppressErrors.
            self.store.fail()
            if not self.suppressErrors:
                raise

    def get_results(self):
        """Return the list of RemoteNode objects matching the query.

        Uses the cached result when one is available, sending the request
        first if ``send()`` has not been called. Any failure while reading
        or decoding the response marks the store as failed and yields an
        empty list, unless ``suppressErrors`` is false, in which case the
        exception propagates (a non-200 status raises AssertionError).
        """
        if self.cachedResults:
            return self.cachedResults

        if not self.connection:
            self.send()
            # send() may have been satisfied from the cache, in which case
            # there is no connection to read from.
            if self.cachedResults:
                return self.cachedResults

        try:
            response = self.connection.getresponse()
            # Raised explicitly (rather than via ``assert``) so the check is
            # not stripped under ``python -O``; the exception type is kept.
            if response.status != 200:
                raise AssertionError("received error response %s - %s" % (response.status, response.reason))
            result_data = response.read()
            # NOTE(review): the payload comes from the remote host; safety
            # depends on graphite.util.unpickle restricting what it loads.
            results = unpickle.loads(result_data)
        except Exception:
            self.store.fail()
            if not self.suppressErrors:
                raise
            results = []
        finally:
            # The response has been consumed (or has failed): release the
            # socket and drop the reference so a later call re-sends instead
            # of calling getresponse() on a spent connection.
            self.connection.close()
            self.connection = None

        resultNodes = [RemoteNode(self.store, node['metric_path'], node['isLeaf']) for node in results]
        cache.set(self.cacheKey, resultNodes, settings.REMOTE_FIND_CACHE_DURATION)
        self.cachedResults = resultNodes
        return resultNodes
class RemoteNode:
    """A metric-tree node whose data is served by a remote store.

    Leaf nodes can be fetched over HTTP from the store's ``/render/``
    endpoint; non-leaf nodes carry no data.
    """

    # NOTE(review): class-level dict, so it is shared by every instance that
    # does not rebind it -- confirm that sharing is intended.
    context = {}

    def __init__(self, store, metric_path, isLeaf):
        """Describe the node at ``metric_path`` on ``store``.

        ``name`` is the last dot-separated component of ``metric_path``.
        """
        self.store = store
        self.fs_path = None
        self.metric_path = metric_path
        self.real_metric = metric_path
        self.name = metric_path.split('.')[-1]
        self.__isLeaf = isLeaf

    def fetch(self, startTime, endTime):
        """Fetch this node's datapoints between ``startTime`` and ``endTime``.

        Both bounds are truncated to integers before being sent.

        Returns ``[]`` for a non-leaf node; otherwise a tuple
        ``((start, end, step), values)`` taken from the single series in the
        remote response.

        Raises AssertionError when the remote answers with a non-200 status
        or does not return exactly one series. Connection and decoding
        errors propagate unchanged.
        """
        if not self.__isLeaf:
            return []

        query_params = [
            ('target', self.metric_path),
            ('format', 'pickle'),
            ('from', str(int(startTime))),
            ('until', str(int(endTime))),
        ]
        query_string = urlencode(query_params)

        connection = HTTPConnectionWithTimeout(self.store.host)
        connection.timeout = settings.REMOTE_STORE_FETCH_TIMEOUT
        try:
            connection.request('GET', '/render/?' + query_string)
            response = connection.getresponse()
            # Explicit raises instead of ``assert`` so the checks survive
            # ``python -O``; the exception type and messages are unchanged.
            if response.status != 200:
                raise AssertionError("Failed to retrieve remote data: %d %s" % (response.status, response.reason))
            rawData = response.read()
        finally:
            # Always release the socket, on success and on error alike.
            connection.close()

        # NOTE(review): the payload comes from the remote host; safety
        # depends on graphite.util.unpickle restricting what it loads.
        seriesList = unpickle.loads(rawData)
        if len(seriesList) != 1:
            raise AssertionError("Invalid result: seriesList=%s" % str(seriesList))

        series = seriesList[0]
        timeInfo = (series['start'], series['end'], series['step'])
        return (timeInfo, series['values'])

    def isLeaf(self):
        """Return the ``isLeaf`` flag this node was constructed with."""
        return self.__isLeaf
# This is a hack to put a timeout in the connect() of an HTTP request.
# Python 2.6 supports this already, but many Graphite installations
# are not on 2.6 yet.
# NOTE: the ``except ... as`` form used below itself requires Python 2.6+.
class HTTPConnectionWithTimeout(httplib.HTTPConnection):
    """HTTPConnection whose connect() applies ``self.timeout`` to the TCP connect.

    The timeout is cleared again once the socket is connected, so it bounds
    connection establishment only.
    """

    # Connect timeout in seconds; may be overridden per instance.
    timeout = 30

    def connect(self):
        """Connect to ``(self.host, self.port)``, trying each resolved address in turn.

        Raises socket.error with the last connection error, or with the
        message "getaddrinfo returns an empty list" when name resolution
        yields no addresses.
        """
        last_error = socket.error("getaddrinfo returns an empty list")
        for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            try:
                self.sock = socket.socket(af, socktype, proto)
                try:
                    # default self.timeout is an object() in 2.6
                    self.sock.settimeout(float(self.timeout))
                except (TypeError, ValueError, OverflowError):
                    # Unusable timeout value: fall back to a blocking connect.
                    pass
                self.sock.connect(sa)
                self.sock.settimeout(None)
            except socket.error as exc:
                last_error = exc
                # Discard the failed socket and move on to the next address,
                # even when the socket could not be created at all.
                if self.sock:
                    self.sock.close()
                self.sock = None
                continue
            break
        if not self.sock:
            raise last_error
|