/usr/share/pyshared/pika/heartbeat.py is in python-pika 0.9.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# ***** BEGIN LICENSE BLOCK *****
#
# For copyright and licensing please refer to COPYING.
#
# ***** END LICENSE BLOCK *****
from pika.frame import Heartbeat
# Number of consecutive idle check intervals tolerated before the connection
# is closed
MAX_MISSED_HEARTBEATS = 2


class HeartbeatChecker(object):
    """
    Periodically send heartbeat frames on a connection and close the
    connection once nothing has been received for MAX_MISSED_HEARTBEATS
    consecutive intervals.
    """

    def __init__(self, connection, interval):
        """
        Create a heartbeat on connection sending a heartbeat frame every
        interval seconds.

        :param connection: Object providing add_timeout, close, _send_frame,
            the bytes_sent and bytes_received counters and a heartbeat
            attribute
        :param interval: Delay handed to connection.add_timeout between
            checks; also used to compute the idle duration reported when
            the connection is closed
        """
        # We need to reference our connection object to close a connection
        self.connection = connection
        self.interval = interval

        # Initialize our counters: consecutive idle intervals, and the
        # connection byte counts as seen at the end of the previous check
        self.missed = 0
        self.received = 0
        self.sent = 0

        # Setup our timer to fire in interval seconds
        self.setup_timer()

    def setup_timer(self):
        """
        Use the connection object's add_timeout function to have
        send_and_check invoked after interval seconds. send_and_check
        registers the next timeout itself, so only one call is scheduled
        here.
        """
        self.connection.add_timeout(self.interval, self.send_and_check)

    def send_and_check(self):
        """
        Invoked by a timer to send a heartbeat when we need to, check to see
        if we've missed any heartbeats and disconnect our connection if it's
        been idle too long.
        """
        # If our received byte count == our connection object's received byte
        # count, the connection has been stale since the last heartbeat
        if self.received == self.connection.bytes_received:
            self.missed += 1
        else:
            # The server has said something. Reset our count.
            self.missed = 0

        # If we've missed MAX_MISSED_HEARTBEATS, close the connection and
        # stop: no heartbeat is sent and no further timeout is registered
        if self.missed >= MAX_MISSED_HEARTBEATS:
            duration = self.missed * self.interval
            reason = ("Too Many Missed Heartbeats, No reply in %i seconds" %
                      duration)
            self.connection.close(320, reason)
            return

        # If we've not sent anything since the last time we ran this
        # function, send a new heartbeat frame
        if self.sent == self.connection.bytes_sent:
            self.connection._send_frame(Heartbeat())

        # Get the current byte counters from the connection, we expect these
        # to increment on our next call. This is read after the heartbeat
        # was (possibly) sent, so the snapshot reflects whatever the
        # connection counted for it.
        self.sent = self.connection.bytes_sent
        self.received = self.connection.bytes_received

        # If we're still relevant to the connection, add another timeout for
        # our interval
        if self.connection.heartbeat is self:
            self.setup_timer()
|