This file is indexed.

/usr/lib/python3/dist-packages/celery/bin/call.py is in python3-celery 4.1.0-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""The ``celery call`` program used to send tasks from the command-line."""
from __future__ import absolute_import, unicode_literals
from kombu.utils.json import loads
from celery.bin.base import Command
from celery.five import string_t
from celery.utils.time import maybe_iso8601


class call(Command):
    """Call a task by name.

    Examples:
        .. code-block:: console

            $ celery call tasks.add --args='[2, 2]'
            $ celery call tasks.add --args='[2, 2]' --countdown=10
    """

    args = '<task_name>'

    # since we have an argument --args, we need to name this differently.
    args_name = 'posargs'

    def add_arguments(self, parser):
        """Register the ``Calling Options`` and ``Routing Options`` groups.

        Arguments:
            parser: the argument parser to extend; it must provide
                ``add_argument_group()``.
        """
        group = parser.add_argument_group('Calling Options')
        group.add_argument('--args', '-a',
                           help='positional arguments (json).')
        group.add_argument('--kwargs', '-k',
                           help='keyword arguments (json).')
        group.add_argument('--eta',
                           help='scheduled time (ISO-8601).')
        group.add_argument(
            '--countdown', type=float,
            help='eta in seconds from now (float/int).',
        )
        # --expires is deliberately left untyped: it may be a number or
        # an ISO-8601 string and is decoded later in _send_task().
        group.add_argument(
            '--expires',
            help='expiry time (ISO-8601/float/int).',
        )
        group.add_argument(
            '--serializer', default='json',
            help='defaults to json.',
        )

        ropts = parser.add_argument_group('Routing Options')
        ropts.add_argument('--queue', help='custom queue name.')
        ropts.add_argument('--exchange', help='custom exchange name.')
        ropts.add_argument('--routing-key', help='custom routing key.')

    def run(self, name, *_, **kwargs):
        """Send the task called ``name``.

        Extra positional arguments are ignored; keyword arguments
        (the parsed options) are forwarded to :meth:`_send_task`.
        """
        self._send_task(name, **kwargs)

    def _send_task(self, name, args=None, kwargs=None,
                   countdown=None, serializer=None,
                   queue=None, exchange=None, routing_key=None,
                   eta=None, expires=None, **_):
        """Decode the options, send the task and output its id.

        Arguments:
            name: name of the task to send.
            args: positional task arguments; a string is decoded as JSON,
                any other value is used as-is (falsy values become ``()``).
            kwargs: keyword task arguments; a string is decoded as JSON,
                any other value is used as-is (falsy values become ``{}``).
            countdown, serializer, queue, exchange, routing_key: passed
                through to ``self.app.send_task`` unchanged.
            eta: passed through ``maybe_iso8601`` before sending.
            expires: a number (or numeric string), otherwise a value
                handed to ``maybe_iso8601``.
            **_: any other options are ignored.

        Raises:
            Any exception raised by ``maybe_iso8601`` for a value it
            cannot handle propagates to the caller.
        """
        # arguments
        args = loads(args) if isinstance(args, string_t) else args
        kwargs = loads(kwargs) if isinstance(kwargs, string_t) else kwargs

        # Expires can be int/float ...
        try:
            expires = float(expires)
        except (TypeError, ValueError):
            # ... or a string describing an ISO 8601 datetime.  An omitted
            # option (None) also lands here, since float(None) raises
            # TypeError.  NOTE(review): presumably maybe_iso8601 passes
            # None through -- confirm against celery.utils.time.
            expires = maybe_iso8601(expires)

        # send the task and print the id.
        self.out(self.app.send_task(
            name,
            args=args or (), kwargs=kwargs or {},
            countdown=countdown,
            serializer=serializer,
            queue=queue,
            exchange=exchange,
            routing_key=routing_key,
            eta=maybe_iso8601(eta),
            expires=expires,
        ).id)