This file is indexed.

/usr/lib/python2.7/dist-packages/celery/contrib/abortable.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# -*- coding: utf-8 -*-
"""
=========================
Abortable tasks overview
=========================

For long-running :class:`Task`'s, it can be desirable to support
aborting during execution. Of course, these tasks should be built to
support abortion specifically.

The :class:`AbortableTask` serves as a base class for all :class:`Task`
objects that should support abortion by producers.

* Producers may invoke the :meth:`abort` method on
  :class:`AbortableAsyncResult` instances, to request abortion.

* Consumers (workers) should periodically check (and honor!) the
  :meth:`is_aborted` method at controlled points in their task's
  :meth:`run` method. The more often, the better.

The necessary intermediate communication is dealt with by the
:class:`AbortableTask` implementation.

Usage example
-------------

In the consumer:

.. code-block:: python

    from __future__ import absolute_import

    from celery.contrib.abortable import AbortableTask
    from celery.utils.log import get_task_logger

    from proj.celery import app

    logger = get_logger(__name__)

    @app.task(bind=True, base=AbortableTask)
    def long_running_task(self):
        results = []
        for i in range(100):
            # check after every 5 iterations...
            # (or alternatively, check when some timer is due)
            if not i % 5:
                if self.is_aborted():
                    # respect aborted state, and terminate gracefully.
                    logger.warning('Task aborted')
                    return
                value = do_something_expensive(i)
                results.append(y)
        logger.info('Task complete')
        return results

In the producer:

.. code-block:: python

    from __future__ import absolute_import

    import time

    from proj.tasks import MyLongRunningTask

    def myview(request):
        # result is of type AbortableAsyncResult
        result = long_running_task.delay()

        # abort the task after 10 seconds
        time.sleep(10)
        result.abort()

After the `result.abort()` call, the task execution is not
aborted immediately. In fact, it is not guaranteed to abort at all. Keep
checking `result.state` status, or call `result.get(timeout=)` to
have it block until the task is finished.

.. note::

   In order to abort tasks, there needs to be communication between the
   producer and the consumer.  This is currently implemented through the
   database backend.  Therefore, this class will only work with the
   database backends.

"""
from __future__ import absolute_import

from celery import Task
from celery.result import AsyncResult

__all__ = ['AbortableAsyncResult', 'AbortableTask']


"""
Task States
-----------

.. state:: ABORTED

ABORTED
~~~~~~~

Task is aborted (typically by the producer) and should be
aborted as soon as possible.

"""
ABORTED = 'ABORTED'


class AbortableAsyncResult(AsyncResult):
    """Represents a abortable result.

    Specifically, this gives the `AsyncResult` a :meth:`abort()` method,
    which sets the state of the underlying Task to `'ABORTED'`.

    """

    def is_aborted(self):
        """Return :const:`True` if the task is (being) aborted."""
        return self.state == ABORTED

    def abort(self):
        """Set the state of the task to :const:`ABORTED`.

        Abortable tasks monitor their state at regular intervals and
        terminate execution if so.

        Be aware that invoking this method does not guarantee when the
        task will be aborted (or even if the task will be aborted at
        all).

        """
        # TODO: store_result requires all four arguments to be set,
        # but only status should be updated here
        return self.backend.store_result(self.id, result=None,
                                         status=ABORTED, traceback=None)


class AbortableTask(Task):
    """A celery task that serves as a base class for all :class:`Task`'s
    that support aborting during execution.

    All subclasses of :class:`AbortableTask` must call the
    :meth:`is_aborted` method periodically and act accordingly when
    the call evaluates to :const:`True`.

    """
    abstract = True

    def AsyncResult(self, task_id):
        """Return the accompanying AbortableAsyncResult instance."""
        return AbortableAsyncResult(task_id, backend=self.backend)

    def is_aborted(self, **kwargs):
        """Checks against the backend whether this
        :class:`AbortableAsyncResult` is :const:`ABORTED`.

        Always return :const:`False` in case the `task_id` parameter
        refers to a regular (non-abortable) :class:`Task`.

        Be aware that invoking this method will cause a hit in the
        backend (for example a database query), so find a good balance
        between calling it regularly (for responsiveness), but not too
        often (for performance).

        """
        task_id = kwargs.get('task_id', self.request.id)
        result = self.AsyncResult(task_id)
        if not isinstance(result, AbortableAsyncResult):
            return False
        return result.is_aborted()