/usr/lib/python3/dist-packages/rediscluster/utils.py is in python3-rediscluster 1.3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | # -*- coding: utf-8 -*-
from socket import gethostbyaddr
from functools import wraps
# rediscluster imports
from .exceptions import (
RedisClusterException, ClusterDownError
)
# 3rd party imports
from redis._compat import basestring, nativestr
def bool_ok(response, *args, **kwargs):
"""
Borrowed from redis._compat becuase that method to not support extra arguments
when used in a cluster environment.
"""
return nativestr(response) == 'OK'
def string_keys_to_dict(key_strings, callback):
"""
Maps each string in `key_strings` to `callback` function
and return as a dict.
"""
return dict.fromkeys(key_strings, callback)
def dict_merge(*dicts):
"""
Merge all provided dicts into 1 dict.
"""
merged = {}
for d in dicts:
if not isinstance(d, dict):
raise ValueError('Value should be of dict type')
else:
merged.update(d)
return merged
def blocked_command(self, command):
"""
Raises a `RedisClusterException` mentioning the command is blocked.
"""
raise RedisClusterException("Command: {0} is blocked in redis cluster mode".format(command))
def merge_result(command, res):
"""
Merge all items in `res` into a list.
This command is used when sending a command to multiple nodes
and they result from each node should be merged into a single list.
"""
if not isinstance(res, dict):
raise ValueError('Value should be of dict type')
result = set([])
for _, v in res.items():
for value in v:
result.add(value)
return list(result)
def first_key(command, res):
"""
Returns the first result for the given command.
If more then 1 result is returned then a `RedisClusterException` is raised.
"""
if not isinstance(res, dict):
raise ValueError('Value should be of dict type')
if len(res.keys()) != 1:
raise RedisClusterException("More then 1 result from command: {0}".format(command))
return list(res.values())[0]
def clusterdown_wrapper(func):
"""
Wrapper for CLUSTERDOWN error handling.
If the cluster reports it is down it is assumed that:
- connection_pool was disconnected
- connection_pool was reseted
- refereh_table_asap set to True
It will try 3 times to rerun the command and raises ClusterDownException if it continues to fail.
"""
@wraps(func)
def inner(*args, **kwargs):
for _ in range(0, 3):
try:
return func(*args, **kwargs)
except ClusterDownError:
# Try again with the new cluster setup. All other errors
# should be raised.
pass
# If it fails 3 times then raise exception back to caller
raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")
return inner
def nslookup(node_ip):
"""
"""
if ':' not in node_ip:
return gethostbyaddr(node_ip)[0]
ip, port = node_ip.split(':')
return '{0}:{1}'.format(gethostbyaddr(ip)[0], port)
def parse_cluster_slots(resp, **options):
"""
"""
current_host = options.get('current_host', '')
def fix_server(*args):
return (args[0] or current_host, args[1])
slots = {}
for slot in resp:
start, end, master = slot[:3]
slaves = slot[3:]
slots[start, end] = {
'master': fix_server(*master),
'slaves': [fix_server(*slave) for slave in slaves],
}
return slots
def parse_cluster_nodes(resp, **options):
"""
@see: http://redis.io/commands/cluster-nodes # string
@see: http://redis.io/commands/cluster-slaves # list of string
"""
current_host = options.get('current_host', '')
def parse_slots(s):
slots, migrations = [], []
for r in s.split(' '):
if '->-' in r:
slot_id, dst_node_id = r[1:-1].split('->-', 1)
migrations.append({
'slot': int(slot_id),
'node_id': dst_node_id,
'state': 'migrating'
})
elif '-<-' in r:
slot_id, src_node_id = r[1:-1].split('-<-', 1)
migrations.append({
'slot': int(slot_id),
'node_id': src_node_id,
'state': 'importing'
})
elif '-' in r:
start, end = r.split('-')
slots.extend(range(int(start), int(end) + 1))
else:
slots.append(int(r))
return slots, migrations
if isinstance(resp, basestring):
resp = resp.splitlines()
nodes = []
for line in resp:
parts = line.split(' ', 8)
self_id, addr, flags, master_id, ping_sent, \
pong_recv, config_epoch, link_state = parts[:8]
host, port = addr.rsplit(':', 1)
node = {
'id': self_id,
'host': host or current_host,
'port': int(port),
'flags': tuple(flags.split(',')),
'master': master_id if master_id != '-' else None,
'ping-sent': int(ping_sent),
'pong-recv': int(pong_recv),
'link-state': link_state,
'slots': [],
'migrations': [],
}
if len(parts) >= 9:
slots, migrations = parse_slots(parts[8])
node['slots'], node['migrations'] = tuple(slots), migrations
nodes.append(node)
return nodes
|