/usr/lib/python2.7/dist-packages/celery/contrib/migrate.py is in python-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 | # -*- coding: utf-8 -*-
"""
celery.contrib.migrate
~~~~~~~~~~~~~~~~~~~~~~
Migration tools.
"""
from __future__ import absolute_import, print_function, unicode_literals
import socket
from functools import partial
from itertools import cycle, islice
from kombu import eventloop, Queue
from kombu.common import maybe_declare
from kombu.utils.encoding import ensure_bytes
from celery.app import app_or_default
from celery.five import string, string_t
from celery.utils import worker_direct
__all__ = ['StopFiltering', 'State', 'republish', 'migrate_task',
'migrate_tasks', 'move', 'task_id_eq', 'task_id_in',
'start_filter', 'move_task_by_id', 'move_by_idmap',
'move_by_taskmap', 'move_direct', 'move_direct_by_id']
MOVING_PROGRESS_FMT = """\
Moving task {state.filtered}/{state.strtotal}: \
{body[task]}[{body[id]}]\
"""
class StopFiltering(Exception):
pass
class State(object):
count = 0
filtered = 0
total_apx = 0
@property
def strtotal(self):
if not self.total_apx:
return '?'
return string(self.total_apx)
def __repr__(self):
if self.filtered:
return '^{0.filtered}'.format(self)
return '{0.count}/{0.strtotal}'.format(self)
def republish(producer, message, exchange=None, routing_key=None,
remove_props=['application_headers',
'content_type',
'content_encoding',
'headers']):
body = ensure_bytes(message.body) # use raw message body.
info, headers, props = (message.delivery_info,
message.headers, message.properties)
exchange = info['exchange'] if exchange is None else exchange
routing_key = info['routing_key'] if routing_key is None else routing_key
ctype, enc = message.content_type, message.content_encoding
# remove compression header, as this will be inserted again
# when the message is recompressed.
compression = headers.pop('compression', None)
for key in remove_props:
props.pop(key, None)
producer.publish(ensure_bytes(body), exchange=exchange,
routing_key=routing_key, compression=compression,
headers=headers, content_type=ctype,
content_encoding=enc, **props)
def migrate_task(producer, body_, message, queues=None):
info = message.delivery_info
queues = {} if queues is None else queues
republish(producer, message,
exchange=queues.get(info['exchange']),
routing_key=queues.get(info['routing_key']))
def filter_callback(callback, tasks):
def filtered(body, message):
if tasks and body['task'] not in tasks:
return
return callback(body, message)
return filtered
def migrate_tasks(source, dest, migrate=migrate_task, app=None,
queues=None, **kwargs):
app = app_or_default(app)
queues = prepare_queues(queues)
producer = app.amqp.TaskProducer(dest)
migrate = partial(migrate, producer, queues=queues)
def on_declare_queue(queue):
new_queue = queue(producer.channel)
new_queue.name = queues.get(queue.name, queue.name)
if new_queue.routing_key == queue.name:
new_queue.routing_key = queues.get(queue.name,
new_queue.routing_key)
if new_queue.exchange.name == queue.name:
new_queue.exchange.name = queues.get(queue.name, queue.name)
new_queue.declare()
return start_filter(app, source, migrate, queues=queues,
on_declare_queue=on_declare_queue, **kwargs)
def _maybe_queue(app, q):
if isinstance(q, string_t):
return app.amqp.queues[q]
return q
def move(predicate, connection=None, exchange=None, routing_key=None,
source=None, app=None, callback=None, limit=None, transform=None,
**kwargs):
"""Find tasks by filtering them and move the tasks to a new queue.
:param predicate: Filter function used to decide which messages
to move. Must accept the standard signature of ``(body, message)``
used by Kombu consumer callbacks. If the predicate wants the message
to be moved it must return either:
1) a tuple of ``(exchange, routing_key)``, or
2) a :class:`~kombu.entity.Queue` instance, or
3) any other true value which means the specified
``exchange`` and ``routing_key`` arguments will be used.
:keyword connection: Custom connection to use.
:keyword source: Optional list of source queues to use instead of the
default (which is the queues in :setting:`CELERY_QUEUES`).
This list can also contain new :class:`~kombu.entity.Queue` instances.
:keyword exchange: Default destination exchange.
:keyword routing_key: Default destination routing key.
:keyword limit: Limit number of messages to filter.
:keyword callback: Callback called after message moved,
with signature ``(state, body, message)``.
:keyword transform: Optional function to transform the return
value (destination) of the filter function.
Also supports the same keyword arguments as :func:`start_filter`.
To demonstrate, the :func:`move_task_by_id` operation can be implemented
like this:
.. code-block:: python
def is_wanted_task(body, message):
if body['id'] == wanted_id:
return Queue('foo', exchange=Exchange('foo'),
routing_key='foo')
move(is_wanted_task)
or with a transform:
.. code-block:: python
def transform(value):
if isinstance(value, string_t):
return Queue(value, Exchange(value), value)
return value
move(is_wanted_task, transform=transform)
The predicate may also return a tuple of ``(exchange, routing_key)``
to specify the destination to where the task should be moved,
or a :class:`~kombu.entitiy.Queue` instance.
Any other true value means that the task will be moved to the
default exchange/routing_key.
"""
app = app_or_default(app)
queues = [_maybe_queue(app, queue) for queue in source or []] or None
with app.connection_or_acquire(connection, pool=False) as conn:
producer = app.amqp.TaskProducer(conn)
state = State()
def on_task(body, message):
ret = predicate(body, message)
if ret:
if transform:
ret = transform(ret)
if isinstance(ret, Queue):
maybe_declare(ret, conn.default_channel)
ex, rk = ret.exchange.name, ret.routing_key
else:
ex, rk = expand_dest(ret, exchange, routing_key)
republish(producer, message,
exchange=ex, routing_key=rk)
message.ack()
state.filtered += 1
if callback:
callback(state, body, message)
if limit and state.filtered >= limit:
raise StopFiltering()
return start_filter(app, conn, on_task, consume_from=queues, **kwargs)
def expand_dest(ret, exchange, routing_key):
try:
ex, rk = ret
except (TypeError, ValueError):
ex, rk = exchange, routing_key
return ex, rk
def task_id_eq(task_id, body, message):
return body['id'] == task_id
def task_id_in(ids, body, message):
return body['id'] in ids
def prepare_queues(queues):
if isinstance(queues, string_t):
queues = queues.split(',')
if isinstance(queues, list):
queues = dict(tuple(islice(cycle(q.split(':')), None, 2))
for q in queues)
if queues is None:
queues = {}
return queues
def start_filter(app, conn, filter, limit=None, timeout=1.0,
ack_messages=False, tasks=None, queues=None,
callback=None, forever=False, on_declare_queue=None,
consume_from=None, state=None, accept=None, **kwargs):
state = state or State()
queues = prepare_queues(queues)
consume_from = [_maybe_queue(app, q)
for q in consume_from or list(queues)]
if isinstance(tasks, string_t):
tasks = set(tasks.split(','))
if tasks is None:
tasks = set([])
def update_state(body, message):
state.count += 1
if limit and state.count >= limit:
raise StopFiltering()
def ack_message(body, message):
message.ack()
consumer = app.amqp.TaskConsumer(conn, queues=consume_from, accept=accept)
if tasks:
filter = filter_callback(filter, tasks)
update_state = filter_callback(update_state, tasks)
ack_message = filter_callback(ack_message, tasks)
consumer.register_callback(filter)
consumer.register_callback(update_state)
if ack_messages:
consumer.register_callback(ack_message)
if callback is not None:
callback = partial(callback, state)
if tasks:
callback = filter_callback(callback, tasks)
consumer.register_callback(callback)
# declare all queues on the new broker.
for queue in consumer.queues:
if queues and queue.name not in queues:
continue
if on_declare_queue is not None:
on_declare_queue(queue)
try:
_, mcount, _ = queue(consumer.channel).queue_declare(passive=True)
if mcount:
state.total_apx += mcount
except conn.channel_errors:
pass
# start migrating messages.
with consumer:
try:
for _ in eventloop(conn, # pragma: no cover
timeout=timeout, ignore_timeouts=forever):
pass
except socket.timeout:
pass
except StopFiltering:
pass
return state
def move_task_by_id(task_id, dest, **kwargs):
"""Find a task by id and move it to another queue.
:param task_id: Id of task to move.
:param dest: Destination queue.
Also supports the same keyword arguments as :func:`move`.
"""
return move_by_idmap({task_id: dest}, **kwargs)
def move_by_idmap(map, **kwargs):
"""Moves tasks by matching from a ``task_id: queue`` mapping,
where ``queue`` is a queue to move the task to.
Example::
>>> move_by_idmap({
... '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
... 'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
... '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
... queues=['hipri'])
"""
def task_id_in_map(body, message):
return map.get(body['id'])
# adding the limit means that we don't have to consume any more
# when we've found everything.
return move(task_id_in_map, limit=len(map), **kwargs)
def move_by_taskmap(map, **kwargs):
"""Moves tasks by matching from a ``task_name: queue`` mapping,
where ``queue`` is the queue to move the task to.
Example::
>>> move_by_taskmap({
... 'tasks.add': Queue('name'),
... 'tasks.mul': Queue('name'),
... })
"""
def task_name_in_map(body, message):
return map.get(body['task']) # <- name of task
return move(task_name_in_map, **kwargs)
def filter_status(state, body, message, **kwargs):
print(MOVING_PROGRESS_FMT.format(state=state, body=body, **kwargs))
move_direct = partial(move, transform=worker_direct)
move_direct_by_id = partial(move_task_by_id, transform=worker_direct)
move_direct_by_idmap = partial(move_by_idmap, transform=worker_direct)
move_direct_by_taskmap = partial(move_by_taskmap, transform=worker_direct)
|