/usr/lib/python3/dist-packages/celery/contrib/migrate.py is in python3-celery 4.1.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""Message migration tools (Broker <-> Broker)."""
from __future__ import absolute_import, print_function, unicode_literals
import socket
from functools import partial
from itertools import cycle, islice
from kombu import eventloop, Queue
from kombu.common import maybe_declare
from kombu.utils.encoding import ensure_bytes
from celery.app import app_or_default
from celery.five import python_2_unicode_compatible, string, string_t
from celery.utils.nodenames import worker_direct
from celery.utils.text import str_to_list
# Names exported by ``from <this module> import *``.
__all__ = [
    'StopFiltering', 'State', 'republish', 'migrate_task',
    'migrate_tasks', 'move', 'task_id_eq', 'task_id_in',
    'start_filter', 'move_task_by_id', 'move_by_idmap',
    'move_by_taskmap', 'move_direct', 'move_direct_by_id',
]
# Template for the progress line printed by ``filter_status``.  It is
# formatted with ``state`` (reads ``.filtered`` and ``.strtotal``) and
# ``body`` (reads the ``task`` and ``id`` keys).  The trailing backslashes
# are line continuations, so the rendered text contains no newlines.
MOVING_PROGRESS_FMT = """\
Moving task {state.filtered}/{state.strtotal}: \
{body[task]}[{body[id]}]\
"""
class StopFiltering(Exception):
    """Semi-predicate used to signal filter stop.

    Raised from inside a consumer callback to end the filtering loop:
    ``Filterer.start`` catches it and returns the state gathered so far.
    Within this module it is raised by ``move`` and
    ``Filterer.update_state`` once their ``limit`` has been reached.
    """
@python_2_unicode_compatible
class State(object):
    """Counters describing how far a migration/filter run has progressed."""

    # Number of messages seen so far (bumped by ``Filterer.update_state``).
    count = 0
    # Number of messages moved so far (bumped by ``move``).
    filtered = 0
    # Approximate number of messages waiting in the source queues;
    # zero means the total is unknown.
    total_apx = 0

    @property
    def strtotal(self):
        """Approximate total as text, or ``'?'`` when it is not known."""
        return string(self.total_apx) if self.total_apx else '?'

    def __repr__(self):
        # Once something has been moved only the moved count is shown,
        # otherwise the "seen/total" pair.
        if self.filtered:
            template = '^{0.filtered}'
        else:
            template = '{0.count}/{0.strtotal}'
        return template.format(self)
def republish(producer, message, exchange=None, routing_key=None,
              remove_props=None):
    """Republish message.

    Publishes the raw body of ``message`` through ``producer``, reusing the
    message's content type, content encoding and headers.

    Arguments:
        producer: Object whose ``publish`` method is used to send the copy.
        message: Received message to republish.  Its ``headers`` and
            ``properties`` mappings are modified in place: the
            ``compression`` header and every key named in ``remove_props``
            are popped.
        exchange (str): Destination exchange.  Defaults to the exchange
            found in the message's delivery info.
        routing_key (str): Destination routing key.  Defaults to the
            routing key found in the message's delivery info.
        remove_props (Sequence[str]): Names to strip from
            ``message.properties`` before the remaining properties are
            forwarded as keyword arguments to ``producer.publish``.
            Defaults to ``application_headers``, ``content_type``,
            ``content_encoding`` and ``headers``.
    """
    if remove_props is None:
        # Built per call instead of using a shared mutable default argument.
        remove_props = ('application_headers',
                        'content_type',
                        'content_encoding',
                        'headers')
    body = ensure_bytes(message.body)  # use raw message body.
    info, headers, props = (message.delivery_info,
                            message.headers, message.properties)
    exchange = info['exchange'] if exchange is None else exchange
    routing_key = info['routing_key'] if routing_key is None else routing_key
    ctype, enc = message.content_type, message.content_encoding
    # remove compression header, as this will be inserted again
    # when the message is recompressed.
    compression = headers.pop('compression', None)

    # content_type, content_encoding and headers are passed explicitly
    # below, so they must not also arrive through **props.
    for key in remove_props:
        props.pop(key, None)

    # NOTE(review): any other transport-level property left in ``props``
    # is forwarded untouched -- confirm the producer accepts them as-is.
    producer.publish(body, exchange=exchange,
                     routing_key=routing_key, compression=compression,
                     headers=headers, content_type=ctype,
                     content_encoding=enc, **props)
def migrate_task(producer, body_, message, queues=None):
    """Migrate single task message.

    Arguments:
        producer: Producer handed on to :func:`republish`.
        body_: Decoded message body; not used here.
        message: The message to republish.
        queues (Mapping): Optional ``old name -> new name`` mapping looked
            up with the message's original exchange and routing key.  A
            name missing from the mapping yields ``None``, which makes
            :func:`republish` keep the original value.
    """
    name_map = queues if queues is not None else {}
    delivery = message.delivery_info
    new_exchange = name_map.get(delivery['exchange'])
    new_routing_key = name_map.get(delivery['routing_key'])
    republish(producer, message,
              exchange=new_exchange, routing_key=new_routing_key)
def filter_callback(callback, tasks):
    """Wrap ``callback`` so it only runs for the task names in ``tasks``.

    An empty ``tasks`` collection disables the filtering, i.e. the callback
    is then invoked for every message.  For a message that is filtered out
    the wrapper returns ``None`` without calling ``callback``.
    """

    def filtered(body, message):
        selected = not tasks or body['task'] in tasks
        if selected:
            return callback(body, message)
        return None

    return filtered
def migrate_tasks(source, dest, migrate=migrate_task, app=None,
                  queues=None, **kwargs):
    """Migrate tasks from one broker to another.

    Arguments:
        source: Connection to consume the messages from.
        dest: Connection the destination producer is created on.
        migrate (Callable): Called as ``migrate(producer, body, message,
            queues=queues)`` for every consumed message.
        app: App instance; resolved with ``app_or_default``.
        queues: Queue name mapping, normalized by ``prepare_queues``.
        **kwargs: Passed through to :func:`start_filter`.

    Returns:
        Whatever :func:`start_filter` returns.
    """
    app = app_or_default(app)
    queues = prepare_queues(queues)
    producer = app.amqp.Producer(dest, auto_declare=False)
    migrate_one = partial(migrate, producer, queues=queues)

    def on_declare_queue(queue):
        # Bind the queue to the destination channel and rename the queue,
        # plus its routing key and exchange when those were named after the
        # queue, according to the ``queues`` mapping before declaring it.
        bound = queue(producer.channel)
        bound.name = queues.get(queue.name, queue.name)
        if bound.routing_key == queue.name:
            bound.routing_key = queues.get(queue.name, bound.routing_key)
        if bound.exchange.name == queue.name:
            bound.exchange.name = queues.get(queue.name, queue.name)
        bound.declare()

    return start_filter(
        app, source, migrate_one,
        queues=queues,
        on_declare_queue=on_declare_queue,
        **kwargs
    )
def _maybe_queue(app, q):
    """Resolve a queue name through ``app.amqp.queues``.

    Anything that is not a string is returned unchanged.
    """
    return app.amqp.queues[q] if isinstance(q, string_t) else q
def move(predicate, connection=None, exchange=None, routing_key=None,
         source=None, app=None, callback=None, limit=None, transform=None,
         **kwargs):
    """Find tasks by filtering them and move the tasks to a new queue.

    Arguments:
        predicate (Callable): Filter function used to decide the messages
            to move.  Must accept the standard signature of
            ``(body, message)`` used by Kombu consumer callbacks.  If the
            predicate wants the message to be moved it must return either:

                1) a tuple of ``(exchange, routing_key)``, or

                2) a :class:`~kombu.entity.Queue` instance, or

                3) any other true value, which means the specified
                    ``exchange`` and ``routing_key`` arguments
                    will be used.
        connection (kombu.Connection): Custom connection to use.
        source (List[Union[str, kombu.Queue]]): Optional list of source
            queues to use instead of the default (queues
            in :setting:`task_queues`).  This list can also contain
            :class:`~kombu.entity.Queue` instances.
        exchange (str, kombu.Exchange): Default destination exchange.
        routing_key (str): Default destination routing key.
        limit (int): Stop after this many messages have been moved.
        callback (Callable): Callback called after message moved,
            with signature ``(state, body, message)``.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, string_t):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    Note:
        The predicate may also return a tuple of ``(exchange, routing_key)``
        to specify the destination to where the task should be moved,
        or a :class:`~kombu.entity.Queue` instance.
        Any other true value means that the task will be moved to the
        default exchange/routing_key.
    """
    app = app_or_default(app)
    source_queues = [_maybe_queue(app, item) for item in source or []]

    with app.connection_or_acquire(connection, pool=False) as conn:
        producer = app.amqp.Producer(conn)
        state = State()

        def on_task(body, message):
            dest = predicate(body, message)
            if not dest:
                return  # not selected: the message is left alone.
            if transform:
                dest = transform(dest)

            if isinstance(dest, Queue):
                maybe_declare(dest, conn.default_channel)
                dest_exchange = dest.exchange.name
                dest_key = dest.routing_key
            else:
                dest_exchange, dest_key = expand_dest(
                    dest, exchange, routing_key)

            # The original is acked only after the copy has been published.
            republish(producer, message,
                      exchange=dest_exchange, routing_key=dest_key)
            message.ack()

            state.filtered += 1
            if callback:
                callback(state, body, message)
            if limit and state.filtered >= limit:
                raise StopFiltering()

        # An empty source list means "use the default queues".
        return start_filter(app, conn, on_task,
                            consume_from=source_queues or None, **kwargs)
def expand_dest(ret, exchange, routing_key):
    """Turn a predicate result into an ``(exchange, routing_key)`` pair.

    If ``ret`` unpacks into exactly two items those are returned, otherwise
    the given default ``exchange`` and ``routing_key`` are used.
    """
    try:
        dest_exchange, dest_key = ret
    except (TypeError, ValueError):
        # not iterable, or not exactly two items: fall back to defaults.
        return exchange, routing_key
    return dest_exchange, dest_key
def task_id_eq(task_id, body, message):
    """Return true if the id of the task in ``body`` equals ``task_id``."""
    current_id = body['id']
    return current_id == task_id
def task_id_in(ids, body, message):
    """Return true if the id of the task in ``body`` is a member of ``ids``."""
    current_id = body['id']
    return current_id in ids
def prepare_queues(queues):
    """Normalize a queue specification into a ``{source: destination}`` dict.

    Accepted forms:

    * ``None`` -- gives an empty dict.
    * a comma separated string such as ``'a:b,c'``.
    * a list of ``'source:destination'`` strings; an item without a colon
      maps the name to itself and anything after a second colon is ignored.

    Any other value is returned unchanged.
    """
    if queues is None:
        return {}
    if isinstance(queues, string_t):
        queues = queues.split(',')
    if not isinstance(queues, list):
        return queues

    mapping = {}
    for spec in queues:
        parts = spec.split(':')
        mapping[parts[0]] = parts[1] if len(parts) > 1 else parts[0]
    return mapping
class Filterer(object):
    """Consume messages from a set of queues and feed them to a filter.

    Arguments:
        app: App used to create the task consumer and resolve queue names.
        conn: Connection to consume from.
        filter (Callable): Consumer callback, ``(body, message)``.
        limit (int): Stop after this many messages have been counted.
        timeout (float): Timeout handed to the event loop.
        ack_messages (bool): Acknowledge the messages that were processed.
        tasks: Task names to restrict processing to (string or list, as
            accepted by ``str_to_list``); empty means all tasks.
        queues: Queue name mapping, normalized by ``prepare_queues``.
        callback (Callable): Extra callback, ``(state, body, message)``.
        forever (bool): Passed to the event loop as ``ignore_timeouts``.
        on_declare_queue (Callable): Called with each queue before its
            message count is looked up.
        consume_from: Queues (names or instances) to consume from;
            defaults to the keys of ``queues``.
        state (State): Existing state object to update.
        accept: Passed to the task consumer as ``accept``.
        **kwargs: Accepted and ignored.
    """

    def __init__(self, app, conn, filter,
                 limit=None, timeout=1.0,
                 ack_messages=False, tasks=None, queues=None,
                 callback=None, forever=False, on_declare_queue=None,
                 consume_from=None, state=None, accept=None, **kwargs):
        self.app = app
        self.conn = conn
        self.filter = filter
        self.limit = limit
        self.timeout = timeout
        self.ack_messages = ack_messages
        self.tasks = set(str_to_list(tasks) or [])
        self.queues = prepare_queues(queues)
        self.callback = callback
        self.forever = forever
        self.on_declare_queue = on_declare_queue
        self.consume_from = [
            _maybe_queue(self.app, q)
            for q in consume_from or list(self.queues)
        ]
        self.state = state or State()
        self.accept = accept

    def start(self):
        """Run the event loop until it times out or filtering is stopped.

        Returns:
            State: The state object holding the collected counters.
        """
        # start migrating messages.
        with self.prepare_consumer(self.create_consumer()):
            try:
                for _ in eventloop(self.conn,  # pragma: no cover
                                   timeout=self.timeout,
                                   ignore_timeouts=self.forever):
                    pass
            except (socket.timeout, StopFiltering):
                # Both are the normal ways for the loop to end: no more
                # messages arrived in time, or a callback asked to stop.
                pass
        return self.state

    def update_state(self, body, message):
        """Count the message and stop filtering once ``limit`` is reached."""
        self.state.count += 1
        if self.limit and self.state.count >= self.limit:
            raise StopFiltering()

    def ack_message(self, body, message):
        """Acknowledge ``message``."""
        message.ack()

    def create_consumer(self):
        """Create the task consumer for the queues to consume from."""
        return self.app.amqp.TaskConsumer(
            self.conn,
            queues=self.consume_from,
            accept=self.accept,
        )

    def prepare_consumer(self, consumer):
        """Register all callbacks on ``consumer`` and declare its queues.

        Callbacks are registered in this order: the filter, the state
        update, the optional ack and the optional user callback.  When
        ``tasks`` is set every one of them is restricted to those task
        names.

        Returns:
            The same ``consumer`` object.
        """
        def only_selected(fun):
            # Restrict ``fun`` to the selected task names, if any.
            if self.tasks:
                return filter_callback(fun, self.tasks)
            return fun

        consumer.register_callback(only_selected(self.filter))
        consumer.register_callback(only_selected(self.update_state))
        if self.ack_messages:
            # Must be the task-restricted version as well: acking messages
            # of tasks that were filtered out would silently drop them.
            consumer.register_callback(only_selected(self.ack_message))
        if self.callback is not None:
            callback = partial(self.callback, self.state)
            consumer.register_callback(only_selected(callback))
        self.declare_queues(consumer)
        return consumer

    def declare_queues(self, consumer):
        """Declare the consumer's queues and add up their message counts.

        Queues not listed in a non-empty ``queues`` mapping are skipped.
        """
        # declare all queues on the new broker.
        for queue in consumer.queues:
            if self.queues and queue.name not in self.queues:
                continue
            if self.on_declare_queue is not None:
                self.on_declare_queue(queue)
            try:
                # The middle item of the declare result is used as the
                # number of messages currently in the queue.
                _, mcount, _ = queue(
                    consumer.channel).queue_declare(passive=True)
                if mcount:
                    self.state.total_apx += mcount
            except self.conn.channel_errors:
                # Best effort: the total is only an approximation.
                pass
def start_filter(app, conn, filter, limit=None, timeout=1.0,
                 ack_messages=False, tasks=None, queues=None,
                 callback=None, forever=False, on_declare_queue=None,
                 consume_from=None, state=None, accept=None, **kwargs):
    """Filter tasks.

    Builds a :class:`Filterer` from the given arguments, runs it and
    returns the result of its ``start()`` method.
    """
    options = dict(
        limit=limit,
        timeout=timeout,
        ack_messages=ack_messages,
        tasks=tasks,
        queues=queues,
        callback=callback,
        forever=forever,
        on_declare_queue=on_declare_queue,
        consume_from=consume_from,
        state=state,
        accept=accept,
    )
    # ``kwargs`` can never contain one of the names above, since those are
    # bound by this function's own signature.
    options.update(kwargs)
    filterer = Filterer(app, conn, filter, **options)
    return filterer.start()
def move_task_by_id(task_id, dest, **kwargs):
    """Find a task by id and move it to another queue.

    Arguments:
        task_id (str): Id of task to find and move.
        dest (str, kombu.Queue): Destination queue.
        **kwargs (Any): Also supports the same keyword
            arguments as :func:`move`.
    """
    single_entry_map = {task_id: dest}
    return move_by_idmap(single_entry_map, **kwargs)
def move_by_idmap(map, **kwargs):
    """Move tasks by matching from a ``task_id: queue`` mapping.

    Where ``queue`` is a queue to move the task to.

    Example:
        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])
    """

    def task_id_in_map(body, message):
        # ``None`` (falsy) for unknown ids, so those are not moved.
        task_id = body['id']
        return map.get(task_id)

    # One move per mapped id is the most there can be, so using that as
    # the limit lets consuming stop as soon as everything has been found.
    wanted = len(map)
    return move(task_id_in_map, limit=wanted, **kwargs)
def move_by_taskmap(map, **kwargs):
    """Move tasks by matching from a ``task_name: queue`` mapping.

    ``queue`` is the queue to move the task to.

    Example:
        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })
    """

    def task_name_in_map(body, message):
        # ``None`` (falsy) for unknown task names, so those are not moved.
        task_name = body['task']
        return map.get(task_name)

    return move(task_name_in_map, **kwargs)
def filter_status(state, body, message, **kwargs):
    """Print a one-line progress report for the task in ``body``.

    Has the ``(state, body, message)`` callback signature; extra keyword
    arguments are made available to the ``MOVING_PROGRESS_FMT`` template.
    """
    progress_line = MOVING_PROGRESS_FMT.format(
        state=state, body=body, **kwargs)
    print(progress_line)
# Variants of the move helpers with ``transform`` pre-set to
# ``worker_direct``: the destination returned by the predicate/mapping is
# passed through ``worker_direct`` before the task is moved.
# NOTE(review): presumably this turns a worker hostname into that worker's
# dedicated queue -- confirm against ``celery.utils.nodenames``.
# NOTE(review): the last two names are not listed in ``__all__``.
move_direct = partial(move, transform=worker_direct)
move_direct_by_id = partial(move_task_by_id, transform=worker_direct)
move_direct_by_idmap = partial(move_by_idmap, transform=worker_direct)
move_direct_by_taskmap = partial(move_by_taskmap, transform=worker_direct)
|