/usr/share/pyshared/gearman/connection_manager.py is in python-gearman 2.0.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import logging
import select as select_lib
import gearman.util
from gearman.connection import GearmanConnection
from gearman.constants import _DEBUG_MODE_
from gearman.errors import ConnectionError, ServerUnavailable
from gearman.job import GearmanJob, GearmanJobRequest
from gearman import compat
gearman_logger = logging.getLogger(__name__)
class DataEncoder(object):
    """Interface for codecs that translate payloads to and from their wire form.

    Both operations are class-level; concrete encoders are expected to
    override ``encode`` and ``decode``. The base implementations only
    raise ``NotImplementedError``.
    """

    @classmethod
    def encode(cls, encodable_object):
        """Turn ``encodable_object`` into its encoded form (abstract here)."""
        raise NotImplementedError()

    @classmethod
    def decode(cls, decodable_string):
        """Turn ``decodable_string`` back into an object (abstract here)."""
        raise NotImplementedError()
class NoopEncoder(DataEncoder):
    """Provide common object dumps for all communications over gearman

    Pass-through encoder: payloads are returned unchanged, but only after
    checking that they are ``str`` instances.
    """

    @classmethod
    def _enforce_byte_string(cls, given_object):
        """Raise ``TypeError`` unless ``given_object`` is a ``str``.

        ``isinstance`` is used rather than an exact ``type(...)`` comparison
        so that subclasses of ``str`` are accepted as well.
        """
        if not isinstance(given_object, str):
            raise TypeError("Expecting byte string, got %r" % type(given_object))

    @classmethod
    def encode(cls, encodable_object):
        """Validate ``encodable_object`` and return it unchanged.

        Raises ``TypeError`` if the object is not a ``str``.
        """
        cls._enforce_byte_string(encodable_object)
        return encodable_object

    @classmethod
    def decode(cls, decodable_string):
        """Validate ``decodable_string`` and return it unchanged.

        Raises ``TypeError`` if the object is not a ``str``.
        """
        cls._enforce_byte_string(decodable_string)
        return decodable_string
