This file is indexed.

/usr/lib/python2.7/dist-packages/celery/fixups/django.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
from __future__ import absolute_import

import os
import sys
import warnings

from kombu.utils import cached_property, symbol_by_name

from datetime import datetime
from importlib import import_module

from celery import signals
from celery.exceptions import FixupWarning

if sys.version_info[0] < 3 and not hasattr(sys, 'pypy_version_info'):
    from StringIO import StringIO
else:
    from io import StringIO

__all__ = ['DjangoFixup', 'fixup']

ERR_NOT_INSTALLED = """\
Environment variable DJANGO_SETTINGS_MODULE is defined
but Django is not installed.  Will not apply Django fixups!
"""


def _maybe_close_fd(fh):
    try:
        os.close(fh.fileno())
    except (AttributeError, OSError, TypeError):
        # TypeError added for celery#962
        pass


def fixup(app, env='DJANGO_SETTINGS_MODULE'):
    SETTINGS_MODULE = os.environ.get(env)
    if SETTINGS_MODULE and 'django' not in app.loader_cls.lower():
        try:
            import django  # noqa
        except ImportError:
            warnings.warn(FixupWarning(ERR_NOT_INSTALLED))
        else:
            return DjangoFixup(app).install()


class DjangoFixup(object):

    def __init__(self, app):
        self.app = app
        self.app.set_default()
        self._worker_fixup = None

    def install(self):
        # Need to add project directory to path
        sys.path.append(os.getcwd())

        self.app.loader.now = self.now
        self.app.loader.mail_admins = self.mail_admins

        signals.import_modules.connect(self.on_import_modules)
        signals.worker_init.connect(self.on_worker_init)
        return self

    @cached_property
    def worker_fixup(self):
        if self._worker_fixup is None:
            self._worker_fixup = DjangoWorkerFixup(self.app)
        return self._worker_fixup

    def on_import_modules(self, **kwargs):
        # call django.setup() before task modules are imported
        self.worker_fixup.validate_models()

    def on_worker_init(self, **kwargs):
        self.worker_fixup.install()

    def now(self, utc=False):
        return datetime.utcnow() if utc else self._now()

    def mail_admins(self, subject, body, fail_silently=False, **kwargs):
        return self._mail_admins(subject, body, fail_silently=fail_silently)

    @cached_property
    def _mail_admins(self):
        return symbol_by_name('django.core.mail:mail_admins')

    @cached_property
    def _now(self):
        try:
            return symbol_by_name('django.utils.timezone:now')
        except (AttributeError, ImportError):  # pre django-1.4
            return datetime.now


