/usr/share/pyshared/glance/notifier/notify_kombu.py is in python-glance 2012.1.3+stable~20120821-120fcf-0ubuntu1.5.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011, OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import time
import kombu.connection
import kombu.entity
from glance.common import cfg
from glance.notifier import strategy
# Module-level logger; every log call in this module goes through it.
logger = logging.getLogger('glance.notifier.notify_kombu')
class KombuMaxRetriesReached(Exception):
    """Signals that reconnecting to the broker hit the retry limit."""
class RabbitStrategy(strategy.Strategy):
    """A notifier that puts a message on a queue when called.

    Messages are serialized as JSON and published to a topic exchange
    with the routing key ``<rabbit_notification_topic>.<priority>``
    (priority lower-cased).  When reconnecting gives up because the
    retry limit was reached, the notification is handed to
    ``log_failure`` instead.
    """

    # The three retry options are declared as StrOpt, so __init__ converts
    # them to int before they are used in arithmetic and comparisons.
    opts = [
        cfg.StrOpt('rabbit_host', default='localhost'),
        cfg.IntOpt('rabbit_port', default=5672),
        cfg.BoolOpt('rabbit_use_ssl', default=False),
        cfg.StrOpt('rabbit_userid', default='guest'),
        cfg.StrOpt('rabbit_password', default='guest'),
        cfg.StrOpt('rabbit_virtual_host', default='/'),
        cfg.StrOpt('rabbit_notification_exchange', default='glance'),
        cfg.StrOpt('rabbit_notification_topic',
                   default='glance_notifications'),
        cfg.StrOpt('rabbit_max_retries', default=0),
        cfg.StrOpt('rabbit_retry_backoff', default=2),
        cfg.StrOpt('rabbit_retry_max_backoff', default=30)
    ]

    def __init__(self, conf):
        """Initialize the rabbit notification strategy.

        Registers ``opts`` on ``conf``, caches the topic and retry
        settings and makes a first connection attempt.  Reaching the
        retry limit here is not fatal; ``_notify`` reconnects on demand
        when no connection is held.

        :param conf: configuration object providing ``register_opts`` and
                     the ``rabbit_*`` options as attributes
        """
        self._conf = conf
        self._conf.register_opts(self.opts)

        self.topic = self._conf.rabbit_notification_topic

        # NOTE(comstud): When reading the config file, these values end
        # up being strings, and we need them as ints.  This includes
        # rabbit_max_retries: it is compared against the integer attempt
        # counter and formatted with %d in reconnect().
        self.max_retries = int(self._conf.rabbit_max_retries)
        self.retry_backoff = int(self._conf.rabbit_retry_backoff)
        self.retry_max_backoff = int(self._conf.rabbit_retry_max_backoff)

        self.connection = None
        # Empty until _connect() copies the real error classes from the
        # connection; keeps ``except self.connection_errors`` evaluable
        # even if building the connection object itself fails.
        self.connection_errors = ()
        self.retry_attempts = 0
        try:
            self.reconnect()
        except KombuMaxRetriesReached:
            pass

    def _close(self):
        """Close connection to rabbit.

        Connection-level errors raised while closing are ignored, and the
        connection reference is dropped afterwards.
        """
        try:
            self.connection.close()
        except self.connection_errors:
            # Best effort: the connection is being discarded anyway.
            pass
        self.connection = None

    def _connect(self):
        """Connect to rabbit.  Exceptions should be handled by the
        caller.

        Closes any connection already held, opens a new one, creates the
        topic exchange and declares one durable queue per priority.
        """
        log_info = {}
        log_info['hostname'] = self._conf.rabbit_host
        log_info['port'] = self._conf.rabbit_port

        if self.connection:
            logger.info(_("Reconnecting to AMQP server on "
                          "%(hostname)s:%(port)d") % log_info)
            self._close()
        else:
            logger.info(_("Connecting to AMQP server on "
                          "%(hostname)s:%(port)d") % log_info)

        self.connection = kombu.connection.BrokerConnection(
            hostname=self._conf.rabbit_host,
            port=self._conf.rabbit_port,
            userid=self._conf.rabbit_userid,
            password=self._conf.rabbit_password,
            virtual_host=self._conf.rabbit_virtual_host,
            ssl=self._conf.rabbit_use_ssl)
        # Remember which exception classes mean "connection problem" so
        # the retry loops can catch exactly those.
        self.connection_errors = self.connection.connection_errors
        self.connection.connect()

        self.channel = self.connection.channel()
        self.exchange = kombu.entity.Exchange(
            channel=self.channel,
            type="topic",
            name=self._conf.rabbit_notification_exchange)

        # NOTE(jerdfelt): Normally the consumer would create the queues,
        # but we do this to ensure that messages don't get dropped if the
        # consumer is started after we do
        for priority in ["WARN", "INFO", "ERROR"]:
            routing_key = "%s.%s" % (self.topic, priority.lower())
            queue = kombu.entity.Queue(
                channel=self.channel,
                exchange=self.exchange,
                durable=True,
                name=routing_key,
                routing_key=routing_key)
            queue.declare()

        logger.info(_("Connected to AMQP server on "
                      "%(hostname)s:%(port)d") % log_info)

    def reconnect(self):
        """Handles reconnecting and re-establishing queues.

        Calls ``_connect`` until it succeeds.  After each failed attempt
        the method sleeps ``retry_backoff * retry_attempts`` seconds,
        capped at ``retry_max_backoff`` when that value is non-zero.

        :raises KombuMaxRetriesReached: when ``max_retries`` is non-zero
            and ``retry_attempts`` has reached it
        :raises Exception: any error from ``_connect`` that is neither
            one of ``connection_errors`` nor mentions 'timeout'
        """
        while True:
            self.retry_attempts += 1
            try:
                self._connect()
                return
            except self.connection_errors as e:
                # Bound to a separate name because it is read after the
                # handler has finished.
                failure = e
            except Exception as e:
                # NOTE(comstud): Unfortunately it's possible for amqplib
                # to return an error not covered by its transport
                # connection_errors in the case of a timeout waiting for
                # a protocol response.  (See paste link in LP888621 for
                # nova.)  So, we check all exceptions for 'timeout' in
                # them and try to reconnect in this case.
                if 'timeout' not in str(e):
                    raise
                failure = e

            log_info = {}
            log_info['err_str'] = str(failure)
            log_info['max_retries'] = self.max_retries
            log_info['hostname'] = self._conf.rabbit_host
            log_info['port'] = self._conf.rabbit_port

            # A max_retries of 0 means "retry forever".
            if self.max_retries and self.retry_attempts >= self.max_retries:
                logger.exception(_('Unable to connect to AMQP server on '
                                   '%(hostname)s:%(port)d after '
                                   '%(max_retries)d '
                                   'tries: %(err_str)s') % log_info)
                if self.connection:
                    self._close()
                raise KombuMaxRetriesReached

            # Back-off grows with the attempt count.
            sleep_time = self.retry_backoff * self.retry_attempts
            if self.retry_max_backoff:
                sleep_time = min(sleep_time, self.retry_max_backoff)
            log_info['sleep_time'] = sleep_time
            logger.exception(_('AMQP server on %(hostname)s:%(port)d is'
                               ' unreachable: %(err_str)s. Trying again in '
                               '%(sleep_time)d seconds.') % log_info)
            time.sleep(sleep_time)

    def log_failure(self, msg, priority):
        """Fallback to logging when we can't send to rabbit.

        :param msg: the notification payload that could not be sent
        :param priority: the priority the notification was sent with
        """
        message = _('Notification with priority %(priority)s failed: '
                    'msg=%(msg)s')
        logger.error(message % {'msg': msg, 'priority': priority})

    def _send_message(self, msg, routing_key):
        """Send a message.  Caller needs to catch exceptions for retry.

        :param msg: payload; it is passed through ``json.dumps``
        :param routing_key: routing key to publish the message under
        """
        msg = self.exchange.Message(json.dumps(msg),
                                    content_type='application/json')
        self.exchange.publish(msg, routing_key=routing_key)

    def _notify(self, msg, priority):
        """Send a notification and retry if needed.

        The attempt counter is reset for every notification.  If
        reconnecting gives up with ``KombuMaxRetriesReached`` the message
        is passed to ``log_failure`` and the method returns normally.

        :param msg: the notification payload
        :param priority: "WARN", "INFO" or "ERROR"; lower-cased to build
                         the routing key
        :raises Exception: any publish error that is neither one of
            ``connection_errors`` nor mentions 'timeout'
        """
        self.retry_attempts = 0

        if not self.connection:
            try:
                self.reconnect()
            except KombuMaxRetriesReached:
                self.log_failure(msg, priority)
                return

        routing_key = "%s.%s" % (self.topic, priority.lower())

        while True:
            try:
                self._send_message(msg, routing_key)
                return
            except self.connection_errors as e:
                # Bound to a separate name because it is read after the
                # handler has finished.
                failure = e
            except Exception as e:
                # NOTE(comstud): Unfortunately it's possible for amqplib
                # to return an error not covered by its transport
                # connection_errors in the case of a timeout waiting for
                # a protocol response.  (See paste link in LP888621 for
                # nova.)  So, we check all exceptions for 'timeout' in
                # them and try to reconnect in this case.
                if 'timeout' not in str(e):
                    raise
                failure = e

            logger.exception(_("Unable to send notification: %s") %
                             str(failure))

            try:
                self.reconnect()
            except KombuMaxRetriesReached:
                break

        self.log_failure(msg, priority)

    def warn(self, msg):
        """Send ``msg`` as a WARN notification."""
        self._notify(msg, "WARN")

    def info(self, msg):
        """Send ``msg`` as an INFO notification."""
        self._notify(msg, "INFO")

    def error(self, msg):
        """Send ``msg`` as an ERROR notification."""
        self._notify(msg, "ERROR")
|