This file is indexed.

/usr/lib/python3/dist-packages/protobix/senderprotocol.py is in python3-protobix 1.0.0-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
import struct
import sys
import time
import re

import socket
try: import simplejson as json
except ImportError: import json # pragma: no cover

from .zabbixagentconfig import ZabbixAgentConfig

if sys.version_info >= (3,): # pragma: no cover
    import codecs

    def b(x):
        """Encode the text ``x`` as UTF-8 and return the resulting bytes."""
        encoded, _consumed = codecs.utf_8_encode(x)
        return encoded
else: # pragma: no cover
    def b(x):
        """Return ``x`` untouched: a Python 2 ``str`` is already bytes."""
        return x

# TLS support is only attempted on interpreters newer than (2, 7, 9);
# sys.version_info is a 5-tuple, so 2.7.9 itself also compares greater.
HAVE_DECENT_SSL = sys.version_info > (2, 7, 9)
if HAVE_DECENT_SSL:
    import ssl
    # Zabbix force TLSv1.2 protocol
    # in src/libs/zbxcrypto/tls.c function zbx_tls_init_child
    ZBX_TLS_PROTOCOL = ssl.PROTOCOL_TLSv1_2

# Marker opening every Zabbix packet: the text "ZBXD" followed by byte 0x01
ZBX_HDR = "ZBXD\x01"
# Full header size: the marker plus the 8 bytes unpacked with '<Q'
ZBX_HDR_SIZE = len(ZBX_HDR) + 8
# Captures the processed / failed / total / seconds-spent figures found in
# the "info" string of a server answer
ZBX_RESP_REGEX = (
    r'[Pp]rocessed:? (\d+);? [Ff]ailed:? (\d+);? '
    r'[Tt]otal:? (\d+);? [Ss]econds spent:? (\d+\.\d+)'
)

