This file is indexed.

/usr/share/pyshared/gearman/admin_client_handler.py is in python-gearman 2.0.2-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import collections
import logging

from gearman.command_handler import GearmanCommandHandler
from gearman.errors import ProtocolError, InvalidAdminClientState
from gearman.protocol import GEARMAN_COMMAND_ECHO_REQ, GEARMAN_COMMAND_TEXT_COMMAND, \
    GEARMAN_SERVER_COMMAND_STATUS, GEARMAN_SERVER_COMMAND_VERSION, \
    GEARMAN_SERVER_COMMAND_WORKERS, GEARMAN_SERVER_COMMAND_MAXQUEUE, GEARMAN_SERVER_COMMAND_SHUTDOWN

# Module-level logger, named after this module
gearman_logger = logging.getLogger(__name__)

# Server text commands that send_text_command() is willing to send
EXPECTED_GEARMAN_SERVER_COMMANDS = set((
    GEARMAN_SERVER_COMMAND_STATUS,
    GEARMAN_SERVER_COMMAND_VERSION,
    GEARMAN_SERVER_COMMAND_WORKERS,
    GEARMAN_SERVER_COMMAND_MAXQUEUE,
    GEARMAN_SERVER_COMMAND_SHUTDOWN,
))

class GearmanAdminClientCommandHandler(GearmanCommandHandler):
    """Special GEARMAN_COMMAND_TEXT_COMMAND command handler that'll parse text responses from the server

    Every command sent through this handler is remembered in send order, and every
    fully received response is queued in completion order.  pop_response() hands
    back the oldest sent command together with the oldest completed response.
    """
    # Exact number of tab-separated fields required on each line of a status response
    STATUS_FIELDS = 4
    # Minimum number of space-separated fields required on each line of a workers response
    WORKERS_FIELDS = 4

    def __init__(self, connection_manager=None):
        """Set up empty command / response queues and empty multi-line accumulators"""
        super(GearmanAdminClientCommandHandler, self).__init__(connection_manager=connection_manager)

        # Commands we have sent (oldest first) that have not been popped yet
        self._sent_commands = collections.deque()
        # Completed responses (oldest first) that have not been popped yet
        self._recv_responses = collections.deque()

        # Per-line dicts collected so far for the in-progress status / workers response
        self._status_response = []
        self._workers_response = []

    #######################################################################
    ##### Public interface methods to be called by GearmanAdminClient #####
    #######################################################################

    @property
    def response_ready(self):
        """True when at least one completed response is waiting to be popped"""
        return bool(self._recv_responses)

    def pop_response(self):
        """Remove and return the oldest (sent_command, recv_response) pair

        Raises InvalidAdminClientState if there is no sent command or no completed response.
        """
        if not self._sent_commands or not self._recv_responses:
            raise InvalidAdminClientState('Attempted to pop a response for a command that is not ready')

        sent_command = self._sent_commands.popleft()
        recv_response = self._recv_responses.popleft()
        return sent_command, recv_response

    def send_text_command(self, command_line):
        """Send our administrative text command

        command_line must start with one of EXPECTED_GEARMAN_SERVER_COMMANDS, otherwise
        ProtocolError is raised and nothing is sent.  The matched server command (not the
        full command line) is what gets recorded as pending, and a trailing newline is
        appended to the text that goes out.
        """
        expected_server_command = None
        for server_command in EXPECTED_GEARMAN_SERVER_COMMANDS:
            if command_line.startswith(server_command):
                expected_server_command = server_command
                break

        if not expected_server_command:
            raise ProtocolError('Attempted to send an unknown server command: %r' % command_line)

        # Record the command before sending so the matching recv_server_* callback can be found
        self._sent_commands.append(expected_server_command)

        output_text = '%s\n' % command_line
        self.send_command(GEARMAN_COMMAND_TEXT_COMMAND, raw_text=output_text)

    def send_echo_request(self, echo_string):
        """Send an echo request carrying echo_string and record it as a pending command"""
        self._sent_commands.append(GEARMAN_COMMAND_ECHO_REQ)

        self.send_command(GEARMAN_COMMAND_ECHO_REQ, data=echo_string)

    ###########################################################
    ### Callbacks when we receive a command from the server ###
    ###########################################################

    def recv_echo_res(self, data):
        """Queue the echoed data as a completed response; always returns False"""
        self._recv_responses.append(data)
        return False

    def recv_text_command(self, raw_text):
        """Catch GEARMAN_COMMAND_TEXT_COMMAND's and forward them onto their respective recv_server_* callbacks

        The callback is chosen from the oldest pending sent command, which is only peeked
        at here; it stays queued until pop_response() removes it.  Returns whatever the
        chosen callback returns.

        Raises InvalidAdminClientState if no command is pending and ValueError if no
        recv_server_* callback exists for the pending command.
        """
        if not self._sent_commands:
            raise InvalidAdminClientState('Received an unexpected server response')

        # Peek at the first command
        cmd_type = self._sent_commands[0]
        recv_server_command_function_name = 'recv_server_%s' % cmd_type

        cmd_callback = getattr(self, recv_server_command_function_name, None)
        if not cmd_callback:
            # Hand the arguments to the logger so formatting only happens if the record is emitted
            gearman_logger.error('Could not handle command: %r - %r', cmd_type, raw_text)
            raise ValueError('Could not handle command: %r - %r' % (cmd_type, raw_text))

        # This must match the parameter names as defined in the command handler
        completed_work = cmd_callback(raw_text)
        return completed_work

    def recv_server_status(self, raw_text):
        """Slowly assemble a server status message line by line

        Returns True after storing one more line, and False once the terminating '.'
        has been seen and the assembled tuple of dicts has been queued.

        Raises ProtocolError if a line does not have exactly STATUS_FIELDS tab-separated
        fields.  A non-integer count propagates the ValueError raised by int().
        """
        # If we received a '.', we've finished parsing this status message
        # Pack up our output and reset our response queue
        if raw_text == '.':
            output_response = tuple(self._status_response)
            self._recv_responses.append(output_response)
            self._status_response = []
            return False

        # If we didn't get a final response, split our line and interpret all the data
        split_tokens = raw_text.split('\t')
        if len(split_tokens) != self.STATUS_FIELDS:
            raise ProtocolError('Received %d tokens, expected %d tokens: %r' % (len(split_tokens), self.STATUS_FIELDS, split_tokens))

        # Label our fields and make the results Python friendly
        task, queued_count, running_count, worker_count = split_tokens

        status_dict = {}
        status_dict['task'] = task
        status_dict['queued'] = int(queued_count)
        status_dict['running'] = int(running_count)
        status_dict['workers'] = int(worker_count)
        self._status_response.append(status_dict)
        return True

    def recv_server_version(self, raw_text):
        """Version response is a simple passthrough: queue raw_text unchanged and return False"""
        self._recv_responses.append(raw_text)
        return False

    def recv_server_workers(self, raw_text):
        """Slowly assemble a server workers message line by line

        Returns True after storing one more line, and False once the terminating '.'
        has been seen and the assembled tuple of dicts has been queued.

        Raises ProtocolError if a line has fewer than WORKERS_FIELDS space-separated
        fields or if its fourth field is not ':'.
        """
        # If we received a '.', we've finished parsing this workers message
        # Pack up our output and reset our response queue
        if raw_text == '.':
            output_response = tuple(self._workers_response)
            self._recv_responses.append(output_response)
            self._workers_response = []
            return False

        split_tokens = raw_text.split(' ')
        if len(split_tokens) < self.WORKERS_FIELDS:
            # Report the configured minimum rather than a hard-coded number, as recv_server_status does
            raise ProtocolError('Received %d tokens, expected >= %d tokens: %r' % (len(split_tokens), self.WORKERS_FIELDS, split_tokens))

        # The fourth token separates the worker's identity from its list of tasks
        if split_tokens[3] != ':':
            raise ProtocolError('Malformed worker response: %r' % (split_tokens, ))

        # Label our fields and make the results Python friendly
        worker_dict = {}
        worker_dict['file_descriptor'] = split_tokens[0]
        worker_dict['ip'] = split_tokens[1]
        worker_dict['client_id'] = split_tokens[2]
        # Everything after the ':' separator; an empty tuple when nothing follows it
        worker_dict['tasks'] = tuple(split_tokens[4:])
        self._workers_response.append(worker_dict)
        return True

    def recv_server_maxqueue(self, raw_text):
        """Queue the 'OK' acknowledgement of a maxqueue command and return False

        Raises ProtocolError if the server answered with anything other than 'OK'.
        """
        if raw_text != 'OK':
            raise ProtocolError("Expected 'OK', received: %s" % raw_text)

        self._recv_responses.append(raw_text)
        return False

    def recv_server_shutdown(self, raw_text):
        """Queue None as the shutdown response (raw_text is not kept) and return False"""
        self._recv_responses.append(None)
        return False