This file is indexed.

/usr/lib/python2.7/dist-packages/pyres/job.py is in python-pyres 1.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import logging
import time
from datetime import timedelta
from pyres import ResQ, safe_str_to_class
from pyres import failure
from pyres.failure.redis import RedisBackend
from pyres.compat import string_types

class Job(object):
    """Every job on the ResQ is an instance of the *Job* class.

    The ``__init__`` takes these arguments:

        ``queue`` -- A string defining the queue to which this Job will be
                     added.

        ``payload`` -- A dictionary which contains the string name of a class
                       which extends this Job and a list of args which will be
                       passed to that class.

        ``resq`` -- An instance of the ResQ class.

        ``worker`` -- The name of a specific worker if you'd like this Job to be
                      done by that worker. Default is "None".

    """

    # Exposed as a static method so ``perform`` can resolve the payload's
    # class name through ``self.safe_str_to_class``.
    safe_str_to_class = staticmethod(safe_str_to_class)

    def __init__(self, queue, payload, resq, worker=None):
        self._queue = queue
        self._payload = payload
        self.resq = resq
        self._worker = worker

        # Optional; None when the payload carries no enqueue timestamp.
        self.enqueue_timestamp = self._payload.get("enqueue_timestamp")

        # Set the default back end, jobs can override when we import them
        # inside perform().
        failure.backend = RedisBackend

    def __str__(self):
        # Use .get() so that describing a malformed payload (for example in a
        # log message) never raises KeyError itself.
        return "(Job{%s} | %s | %s)" % (
            self._queue, self._payload.get('class'),
            repr(self._payload.get('args')))

    def perform(self):
        """This method converts payload into args and calls the ``perform``
        method on the payload class.

        Before calling ``perform``, a ``before_perform`` class method
        is called, if it exists.  It takes a metadata dictionary as an
        argument, which holds the args passed into ``perform``, the
        timestamp of when the job was enqueued (if the payload has one),
        a ``perform_timestamp`` and a ``failed`` flag set to ``False``.

        Similarly, an ``after_perform`` class method is called with the
        same metadata dictionary once ``perform`` has returned
        successfully.  It is *not* called when ``before_perform`` or
        ``perform`` raises, whether or not a retry gets scheduled.

        Returns whatever the payload class's ``perform`` returns, or
        ``None`` when the job failed and a retry was scheduled.  If the
        job failed and no retry was scheduled, the original exception
        is re-raised.

        """
        payload_class_str = self._payload["class"]
        payload_class = self.safe_str_to_class(payload_class_str)
        # Give the job class access to the queue for the duration of the run;
        # removed again in the ``finally`` clause below.
        payload_class.resq = self.resq
        args = self._payload.get("args")
        if args is None:
            # A payload without "args" means "no arguments"; unpacking None
            # with ``*args`` would raise an unrelated TypeError here and
            # again inside retry().
            args = []

        metadata = dict(args=args)
        if self.enqueue_timestamp:
            metadata["enqueue_timestamp"] = self.enqueue_timestamp

        before_perform = getattr(payload_class, "before_perform", None)

        metadata["failed"] = False
        metadata["perform_timestamp"] = time.time()
        # Cleared on failure so the after_perform hook is skipped.
        check_after = True
        try:
            if before_perform:
                payload_class.before_perform(metadata)
            return payload_class.perform(*args)
        except Exception as e:
            check_after = False
            metadata["failed"] = True
            metadata["exception"] = e
            if not self.retry(payload_class, args):
                metadata["retried"] = False
                raise
            else:
                metadata["retried"] = True
                logging.exception("Retry scheduled after error in %s", self._payload)
        finally:
            try:
                after_perform = getattr(payload_class, "after_perform", None)
                if after_perform and check_after:
                    payload_class.after_perform(metadata)
            finally:
                # Always detach the ResQ instance from the job class, even
                # when the after_perform hook itself raises.
                delattr(payload_class, 'resq')

    def fail(self, exception):
        """This method provides a way to fail a job and will use whatever
        failure backend you've provided. The default is the ``RedisBackend``.

        The failure object is created from ``exception`` together with this
        job's queue, payload and worker, saved through ``self.resq`` and
        then returned.

        """
        fail = failure.create(exception, self._queue, self._payload,
                              self._worker)
        fail.save(self.resq)
        return fail

    def retry(self, payload_class, args):
        """This method provides a way to retry a job after a failure.

        If the job class defined by the payload contains a ``retry_every``
        attribute (in seconds) then pyres will re-enqueue the job to run
        ``retry_every`` seconds from now, as long as that moment falls
        before ``first_attempt + retry_timeout`` seconds.  ``retry_timeout``
        is read from the payload class and defaults to 0, and
        ``first_attempt`` is taken from the payload, or is the current time
        on the first failure.

        Returns ``True`` if a retry was scheduled, ``False`` otherwise.

        """
        retry_every = getattr(payload_class, 'retry_every', None)
        retry_timeout = getattr(payload_class, 'retry_timeout', 0)

        if retry_every:
            now = ResQ._current_time()
            first_attempt = self._payload.get("first_attempt", now)
            retry_until = first_attempt + timedelta(seconds=retry_timeout)
            retry_at = now + timedelta(seconds=retry_every)
            if retry_at < retry_until:
                # Carry first_attempt forward so the timeout window stays
                # anchored to the original failure.
                self.resq.enqueue_at(retry_at, payload_class, *args,
                                     first_attempt=first_attempt)
                return True
        return False

    @classmethod
    def reserve(cls, queues, res, worker=None, timeout=10):
        """Reserve a job on one of the queues by popping it via ``res.pop``.

        ``queues`` may be a single queue name or a list of queue names.
        Returns a new instance of this class wrapping the popped payload,
        or ``None`` when the pop yielded no payload.

        """
        if isinstance(queues, string_types):
            queues = [queues]
        queue, payload = res.pop(queues, timeout=timeout)
        if payload:
            return cls(queue, payload, res, worker)
        return None