class SenderProtocol(object):
    """
    Client side of the exchange with a Zabbix server: builds a JSON
    request, frames it with the ZBXD header, sends it over a (possibly
    TLS-wrapped) TCP socket and parses the server's answer.

    Connection settings are read from a ZabbixAgentConfig instance held in
    ``self._config``.
    """

    # Value placed in the "request" field of every payload that
    # _send_to_zabbix builds
    REQUEST = "sender data"
    # Optional logger; stays None unless a truthy one is given to __init__
    _logger = None

    def __init__(self, logger=None):
        """
        Prepare a sender with a fresh agent configuration, an empty items
        list and no open socket.

        :logger: optional logger; a falsy value keeps the class default
        """
        self._config = ZabbixAgentConfig()
        self._items_list = list()
        self.socket = None
        if not logger: # pragma: no cover
            return
        self._logger = logger

    @property
    def server_active(self):
        return self._config.server_active

    @server_active.setter
    def server_active(self, value):
        if self._logger: # pragma: no cover
            self._logger.debug(
                "Replacing server_active  '%s' with '%s'" %
                (self._config.server_active, value)
            )
        self._config.server_active = value

    @property
    def server_port(self):
        return self._config.server_port

    @server_port.setter
    def server_port(self, value):
        if self._logger: # pragma: no cover
            self._logger.debug(
                "Replacing server_port  '%s' with '%s'" %
                (self._config.server_port, value)
            )
        self._config.server_port = value

    @property
    def debug_level(self):
        return self._config.debug_level

    @debug_level.setter
    def debug_level(self, value):
        if self._logger: # pragma: no cover
            self._logger.debug(
                "Replacing debug_level  '%s' with '%s'" %
                (self._config.debug_level, value)
            )
        self._config.debug_level = value

    @property
    def items_list(self):
        return self._items_list

    @property
    def clock(self):
        return int(time.time())

    def _send_to_zabbix(self, item):
        """
        Serialize ``item`` into a request and send it to Zabbix Server.

        The packet is the ZBX_HDR marker, the body length packed as an
        8-byte little-endian integer, then the JSON body holding the keys
        ``data``, ``request`` and ``clock``.

        :item: JSON-serializable value placed in the ``data`` field
        """
        if self._logger: # pragma: no cover
            self._logger.info(
                "Send data to Zabbix Server"
            )

        # Format data to be sent
        if self._logger: # pragma: no cover
            self._logger.debug(
                "Building packet to be sent to Zabbix Server"
            )
        payload = json.dumps({"data": item,
                              "request": self.REQUEST,
                              "clock": self.clock })
        if self._logger: # pragma: no cover
            self._logger.debug('About to send: ' + str(payload))

        # Encode once and measure the encoded body: the length header must
        # count bytes, which only matches the character count while the
        # JSON text is pure ASCII.
        body = b(payload)
        data_header = struct.pack('<Q', len(body))
        packet = b(ZBX_HDR) + data_header + body
        if self._logger: # pragma: no cover
            self._logger.debug(
                "Sending packet to Zabbix Server"
            )
        # Send payload to Zabbix Server
        self._socket().sendall(packet)

    def _read_from_zabbix(self):
        """
        Read Zabbix Server's answer from the socket and analyze it.

        The answer is read as a fixed-size header (the ZBX_HDR marker and
        an 8-byte little-endian body length) followed by exactly the
        number of body bytes the header announces. Bytes beyond the
        announced length are never read.

        :returns: the tuple ``(response, processed, failed, total, seconds)``
                  produced by ``_handle_response``
        :raises AssertionError: when the header marker is wrong or the
                                connection closes before the announced
                                body has been received
        :raises struct.error: when the connection closes in the middle of
                              the length field
        """
        # Read Zabbix server answer
        if self._logger: # pragma: no cover
            self._logger.info(
                "Reading Zabbix Server's answer"
            )
        connection = self._socket()

        def _recv_exactly(wanted):
            # recv() may return fewer bytes than requested even though
            # more are still on their way, so a short read is not an
            # end-of-message signal; only an empty read (peer closed) is.
            chunks = []
            missing = wanted
            while missing > 0:
                chunk = connection.recv(min(missing, 4096))
                if not chunk:
                    break
                chunks.append(chunk)
                missing -= len(chunk)
            return b''.join(chunks)

        zbx_srv_resp_header = _recv_exactly(ZBX_HDR_SIZE)

        # Check that we have a valid Zabbix header mark.
        # Raised explicitly (not via ``assert``) so the check survives -O.
        if self._logger: # pragma: no cover
            self._logger.debug(
                "Checking Zabbix headers"
            )
        if zbx_srv_resp_header[:5] != b(ZBX_HDR):
            raise AssertionError(
                "Invalid Zabbix header mark: %r" % (zbx_srv_resp_header[:5],)
            )

        # Extract response body length from packet
        zbx_srv_resp_body_len = struct.unpack(
            '<Q', zbx_srv_resp_header[5:ZBX_HDR_SIZE]
        )[0]

        # Extract response body
        if self._logger: # pragma: no cover
            self._logger.debug(
                "Extracting answer's body"
            )
        zbx_srv_resp_body = _recv_exactly(zbx_srv_resp_body_len)

        # Check that we have read the whole packet
        if len(zbx_srv_resp_body) != zbx_srv_resp_body_len:
            raise AssertionError(
                "Incomplete Zabbix answer: expected %d bytes, got %d" %
                (zbx_srv_resp_body_len, len(zbx_srv_resp_body))
            )

        if self._logger: # pragma: no cover
            self._logger.debug(
                "Building JSON object to be analyzed"
            )
        if sys.version_info[0] >= 3: # pragma: no cover
            zbx_srv_resp_body = zbx_srv_resp_body.decode()

        # Analyze Zabbix answer and hand the figures back to the caller
        response, processed, failed, total, seconds = \
            self._handle_response(zbx_srv_resp_body)
        return response, processed, failed, total, seconds

    def _handle_response(self, zbx_answer):
        """
        Analyze Zabbix Server response

        :zbx_answer: Zabbix server response as a JSON string
        :returns: tuple ``(response, processed, failed, total, seconds)``:
                  the answer's ``response`` value, the three item counters
                  as ``int`` and the time spent as ``float``, all taken
                  from the answer's ``info`` string
        :raises ValueError: from ``json.loads`` when the answer is not
                            valid JSON
        :raises TypeError: when the answer has no ``info`` field
        :raises IndexError: when ``info`` does not match ZBX_RESP_REGEX
        """
        zbx_answer = json.loads(zbx_answer)
        if self._logger: # pragma: no cover
            self._logger.info(
                "Anaylizing Zabbix Server's answer"
            )
            if zbx_answer:
                self._logger.debug("Zabbix Server response is: [%s]" % zbx_answer)

        response = zbx_answer.get('response')
        info = zbx_answer.get('info')

        # Same exception types as the bare regex/indexing failures would
        # produce, but with a message that shows what the server sent.
        if info is None:
            raise TypeError(
                "Zabbix Server answer has no 'info' field: %r" % (zbx_answer,)
            )
        summary = re.search(ZBX_RESP_REGEX, info)
        if summary is None:
            raise IndexError(
                "Unable to parse 'info' of Zabbix Server answer: %r" % (info,)
            )
        processed, failed, total, seconds = summary.groups()

        return response, int(processed), int(failed), int(total), float(seconds)

    def _socket_reset(self):
        if self.socket:
            if self._logger: # pragma: no cover
                self._logger.info(
                    "Reset socket"
                )
            self.socket.close()
            self.socket = None

    def _socket(self):
        # If socket already exists, use it
        if self.socket is not None:
            if self._logger: # pragma: no cover
                self._logger.debug(
                    "Using existing socket"
                )
            return self.socket

        # If not, we have to create it
        if self._logger: # pragma: no cover
            self._logger.debug(
                "Setting socket options"
            )
        socket.setdefaulttimeout(self._config.timeout)
        # Connect to Zabbix server or proxy with provided config options
        if self._logger: # pragma: no cover
            self._logger.info(
                "Creating new socket"
            )
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # TLS is enabled, let's set it up
        if self._config.tls_connect != 'unencrypted' and HAVE_DECENT_SSL is True:
            if self._logger: # pragma: no cover
                self._logger.info(
                    'Configuring TLS to %s' % str(self._config.tls_connect)
                )
            # Setup TLS context & Wrap socket
            self.socket = self._init_tls()
            if self._logger: # pragma: no cover
                self._logger.info(
                    'Network socket initialized with TLS support'
                )

        if self._logger and isinstance(self.socket, socket.socket): # pragma: no cover
            self._logger.info(
                'Network socket initialized with no TLS'
            )
        # Connect to Zabbix Server
        self.socket.connect(
            (self._config.server_active, self._config.server_port)
        )
        #if isinstance(self.socket, ssl.SSLSocket):
        #    server_cert = self.socket.getpeercert()
        #    if self._config.tls_server_cert_issuer:
        #        print(server_cert['issuer'])
        #        assert server_cert['issuer'] == self._config.tls_server_cert_issuer
        #        self._logger.info(
        #            'Server certificate issuer is %s' %
        #            server_cert['issuer']
        #        )
        #    if self._config.tls_server_cert_subject:
        #        print(server_cert['subject'])
        #        assert server_cert['subject'] == self._config.tls_server_cert_subject
        #        self._logger.info(
        #            'Server certificate subject is %s' %
        #            server_cert['subject']
        #        )

        return self.socket

    """
    Manage TLS context & wrap socket.
    Always returns an ssl.SSLSocket: _socket() only calls _init_tls()
    when TLS is enabled, so a plain socket.socket never comes out of here.
    """
    def _init_tls(self):
        """
        Build a TLS context from the agent configuration and wrap
        ``self.socket`` with it.

        Server certificate verification is required, TLS compression is
        disabled and strict X509 verification is requested. With
        ``tls_connect == 'cert'`` the client certificate and key files are
        loaded; ``'psk'`` is not supported. A configured CA file and CRL
        file are added to the verification store.

        :returns: the ``ssl.SSLSocket`` wrapping ``self.socket``
        :raises NotImplementedError: when ``tls_connect`` is ``'psk'``
        """
        log = self._logger
        cfg = self._config

        # Create a SSLContext and configure it
        if log: # pragma: no cover
            log.info("Initialize TLS context")
        context = ssl.SSLContext(ZBX_TLS_PROTOCOL)

        if log: # pragma: no cover
            log.debug('Setting TLS verify_mode to ssl.CERT_REQUIRED')
        context.verify_mode = ssl.CERT_REQUIRED

        # Avoid CRIME and related attacks
        if log: # pragma: no cover
            log.debug('Setting TLS option ssl.OP_NO_COMPRESSION')
        context.options |= ssl.OP_NO_COMPRESSION
        context.verify_flags =  ssl.VERIFY_X509_STRICT

        # If tls_connect is cert, we must provide client cert file & key
        mode = cfg.tls_connect
        if mode == 'cert':
            cert_file = cfg.tls_cert_file
            key_file = cfg.tls_key_file
            if log: # pragma: no cover
                log.debug("Using provided TLSCertFile %s" % cert_file)
                log.debug("Using provided TLSKeyFile %s" % key_file)
            context.load_cert_chain(cert_file, key_file)
        elif mode == 'psk':
            raise NotImplementedError

        # If provided, use CA file & enforce server certificate check
        ca_file = cfg.tls_ca_file
        if ca_file:
            if log: # pragma: no cover
                log.debug("Using provided TLSCAFile %s" % ca_file)
            context.load_default_certs(ssl.Purpose.SERVER_AUTH)
            context.load_verify_locations(cafile=ca_file)

        # If provided, use CRL file & enforce server certificate check
        crl_file = cfg.tls_crl_file
        if crl_file:
            if log: # pragma: no cover
                log.debug("Using provided TLSCRLFile %s" % crl_file)
            context.verify_flags |=  ssl.VERIFY_CRL_CHECK_LEAF
            context.load_verify_locations(cafile=crl_file)

        # TODO: enforce the server certificate issuer and subject checks
        # when the corresponding options are configured.

        # Once configuration is done, wrap network socket to TLS context
        wrapped = context.wrap_socket(self.socket)
        assert isinstance(wrapped, ssl.SSLSocket)
        return wrapped