This file is indexed.

/usr/share/pyshared/gearman/worker_handler.py is in python-gearman 2.0.2-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging

from gearman.command_handler import GearmanCommandHandler
from gearman.errors import InvalidWorkerState
from gearman.protocol import GEARMAN_COMMAND_PRE_SLEEP, GEARMAN_COMMAND_RESET_ABILITIES, GEARMAN_COMMAND_CAN_DO, GEARMAN_COMMAND_SET_CLIENT_ID, GEARMAN_COMMAND_GRAB_JOB_UNIQ, \
    GEARMAN_COMMAND_WORK_STATUS, GEARMAN_COMMAND_WORK_COMPLETE, GEARMAN_COMMAND_WORK_FAIL, GEARMAN_COMMAND_WORK_EXCEPTION, GEARMAN_COMMAND_WORK_WARNING, GEARMAN_COMMAND_WORK_DATA

gearman_logger = logging.getLogger(__name__)

class GearmanWorkerCommandHandler(GearmanCommandHandler):
    """GearmanWorker state machine on a per connection basis

    A worker can be in the following distinct states:
        SLEEP         -> Doing nothing, can be awoken
        AWAKE         -> Transitional state (for NOOP)
        AWAITING_JOB  -> Holding worker level job lock and awaiting a server response
        EXECUTING_JOB -> Transitional state (for ASSIGN_JOB)

    The state is never stored explicitly on the handler. It is implied by which
    command was sent last (PRE_SLEEP vs. GRAB_JOB_UNIQ) and by whether this
    handler currently holds the job lock, which is tracked by
    ``connection_manager`` (see check_job_lock / set_job_lock calls below).
    """
    def __init__(self, connection_manager=None):
        """Create a handler with no registered abilities and no client id."""
        super(GearmanWorkerCommandHandler, self).__init__(connection_manager=connection_manager)

        self._handler_abilities = []
        self._client_id = None

    def initial_state(self, abilities=None, client_id=None):
        """Announce our client id and abilities to the server, then go to sleep.

        NOTE: although `abilities` defaults to None, set_abilities() asserts that
        it receives a list or tuple, so callers must pass one explicitly.
        """
        self.set_client_id(client_id)
        self.set_abilities(abilities)

        self._sleep()

    ##################################################################
    ##### Public interface methods to be called by GearmanWorker #####
    ##################################################################
    def set_abilities(self, connection_abilities_list):
        """Replace the set of tasks this connection advertises.

        Sends RESET_ABILITIES followed by one CAN_DO per task, in list order.
        `connection_abilities_list` must be a list or tuple of task names.
        """
        assert type(connection_abilities_list) in (list, tuple)
        self._handler_abilities = connection_abilities_list

        self.send_command(GEARMAN_COMMAND_RESET_ABILITIES)
        for task in self._handler_abilities:
            self.send_command(GEARMAN_COMMAND_CAN_DO, task=task)

    def set_client_id(self, client_id):
        """Remember `client_id` and, unless it is None, send SET_CLIENT_ID."""
        self._client_id = client_id

        if self._client_id is not None:
            self.send_command(GEARMAN_COMMAND_SET_CLIENT_ID, client_id=self._client_id)

    ###############################################################
    #### Convenience methods for typical gearman jobs to call #####
    ###############################################################
    def send_job_status(self, current_job, numerator, denominator):
        """Send WORK_STATUS for `current_job`.

        `numerator` and `denominator` must be int or float; they are sent as
        their str() representation.
        """
        assert type(numerator) in (int, float), 'Numerator must be a numeric value'
        assert type(denominator) in (int, float), 'Denominator must be a numeric value'
        self.send_command(GEARMAN_COMMAND_WORK_STATUS, job_handle=current_job.handle, numerator=str(numerator), denominator=str(denominator))

    def send_job_complete(self, current_job, data):
        """Removes a job from the queue if its backgrounded

        Sends WORK_COMPLETE with `data` passed through self.encode_data().
        """
        self.send_command(GEARMAN_COMMAND_WORK_COMPLETE, job_handle=current_job.handle, data=self.encode_data(data))

    def send_job_failure(self, current_job):
        """Removes a job from the queue if its backgrounded

        Sends WORK_FAIL; this command carries no data payload.
        """
        self.send_command(GEARMAN_COMMAND_WORK_FAIL, job_handle=current_job.handle)

    def send_job_exception(self, current_job, data):
        """Send WORK_EXCEPTION with `data` passed through self.encode_data()."""
        # Using GEARMAND_COMMAND_WORK_EXCEPTION is not recommended at time of this writing [2010-02-24]
        # http://groups.google.com/group/gearman/browse_thread/thread/5c91acc31bd10688/529e586405ed37fe
        #
        self.send_command(GEARMAN_COMMAND_WORK_EXCEPTION, job_handle=current_job.handle, data=self.encode_data(data))

    def send_job_data(self, current_job, data):
        """Send WORK_DATA with `data` passed through self.encode_data()."""
        self.send_command(GEARMAN_COMMAND_WORK_DATA, job_handle=current_job.handle, data=self.encode_data(data))

    def send_job_warning(self, current_job, data):
        """Send WORK_WARNING with `data` passed through self.encode_data()."""
        self.send_command(GEARMAN_COMMAND_WORK_WARNING, job_handle=current_job.handle, data=self.encode_data(data))

    ###########################################################
    ### Callbacks when we receive a command from the server ###
    ###########################################################
    def _grab_job(self):
        """Ask the server for a job (GRAB_JOB_UNIQ)."""
        self.send_command(GEARMAN_COMMAND_GRAB_JOB_UNIQ)

    def _sleep(self):
        """Tell the server we are going idle (PRE_SLEEP)."""
        self.send_command(GEARMAN_COMMAND_PRE_SLEEP)

    def _check_job_lock(self):
        """Return the connection manager's answer to check_job_lock() for this handler."""
        return self.connection_manager.check_job_lock(self)

    def _acquire_job_lock(self):
        """Try to take the job lock; returns the connection manager's result."""
        return self.connection_manager.set_job_lock(self, lock=True)

    def _release_job_lock(self):
        """Release the job lock.

        Returns True on success and raises InvalidWorkerState if the
        connection manager reports that the lock could not be released.
        """
        if not self.connection_manager.set_job_lock(self, lock=False):
            raise InvalidWorkerState("Unable to release job lock for %r" % self)

        return True

    def recv_noop(self):
        """Transition from being SLEEP --> AWAITING_JOB / SLEEP

          AWAITING_JOB -> AWAITING_JOB :: Noop transition, we're already awaiting a job
        SLEEP -> AWAKE -> AWAITING_JOB :: Transition if we can acquire the worker job lock
        SLEEP -> AWAKE -> SLEEP        :: Transition if we can NOT acquire a worker job lock
        """
        if self._check_job_lock():
            # We already hold the lock, so a grab is already outstanding
            pass
        elif self._acquire_job_lock():
            self._grab_job()
        else:
            # Lock could not be acquired; go back to sleep
            self._sleep()

        return True

    def recv_no_job(self):
        """Transition from being AWAITING_JOB --> SLEEP

        AWAITING_JOB -> SLEEP :: Always transition to sleep if we have nothing to do
        """
        self._release_job_lock()
        self._sleep()

        return True

    def recv_job_assign_uniq(self, job_handle, task, unique, data):
        """Transition from being AWAITING_JOB --> EXECUTE_JOB --> SLEEP

        AWAITING_JOB -> EXECUTE_JOB -> SLEEP :: Always transition once we're given a job

        Raises InvalidWorkerState if this handler is not holding the job lock,
        or if the lock cannot be released afterwards.
        """
        assert task in self._handler_abilities, '%s not found in %r' % (task, self._handler_abilities)

        # After this point, we know this connection handler is holding onto the job lock so we don't need to acquire it again
        if not self.connection_manager.check_job_lock(self):
            raise InvalidWorkerState("Received a job when we weren't expecting one")

        # Create a new job from the (decoded) payload
        gearman_job = self.connection_manager.create_job(self, job_handle, task, unique, self.decode_data(data))

        # Hand the job over to the connection manager for execution
        self.connection_manager.on_job_execute(gearman_job)

        # Release the job lock once we're done and go back to sleep
        self._release_job_lock()
        self._sleep()

        return True

    def recv_job_assign(self, job_handle, task, data):
        """JOB_ASSIGN and JOB_ASSIGN_UNIQ are essentially the same

        JOB_ASSIGN carries no unique key, so delegate to the UNIQ handler
        with unique=None.
        """
        return self.recv_job_assign_uniq(job_handle=job_handle, task=task, unique=None, data=data)