/usr/lib/python2.7/dist-packages/oops_amqp/receiver.py is in python-oops-amqp 0.0.7-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (c) 2011, Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 only.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# GNU Lesser General Public License version 3 (see the file LICENSE).
"""Receive OOPS reports over amqp and republish locally."""
# Python 2 only: make every class defined in this module new-style by default.
__metaclass__ = type
import time
import anybson as bson
from utils import (
amqplib_error_types,
close_ignoring_connection_errors,
is_amqplib_connection_error,
)
# Names exported by ``from <this module> import *``.
__all__ = [
'Receiver',
]
class Receiver:
    """Republish OOPS reports from AMQP to a local oops.Config.

    :ivar stopping: When True will cause Receiver to break out of run_forever.
        Calls to run_forever reset this to False.
    :ivar sentinel: If a message identical to the sentinel is received,
        handle_report will set stopping to True.
    :ivar went_bad: Timestamp (from time.time()) of the first connection
        failure since the last successful connection, or None when the
        connection is considered healthy.
    """

    def __init__(self, config, connection_factory, queue_name):
        """Create a Receiver.

        :param config: An oops.Config to republish the OOPS reports.
        :param connection_factory: An amqplib connection factory, used to make
            the initial connection and to reconnect if that connection is
            interrupted.
        :param queue_name: The queue to listen for reports on.
        """
        self.config = config
        self.connection = None
        self.channel = None
        self.connection_factory = connection_factory
        self.queue_name = queue_name
        self.sentinel = None
        # Initialise loop state up front so that handle_report can be used
        # (and the attributes inspected) before run_forever is ever called.
        self.stopping = False
        self.went_bad = None

    def handle_report(self, message):
        """Handle one message delivered from the queue.

        A message whose body equals the sentinel requests shutdown: stopping
        is set and the message is acknowledged. Any other body is decoded as
        BSON and republished through config. Bodies that fail to decode with
        KeyError are treated as garbage: they are acknowledged and dropped.

        :param message: The delivered message; its ``body`` and
            ``delivery_tag`` attributes are used.
        """
        if message.body == self.sentinel:
            self.stopping = True
            self.channel.basic_ack(message.delivery_tag)
            return
        try:
            report = bson.loads(message.body)
        except KeyError:
            # Garbage in the queue. Possibly this should raise an OOPS itself
            # (through a different config) or log an info level message.
            # Nothing is published, but the message is still ACKed below so
            # it does not stay in the queue.
            pass
        else:
            # Only publish when decoding succeeded; previously an undecodable
            # body fell through to publish() with ``report`` unbound.
            self.config.publish(report)
        # ACK last so errors here don't eat the message.
        self.channel.basic_ack(message.delivery_tag)

    def run_forever(self):
        """Run in a loop handling messages.

        If the amqp server is down or uncontactable for > 120 seconds, error
        out.

        :raises: Any amqplib error that is not a connection error is
            re-raised unchanged.
        """
        self.stopping = False
        self.went_bad = None
        while (not self.stopping and
               (self.went_bad is None or
                time.time() < self.went_bad + 120)):
            try:
                self._run_forever()
            except amqplib_error_types as e:
                if not is_amqplib_connection_error(e):
                    # Something unknown went wrong.
                    raise
                if self.went_bad is None:
                    # Remember when the outage started; _run_forever clears
                    # this again once a connection succeeds.
                    self.went_bad = time.time()
                # Don't probe immediately, give the network/process time to
                # come back.
                time.sleep(0.1)

    def _run_forever(self):
        """Connect, consume from the queue until stopping is set, then clean up.

        The consumer is cancelled (if the channel is still open) and the
        channel and connection are closed on the way out, whether the loop
        ended normally or by an exception.
        """
        self.connection = self.connection_factory()
        # A successful connection: record this so run_forever won't bail early.
        self.went_bad = None
        try:
            self.channel = self.connection.channel()
            try:
                self.consume_tag = self.channel.basic_consume(
                    self.queue_name, callback=self.handle_report)
                try:
                    while True:
                        self.channel.wait()
                        if self.stopping:
                            break
                finally:
                    if self.channel.is_open:
                        self.channel.basic_cancel(self.consume_tag)
            finally:
                close_ignoring_connection_errors(self.channel)
        finally:
            close_ignoring_connection_errors(self.connection)
|