/usr/lib/python3/dist-packages/celery/bin/call.py is in python3-celery 4.1.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | """The ``celery call`` program used to send tasks from the command-line."""
from __future__ import absolute_import, unicode_literals
from kombu.utils.json import loads
from celery.bin.base import Command
from celery.five import string_t
from celery.utils.time import maybe_iso8601
class call(Command):
"""Call a task by name.
Examples:
.. code-block:: console
$ celery call tasks.add --args='[2, 2]'
$ celery call tasks.add --args='[2, 2]' --countdown=10
"""
args = '<task_name>'
# since we have an argument --args, we need to name this differently.
args_name = 'posargs'
def add_arguments(self, parser):
group = parser.add_argument_group('Calling Options')
group.add_argument('--args', '-a',
help='positional arguments (json).')
group.add_argument('--kwargs', '-k',
help='keyword arguments (json).')
group.add_argument('--eta',
help='scheduled time (ISO-8601).')
group.add_argument(
'--countdown', type=float,
help='eta in seconds from now (float/int).',
)
group.add_argument(
'--expires',
help='expiry time (ISO-8601/float/int).',
),
group.add_argument(
'--serializer', default='json',
help='defaults to json.'),
ropts = parser.add_argument_group('Routing Options')
ropts.add_argument('--queue', help='custom queue name.')
ropts.add_argument('--exchange', help='custom exchange name.')
ropts.add_argument('--routing-key', help='custom routing key.')
def run(self, name, *_, **kwargs):
self._send_task(name, **kwargs)
def _send_task(self, name, args=None, kwargs=None,
countdown=None, serializer=None,
queue=None, exchange=None, routing_key=None,
eta=None, expires=None, **_):
# arguments
args = loads(args) if isinstance(args, string_t) else args
kwargs = loads(kwargs) if isinstance(kwargs, string_t) else kwargs
# Expires can be int/float.
try:
expires = float(expires)
except (TypeError, ValueError):
# or a string describing an ISO 8601 datetime.
try:
expires = maybe_iso8601(expires)
except (TypeError, ValueError):
raise
# send the task and print the id.
self.out(self.app.send_task(
name,
args=args or (), kwargs=kwargs or {},
countdown=countdown,
serializer=serializer,
queue=queue,
exchange=exchange,
routing_key=routing_key,
eta=maybe_iso8601(eta),
expires=expires,
).id)
|