class GearmanConnectionManager(object):
    """Abstract base class for any Gearman-type client that needs to connect/listen to multiple connections

    Manages and polls a group of gearman connections
    Forwards all communication between a connection and a command handler
    The state of a connection is represented within the command handler

    Automatically encodes all 'data' fields as specified in protocol.py
    """
    # Must be overridden by subclasses; __init__ asserts that it is not None
    command_handler_class = None
    connection_class = GearmanConnection

    job_class = GearmanJob
    job_request_class = GearmanJobRequest

    data_encoder = NoopEncoder

    def __init__(self, host_list=None):
        """Create the manager and register one connection per entry of host_list.

        host_list -- optional iterable of server parameters, each handed to
                     add_connection(); None or empty means no connections yet.
        """
        assert self.command_handler_class is not None, 'GearmanClientBase did not receive a command handler class'

        # Build all bookkeeping state BEFORE registering connections, so an
        # overridden add_connection() always sees a fully initialised manager
        # and anything it stores in these maps is not wiped afterwards.
        self.connection_list = []
        self.handler_to_connection_map = {}
        self.connection_to_handler_map = {}

        # Keyword arguments passed to every new handler's initial_state()
        self.handler_initial_state = {}

        for hostport_tuple in host_list or []:
            self.add_connection(hostport_tuple)

    def shutdown(self):
        """Close every connection known to this manager"""
        # Shutdown all our connections one by one
        for gearman_connection in self.connection_list:
            gearman_connection.close()

    ###################################
    # Connection management functions #
    ###################################

    def add_connection(self, hostport_tuple):
        """Add a new connection to this connection manager

        Returns the newly created (not yet connected) connection object.
        """
        gearman_host, gearman_port = gearman.util.disambiguate_server_parameter(hostport_tuple)

        client_connection = self.connection_class(host=gearman_host, port=gearman_port)
        self.connection_list.append(client_connection)

        return client_connection

    def establish_connection(self, current_connection):
        """Attempt to connect... if not previously connected, create a new CommandHandler to manage this connection's state
        !NOTE! This function can throw a ConnectionError which deriving ConnectionManagers should catch

        Returns current_connection (already-connected connections are returned untouched).
        """
        assert current_connection in self.connection_list, "Unknown connection - %r" % current_connection
        if current_connection.connected:
            return current_connection

        # !NOTE! May throw a ConnectionError
        current_connection.connect()

        # Initiate a new command handler every time we start a new connection
        current_handler = self.command_handler_class(connection_manager=self)

        # Handler to connection map for CommandHandler -> Connection interactions
        # Connection to handler map for Connection -> CommandHandler interactions
        self.handler_to_connection_map[current_handler] = current_connection
        self.connection_to_handler_map[current_connection] = current_handler

        current_handler.initial_state(**self.handler_initial_state)
        return current_connection

    def poll_connections_once(self, submitted_connections, timeout=None):
        """Does a single robust select, catching socket errors

        Only connections whose `connected` attribute is true are polled.
        Returns a (readable, writable, errored) tuple of sets; a negative
        timeout returns three empty sets without selecting at all.
        """
        select_connections = set(current_connection for current_connection in submitted_connections if current_connection.connected)

        rd_connections = set()
        wr_connections = set()
        ex_connections = set()

        if timeout is not None and timeout < 0.0:
            return rd_connections, wr_connections, ex_connections

        successful_select = False
        while not successful_select and select_connections:
            # Drop connections already found to be broken before (re)trying
            select_connections -= ex_connections
            check_rd_connections = [current_connection for current_connection in select_connections if current_connection.readable()]
            check_wr_connections = [current_connection for current_connection in select_connections if current_connection.writable()]

            try:
                rd_list, wr_list, ex_list = gearman.util.select(check_rd_connections, check_wr_connections, select_connections, timeout=timeout)
                rd_connections |= set(rd_list)
                wr_connections |= set(wr_list)
                ex_connections |= set(ex_list)

                successful_select = True
            except (select_lib.error, ConnectionError):
                # On any exception, we're going to assume we ran into a socket exception
                # We'll need to fish for bad connections as suggested at
                #
                # http://www.amk.ca/python/howto/sockets/
                for conn_to_test in select_connections:
                    try:
                        _, _, _ = gearman.util.select([conn_to_test], [], [], timeout=0)
                    except (select_lib.error, ConnectionError):
                        rd_connections.discard(conn_to_test)
                        wr_connections.discard(conn_to_test)
                        ex_connections.add(conn_to_test)

                        # Pass the argument to the logger so formatting only happens if the record is emitted
                        gearman_logger.error('select error: %r', conn_to_test)

        if _DEBUG_MODE_:
            gearman_logger.debug('select :: Poll - %d :: Read - %d :: Write - %d :: Error - %d',
                                 len(select_connections), len(rd_connections), len(wr_connections), len(ex_connections))

        return rd_connections, wr_connections, ex_connections

    def handle_connection_activity(self, rd_connections, wr_connections, ex_connections):
        """Process all connection activity... executes all handle_* callbacks

        Connections whose read or write raises ConnectionError are treated as
        failed. Returns (rd_connections, wr_connections, failed_connections)
        where failed_connections is ex_connections plus those failures.
        """
        dead_connections = set()
        for current_connection in rd_connections:
            try:
                self.handle_read(current_connection)
            except ConnectionError:
                dead_connections.add(current_connection)

        for current_connection in wr_connections:
            try:
                self.handle_write(current_connection)
            except ConnectionError:
                dead_connections.add(current_connection)

        # Merge first so a connection present in both sets has its error
        # callback run exactly once instead of twice
        failed_connections = ex_connections | dead_connections
        for current_connection in failed_connections:
            self.handle_error(current_connection)

        return rd_connections, wr_connections, failed_connections

    def poll_connections_until_stopped(self, submitted_connections, callback_fxn, timeout=None):
        """Continue to poll our connections until we receive a stopping condition

        callback_fxn is called with a single boolean (whether the last poll saw
        any activity; False on the initial call) and polling continues while it
        returns a true value. Polling also stops once the stopwatch reports 0.0
        time remaining or no submitted connection is connected any more.

        Returns bool(connection_ok and callback_ok).
        Raises ServerUnavailable when none of the submitted connections is connected.
        """
        stopwatch = gearman.util.Stopwatch(timeout)

        any_activity = False
        callback_ok = callback_fxn(any_activity)
        connection_ok = compat.any(current_connection.connected for current_connection in submitted_connections)

        while connection_ok and callback_ok:
            time_remaining = stopwatch.get_time_remaining()
            if time_remaining == 0.0:
                break

            # Do a single robust select and handle all connection activity
            read_connections, write_connections, dead_connections = self.poll_connections_once(submitted_connections, timeout=time_remaining)
            self.handle_connection_activity(read_connections, write_connections, dead_connections)

            # A non-empty set is truthy, so this is True if any of the three sets has members
            any_activity = compat.any([read_connections, write_connections, dead_connections])

            callback_ok = callback_fxn(any_activity)
            connection_ok = compat.any(current_connection.connected for current_connection in submitted_connections)

        # We should raise here if we have no alive connections (don't go into a select polling loop with no connections)
        if not connection_ok:
            raise ServerUnavailable('Found no valid connections in list: %r' % self.connection_list)

        return bool(connection_ok and callback_ok)

    def handle_read(self, current_connection):
        """Handle all our pending socket data"""
        # Looked up first: raises KeyError for a connection without a registered handler
        current_handler = self.connection_to_handler_map[current_connection]

        # Transfer data from socket -> buffer
        current_connection.read_data_from_socket()

        # Transfer command from buffer -> command queue
        current_connection.read_commands_from_buffer()

        # Notify the handler that we have commands to fetch
        current_handler.fetch_commands()

    def handle_write(self, current_connection):
        """Flush all our pending outbound commands to the socket"""
        # Transfer command from command queue -> buffer
        current_connection.send_commands_to_buffer()

        # Transfer data from buffer -> socket
        current_connection.send_data_to_socket()

    def handle_error(self, current_connection):
        """Tear down a failed connection: notify and forget its handler, then close it

        Safe to call for a connection that has no registered handler; in that
        case the connection is simply closed.
        """
        dead_handler = self.connection_to_handler_map.pop(current_connection, None)

        # Explicit None check: do not rely on the truthiness of a handler object
        if dead_handler is not None:
            dead_handler.on_io_error()
            # Removed only after on_io_error() has run, matching the original ordering
            self.handler_to_connection_map.pop(dead_handler, None)

        current_connection.close()

    ##################################
    # Callbacks for Command Handlers #
    ##################################

    def read_command(self, command_handler):
        """CommandHandlers call this function to fetch pending commands

        NOTE: CommandHandlers have NO knowledge as to which connection they're representing
              ConnectionManagers must forward inbound commands to CommandHandlers

        Returns None when the connection has no command available, otherwise
        a (cmd_type, cmd_args) tuple.
        """
        gearman_connection = self.handler_to_connection_map[command_handler]

        cmd_tuple = gearman_connection.read_command()
        if cmd_tuple is None:
            return cmd_tuple

        # Unpack and repack so anything that is not a 2-item sequence fails loudly here
        cmd_type, cmd_args = cmd_tuple
        return cmd_type, cmd_args

    def send_command(self, command_handler, cmd_type, cmd_args):
        """CommandHandlers call this function to send pending commands

        NOTE: CommandHandlers have NO knowledge as to which connection they're representing
              ConnectionManagers must forward outbound commands to Connections
        """
        gearman_connection = self.handler_to_connection_map[command_handler]
        gearman_connection.send_command(cmd_type, cmd_args)

    def on_gearman_error(self, error_code, error_text):
        """Log an error reported by the server; always returns False"""
        gearman_logger.error('Received error from server: %s: %s', error_code, error_text)
        return False
|