/usr/share/pyshared/gearman/client.py is in python-gearman 2.0.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 | import collections
from gearman import compat
import logging
import os
import random
import gearman.util
from gearman.connection_manager import GearmanConnectionManager
from gearman.client_handler import GearmanClientCommandHandler
from gearman.constants import PRIORITY_NONE, PRIORITY_LOW, PRIORITY_HIGH, JOB_UNKNOWN, JOB_PENDING
from gearman.errors import ConnectionError, ExceededConnectionAttempts, ServerUnavailable
gearman_logger = logging.getLogger(__name__)
# This number must be <= GEARMAN_UNIQUE_SIZE in gearman/libgearman/constants.h
RANDOM_UNIQUE_BYTES = 16
class GearmanClient(GearmanConnectionManager):
"""
GearmanClient :: Interface to submit jobs to a Gearman server
"""
command_handler_class = GearmanClientCommandHandler
def __init__(self, host_list=None, random_unique_bytes=RANDOM_UNIQUE_BYTES):
super(GearmanClient, self).__init__(host_list=host_list)
self.random_unique_bytes = random_unique_bytes
# The authoritative copy of all requests that this client knows about
# Ignores the fact if a request has been bound to a connection or not
self.request_to_rotating_connection_queue = compat.defaultdict(collections.deque)
def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
"""Submit a single job to any gearman server"""
job_info = dict(task=task, data=data, unique=unique, priority=priority)
completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout)
return gearman.util.unlist(completed_job_list)
def submit_multiple_jobs(self, jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
"""Takes a list of jobs_to_submit with dicts of
{'task': task, 'data': data, 'unique': unique, 'priority': priority}
"""
assert type(jobs_to_submit) in (list, tuple, set), "Expected multiple jobs, received 1?"
# Convert all job dicts to job request objects
requests_to_submit = [self._create_request_from_dictionary(job_info, background=background, max_retries=max_retries) for job_info in jobs_to_submit]
return self.submit_multiple_requests(requests_to_submit, wait_until_complete=wait_until_complete, poll_timeout=poll_timeout)
def submit_multiple_requests(self, job_requests, wait_until_complete=True, poll_timeout=None):
"""Take GearmanJobRequests, assign them connections, and request that they be done.
* Blocks until our jobs are accepted (should be fast) OR times out
* Optionally blocks until jobs are all complete
You MUST check the status of your requests after calling this function as "timed_out" or "state == JOB_UNKNOWN" maybe True
"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
stopwatch = gearman.util.Stopwatch(poll_timeout)
# We should always wait until our job is accepted, this should be fast
time_remaining = stopwatch.get_time_remaining()
processed_requests = self.wait_until_jobs_accepted(job_requests, poll_timeout=time_remaining)
# Optionally, we'll allow a user to wait until all jobs are complete with the same poll_timeout
time_remaining = stopwatch.get_time_remaining()
if wait_until_complete and bool(time_remaining != 0.0):
processed_requests = self.wait_until_jobs_completed(processed_requests, poll_timeout=time_remaining)
return processed_requests
def wait_until_jobs_accepted(self, job_requests, poll_timeout=None):
"""Go into a select loop until all our jobs have moved to STATE_PENDING"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
def is_request_pending(current_request):
return bool(current_request.state == JOB_PENDING)
# Poll until we know we've gotten acknowledgement that our job's been accepted
# If our connection fails while we're waiting for it to be accepted, automatically retry right here
def continue_while_jobs_pending(any_activity):
for current_request in job_requests:
if current_request.state == JOB_UNKNOWN:
self.send_job_request(current_request)
return compat.any(is_request_pending(current_request) for current_request in job_requests)
self.poll_connections_until_stopped(self.connection_list, continue_while_jobs_pending, timeout=poll_timeout)
# Mark any job still in the queued state to poll_timeout
for current_request in job_requests:
current_request.timed_out = is_request_pending(current_request)
return job_requests
def wait_until_jobs_completed(self, job_requests, poll_timeout=None):
"""Go into a select loop until all our jobs have completed or failed"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
def is_request_incomplete(current_request):
return not current_request.complete
# Poll until we get responses for all our functions
# Do NOT attempt to auto-retry connection failures as we have no idea how for a worker got
def continue_while_jobs_incomplete(any_activity):
for current_request in job_requests:
if is_request_incomplete(current_request) and current_request.state != JOB_UNKNOWN:
return True
return False
self.poll_connections_until_stopped(self.connection_list, continue_while_jobs_incomplete, timeout=poll_timeout)
# Mark any job still in the queued state to poll_timeout
for current_request in job_requests:
current_request.timed_out = is_request_incomplete(current_request)
if not current_request.timed_out:
self.request_to_rotating_connection_queue.pop(current_request, None)
return job_requests
def get_job_status(self, current_request, poll_timeout=None):
"""Fetch the job status of a single request"""
request_list = self.get_job_statuses([current_request], poll_timeout=poll_timeout)
return gearman.util.unlist(request_list)
def get_job_statuses(self, job_requests, poll_timeout=None):
"""Fetch the job status of a multiple requests"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
for current_request in job_requests:
current_request.status['last_time_received'] = current_request.status.get('time_received')
current_connection = current_request.job.connection
current_command_handler = self.connection_to_handler_map[current_connection]
current_command_handler.send_get_status_of_job(current_request)
return self.wait_until_job_statuses_received(job_requests, poll_timeout=poll_timeout)
def wait_until_job_statuses_received(self, job_requests, poll_timeout=None):
"""Go into a select loop until we received statuses on all our requests"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
def is_status_not_updated(current_request):
current_status = current_request.status
return bool(current_status.get('time_received') == current_status.get('last_time_received'))
# Poll to make sure we send out our request for a status update
def continue_while_status_not_updated(any_activity):
for current_request in job_requests:
if is_status_not_updated(current_request) and current_request.state != JOB_UNKNOWN:
return True
return False
self.poll_connections_until_stopped(self.connection_list, continue_while_status_not_updated, timeout=poll_timeout)
for current_request in job_requests:
current_request.status = current_request.status or {}
current_request.timed_out = is_status_not_updated(current_request)
return job_requests
def _create_request_from_dictionary(self, job_info, background=False, max_retries=0):
"""Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'background': background}"""
# Make sure we have a unique identifier for ALL our tasks
job_unique = job_info.get('unique')
if job_unique == '-':
job_unique = job_info['data']
elif not job_unique:
job_unique = os.urandom(self.random_unique_bytes).encode('hex')
current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, data=job_info['data'])
initial_priority = job_info.get('priority', PRIORITY_NONE)
max_attempts = max_retries + 1
current_request = self.job_request_class(current_job, initial_priority=initial_priority, background=background, max_attempts=max_attempts)
return current_request
def establish_request_connection(self, current_request):
"""Return a live connection for the given hash"""
# We'll keep track of the connections we're attempting to use so if we ever have to retry, we can use this history
rotating_connections = self.request_to_rotating_connection_queue.get(current_request, None)
if not rotating_connections:
shuffled_connection_list = list(self.connection_list)
random.shuffle(shuffled_connection_list)
rotating_connections = collections.deque(shuffled_connection_list)
self.request_to_rotating_connection_queue[current_request] = rotating_connections
failed_connections = 0
chosen_connection = None
for possible_connection in rotating_connections:
try:
chosen_connection = self.establish_connection(possible_connection)
break
except ConnectionError:
# Rotate our server list so we'll skip all our broken servers
failed_connections += 1
if not chosen_connection:
raise ServerUnavailable('Found no valid connections: %r' % self.connection_list)
# Rotate our server list so we'll skip all our broken servers
rotating_connections.rotate(-failed_connections)
return chosen_connection
def send_job_request(self, current_request):
"""Attempt to send out a job request"""
if current_request.connection_attempts >= current_request.max_connection_attempts:
raise ExceededConnectionAttempts('Exceeded %d connection attempt(s) :: %r' % (current_request.max_connection_attempts, current_request))
chosen_connection = self.establish_request_connection(current_request)
current_request.job.connection = chosen_connection
current_request.connection_attempts += 1
current_request.timed_out = False
current_command_handler = self.connection_to_handler_map[chosen_connection]
current_command_handler.send_job_request(current_request)
return current_request
|