class DjangoWorkerFixup(object):
    _db_recycles = 0

    def __init__(self, app):
        self.app = app
        self.db_reuse_max = self.app.conf.get('CELERY_DB_REUSE_MAX', None)
        self._db = import_module('django.db')
        self._cache = import_module('django.core.cache')
        self._settings = symbol_by_name('django.conf:settings')

        # Database-related exceptions.
        DatabaseError = symbol_by_name('django.db:DatabaseError')
        try:
            import MySQLdb as mysql
            _my_database_errors = (mysql.DatabaseError,
                                   mysql.InterfaceError,
                                   mysql.OperationalError)
        except ImportError:
            _my_database_errors = ()      # noqa
        try:
            import psycopg2 as pg
            _pg_database_errors = (pg.DatabaseError,
                                   pg.InterfaceError,
                                   pg.OperationalError)
        except ImportError:
            _pg_database_errors = ()      # noqa
        try:
            import sqlite3
            _lite_database_errors = (sqlite3.DatabaseError,
                                     sqlite3.InterfaceError,
                                     sqlite3.OperationalError)
        except ImportError:
            _lite_database_errors = ()    # noqa
        try:
            import cx_Oracle as oracle
            _oracle_database_errors = (oracle.DatabaseError,
                                       oracle.InterfaceError,
                                       oracle.OperationalError)
        except ImportError:
            _oracle_database_errors = ()  # noqa

        try:
            self._close_old_connections = symbol_by_name(
                'django.db:close_old_connections',
            )
        except (ImportError, AttributeError):
            self._close_old_connections = None
        self.database_errors = (
            (DatabaseError, ) +
            _my_database_errors +
            _pg_database_errors +
            _lite_database_errors +
            _oracle_database_errors
        )

    def validate_models(self):
        import django
        try:
            django_setup = django.setup
        except AttributeError:
            pass
        else:
            django_setup()
        s = StringIO()
        try:
            from django.core.management.validation import get_validation_errors
        except ImportError:
            from django.core.management.base import BaseCommand
            cmd = BaseCommand()
            try:
                # since django 1.5
                from django.core.management.base import OutputWrapper
                cmd.stdout = OutputWrapper(sys.stdout)
                cmd.stderr = OutputWrapper(sys.stderr)
            except ImportError:
                cmd.stdout, cmd.stderr = sys.stdout, sys.stderr

            cmd.check()
        else:
            num_errors = get_validation_errors(s, None)
            if num_errors:
                raise RuntimeError(
                    'One or more Django models did not validate:\n{0}'.format(
                        s.getvalue()))

    def install(self):
        signals.beat_embedded_init.connect(self.close_database)
        signals.worker_ready.connect(self.on_worker_ready)
        signals.task_prerun.connect(self.on_task_prerun)
        signals.task_postrun.connect(self.on_task_postrun)
        signals.worker_process_init.connect(self.on_worker_process_init)
        self.close_database()
        self.close_cache()
        return self

    def on_worker_process_init(self, **kwargs):
        # Child process must validate models again if on Windows,
        # or if they were started using execv.
        if os.environ.get('FORKED_BY_MULTIPROCESSING'):
            self.validate_models()

        # close connections:
        # the parent process may have established these,
        # so need to close them.

        # calling db.close() on some DB connections will cause
        # the inherited DB conn to also get broken in the parent
        # process so we need to remove it without triggering any
        # network IO that close() might cause.
        try:
            for c in self._db.connections.all():
                if c and c.connection:
                    _maybe_close_fd(c.connection)
        except AttributeError:
            if self._db.connection and self._db.connection.connection:
                _maybe_close_fd(self._db.connection.connection)

        # use the _ version to avoid DB_REUSE preventing the conn.close() call
        self._close_database()
        self.close_cache()

    def on_task_prerun(self, sender, **kwargs):
        """Called before every task."""
        if not getattr(sender.request, 'is_eager', False):
            self.close_database()

    def on_task_postrun(self, sender, **kwargs):
        # See http://groups.google.com/group/django-users/
        #            browse_thread/thread/78200863d0c07c6d/
        if not getattr(sender.request, 'is_eager', False):
            self.close_database()
            self.close_cache()

    def close_database(self, **kwargs):
        if self._close_old_connections:
            return self._close_old_connections()  # Django 1.6
        if not self.db_reuse_max:
            return self._close_database()
        if self._db_recycles >= self.db_reuse_max * 2:
            self._db_recycles = 0
            self._close_database()
        self._db_recycles += 1

    def _close_database(self):
        try:
            funs = [conn.close for conn in self._db.connections.all()]
        except AttributeError:
            if hasattr(self._db, 'close_old_connections'):  # django 1.6
                funs = [self._db.close_old_connections]
            else:
                # pre multidb, pending deprication in django 1.6
                funs = [self._db.close_connection]

        for close in funs:
            try:
                close()
            except self.database_errors as exc:
                str_exc = str(exc)
                if 'closed' not in str_exc and 'not connected' not in str_exc:
                    raise

    def close_cache(self):
        try:
            self._cache.cache.close()
        except (TypeError, AttributeError):
            pass

    def on_worker_ready(self, **kwargs):
        if self._settings.DEBUG:
            warnings.warn('Using settings.DEBUG leads to a memory leak, never '
                          'use this setting in production environments!')