This file is indexed.

/usr/lib/python2.7/dist-packages/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py is in python-oslo.messaging 5.35.0-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#    Copyright 2015-2016 Mirantis, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import uuid

import six

from oslo_messaging._drivers.zmq_driver.server.consumers \
    import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._drivers.zmq_driver import zmq_version
from oslo_messaging._i18n import _LE, _LI

LOG = logging.getLogger(__name__)

zmq = zmq_async.import_zmq()


class SubConsumer(zmq_consumer_base.ConsumerBase):

    def __init__(self, conf, poller, server):
        super(SubConsumer, self).__init__(conf, poller, server)
        self.matchmaker = SubscriptionMatchmakerWrapper(conf,
                                                        server.matchmaker)
        self.target = server.target
        self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
                                           immediate=False,
                                           identity=self._generate_identity())
        self.sockets.append(self.socket)
        self.host = self.socket.handle.identity
        self._subscribe_to_topic()
        self._receive_request_versions = \
            zmq_version.get_method_versions(self, 'receive_request')
        self.connection_updater = SubscriberConnectionUpdater(
            conf, self.matchmaker, self.socket)
        self.poller.register(self.socket, self.receive_request)
        LOG.info(_LI("[%s] Run SUB consumer"), self.host)

    def _generate_identity(self):
        return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + '/') + \
            zmq_address.target_to_subscribe_filter(self.target) + \
            six.b('/' + str(uuid.uuid4()))

    def _subscribe_to_topic(self):
        topic_filter = zmq_address.target_to_subscribe_filter(self.target)
        self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
        LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
                  {"host": self.host, "filter": topic_filter})

    def _get_receive_request_version(self, version):
        receive_request_version = self._receive_request_versions.get(version)
        if receive_request_version is None:
            raise zmq_version.UnsupportedMessageVersionError(version)
        return receive_request_version

    def _receive_request_v_1_0(self, topic_filter, socket):
        message_type = int(socket.recv())
        assert message_type in zmq_names.MULTISEND_TYPES, "Fanout expected!"
        message_id = socket.recv()
        context, message = socket.recv_loaded()
        LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s "
                  "(v%(msg_version)s)",
                  {'host': self.host,
                   'filter': topic_filter,
                   'msg_id': message_id,
                   'msg_version': '1.0'})
        return context, message

    def receive_request(self, socket):
        try:
            topic_filter = socket.recv()
            message_version = socket.recv_string()
            receive_request_version = \
                self._get_receive_request_version(message_version)
            context, message = receive_request_version(topic_filter, socket)
            return zmq_incoming_message.ZmqIncomingMessage(context, message)
        except (zmq.ZMQError, AssertionError, ValueError,
                zmq_version.UnsupportedMessageVersionError) as e:
            LOG.error(_LE("Receiving message failed: %s"), str(e))
            # NOTE(gdavoian): drop the left parts of a broken message
            if socket.getsockopt(zmq.RCVMORE):
                socket.recv_multipart()

    def cleanup(self):
        LOG.info(_LI("[%s] Destroy SUB consumer"), self.host)
        self.connection_updater.cleanup()
        super(SubConsumer, self).cleanup()


class SubscriptionMatchmakerWrapper(object):

    def __init__(self, conf, matchmaker):
        self.conf = conf
        self.matchmaker = matchmaker

    def get_publishers(self):
        conf_publishers = self.conf.oslo_messaging_zmq.subscribe_on
        LOG.debug("Publishers taken from configuration %s", conf_publishers)
        if conf_publishers:
            return [(publisher, None) for publisher in conf_publishers]
        return self.matchmaker.get_publishers()


class SubscriberConnectionUpdater(zmq_updater.ConnectionUpdater):

    def _update_connection(self):
        publishers = self.matchmaker.get_publishers()
        for publisher_address, router_address in publishers:
            self.socket.connect_to_host(publisher_address)
        LOG.debug("[%s] SUB consumer connected to publishers %s",
                  self.socket.handle.identity, publishers)