This file is indexed.

/usr/lib/python3/dist-packages/celery/bin/purge.py is in python3-celery 4.1.0-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""The ``celery purge`` program, used to delete messages from queues."""
from __future__ import absolute_import, unicode_literals
from celery.five import keys
from celery.bin.base import Command
from celery.utils import text


class purge(Command):
    """Erase all messages from all known task queues.

    Warning:
        There's no undo operation for this command.
    """

    warn_prelude = (
        '{warning}: This will remove all tasks from {queues}: {names}.\n'
        '         There is no undo for this operation!\n\n'
        '(to skip this prompt use the -f option)\n'
    )
    warn_prompt = 'Are you sure you want to delete all tasks'

    fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
    fmt_empty = 'No messages purged from {qnum} {queues}'

    def add_arguments(self, parser):
        group = parser.add_argument_group('Purging Options')
        group.add_argument(
            '--force', '-f', action='store_true', default=False,
            help="Don't prompt for verification",
        )
        group.add_argument(
            '--queues', '-Q', default=[],
            help='Comma separated list of queue names to purge.',
        )
        group.add_argument(
            '--exclude-queues', '-X', default=[],
            help='Comma separated list of queues names not to purge.',
        )

    def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
        queues = set(text.str_to_list(queues or []))
        exclude = set(text.str_to_list(exclude_queues or []))
        names = (queues or set(keys(self.app.amqp.queues))) - exclude
        qnum = len(names)

        messages = None
        if names:
            if not force:
                self.out(self.warn_prelude.format(
                    warning=self.colored.red('WARNING'),
                    queues=text.pluralize(qnum, 'queue'),
                    names=', '.join(sorted(names)),
                ))
                if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
                    return
            with self.app.connection_for_write() as conn:
                messages = sum(self._purge(conn, queue) for queue in names)
        fmt = self.fmt_purged if messages else self.fmt_empty
        self.out(fmt.format(
            mnum=messages, qnum=qnum,
            messages=text.pluralize(messages, 'message'),
            queues=text.pluralize(qnum, 'queue')))

    def _purge(self, conn, queue):
        try:
            return conn.default_channel.queue_purge(queue) or 0
        except conn.channel_errors:
            return 0