/usr/lib/python2.7/dist-packages/cbor/cbor_rpc_client.py is in python-cbor 0.1.24-1+b1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 | from __future__ import absolute_import
import logging
import random
import socket
import time
import cbor
logger = logging.getLogger(__name__)
class SocketReader(object):
'''
Simple adapter from socket.recv to file-like-read
'''
def __init__(self, sock):
self.socket = sock
self.timeout_seconds = 10.0
def read(self, num):
start = time.time()
data = self.socket.recv(num)
while len(data) < num:
now = time.time()
if now > (start + self.timeout_seconds):
break
ndat = self.socket.recv(num - len(data))
if ndat:
data += ndat
return data
class CborRpcClient(object):
'''Base class for all client objects.
This provides common `addr_family`, `address`, and `registry_addresses`
configuration parameters, and manages the connection back to the server.
Automatic retry and time based fallback is managed from
configuration parameters `retries` (default 5), and
`base_retry_seconds` (default 0.5). Retry time doubles on each
retry. E.g. try 0; wait 0.5s; try 1; wait 1s; try 2; wait 2s; try
3; wait 4s; try 4; wait 8s; try 5; FAIL. Total time waited just
under base_retry_seconds * (2 ** retries).
.. automethod:: __init__
.. automethod:: _rpc
.. automethod:: close
'''
def __init__(self, config=None):
self._socket_family = config.get('addr_family', socket.AF_INET)
# may need to be ('host', port)
self._socket_addr = config.get('address')
if self._socket_family == socket.AF_INET:
if not isinstance(self._socket_addr, tuple):
# python socket standard library insists this be tuple!
tsocket_addr = tuple(self._socket_addr)
assert len(tsocket_addr) == 2, 'address must be length-2 tuple ("hostname", port number), got {!r} tuplified to {!r}'.format(self._socket_addr, tsocket_addr)
self._socket_addr = tsocket_addr
self._socket = None
self._rfile = None
self._local_addr = None
self._message_count = 0
self._retries = config.get('retries', 5)
self._base_retry_seconds = float(config.get('base_retry_seconds', 0.5))
def _conn(self):
# lazy socket opener
if self._socket is None:
try:
self._socket = socket.create_connection(self._socket_addr)
self._local_addr = self._socket.getsockname()
except:
logger.error('error connecting to %r:%r', self._socket_addr[0],
self._socket_addr[1], exc_info=True)
raise
return self._socket
def close(self):
'''Close the connection to the server.
The next RPC call will reopen the connection.
'''
if self._socket is not None:
self._rfile = None
try:
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()
except socket.error:
logger.warn('error closing lockd client socket',
exc_info=True)
self._socket = None
@property
def rfile(self):
if self._rfile is None:
conn = self._conn()
self._rfile = SocketReader(conn)
return self._rfile
def _rpc(self, method_name, params):
'''Call a method on the server.
Calls ``method_name(*params)`` remotely, and returns the results
of that function call. Expected return types are primitives, lists,
and dictionaries.
:raise Exception: if the server response was a failure
'''
mlog = logging.getLogger('cborrpc')
tryn = 0
delay = self._base_retry_seconds
self._message_count += 1
message = {
'id': self._message_count,
'method': method_name,
'params': params
}
mlog.debug('request %r', message)
buf = cbor.dumps(message)
errormessage = None
while True:
try:
conn = self._conn()
conn.send(buf)
response = cbor.load(self.rfile)
mlog.debug('response %r', response)
assert response['id'] == message['id']
if 'result' in response:
return response['result']
# From here on out we got a response, the server
# didn't have some weird intermittent error or
# non-connectivity, it gave us an error message. We
# don't retry that, we raise it to the user.
errormessage = response.get('error')
if errormessage and hasattr(errormessage,'get'):
errormessage = errormessage.get('message')
if not errormessage:
errormessage = repr(response)
break
except Exception as ex:
if tryn < self._retries:
tryn += 1
logger.debug('ex in %r (%s), retrying %s in %s sec...',
method_name, ex, tryn, delay, exc_info=True)
self.close()
time.sleep(delay)
delay *= 2
continue
logger.error('failed in rpc %r %r', method_name, params,
exc_info=True)
raise
raise Exception(errormessage)
if __name__ == '__main__':
import sys
logging.basicConfig(level=logging.DEBUG)
host,port = sys.argv[1].split(':')
if not host:
host = 'localhost'
port = int(port)
client = CborRpcClient({'address':(host,port)})
print(client._rpc(u'connect', [u'127.0.0.1:5432', u'root', u'aoeu']))
print(client._rpc(u'put', [[('k1','v1'), ('k2','v2')]]))
#print(client._rpc(u'ping', []))
#print(client._rpc(u'gnip', []))
client.close()
|