This file is indexed.

/usr/lib/python3/dist-packages/pika/heartbeat.py is in python3-pika 0.11.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""Handle AMQP Heartbeats"""
import logging

from pika import frame

LOGGER = logging.getLogger(__name__)


class HeartbeatChecker(object):
    """Checks to make sure that our heartbeat is received at the expected
    intervals.

    """
    MAX_IDLE_COUNT = 2
    _CONNECTION_FORCED = 320
    _STALE_CONNECTION = "Too Many Missed Heartbeats, No reply in %i seconds"

    def __init__(self, connection, interval, idle_count=MAX_IDLE_COUNT):
        """Create a heartbeat on connection sending a heartbeat frame every
        interval seconds.

        :param pika.connection.Connection: Connection object
        :param int interval: Heartbeat check interval
        :param int idle_count: Number of heartbeat intervals missed until the
                               connection is considered idle and disconnects

        """
        self._connection = connection
        self._interval = interval
        self._max_idle_count = idle_count

        # Initialize counters
        self._bytes_received = 0
        self._bytes_sent = 0
        self._heartbeat_frames_received = 0
        self._heartbeat_frames_sent = 0
        self._idle_byte_intervals = 0

        # The handle for the last timer
        self._timer = None

        # Setup the timer to fire in _interval seconds
        self._setup_timer()

    @property
    def active(self):
        """Return True if the connection's heartbeat attribute is set to this
        instance.

        :rtype True

        """
        return self._connection.heartbeat is self

    @property
    def bytes_received_on_connection(self):
        """Return the number of bytes received by the connection bytes object.

        :rtype int

        """
        return self._connection.bytes_received

    @property
    def connection_is_idle(self):
        """Returns true if the byte count hasn't changed in enough intervals
        to trip the max idle threshold.

        """
        return self._idle_byte_intervals >= self._max_idle_count

    def received(self):
        """Called when a heartbeat is received"""
        LOGGER.debug('Received heartbeat frame')
        self._heartbeat_frames_received += 1

    def send_and_check(self):
        """Invoked by a timer to send a heartbeat when we need to, check to see
        if we've missed any heartbeats and disconnect our connection if it's
        been idle too long.

        """
        LOGGER.debug('Received %i heartbeat frames, sent %i',
                     self._heartbeat_frames_received,
                     self._heartbeat_frames_sent)

        if self.connection_is_idle:
            return self._close_connection()

        # Connection has not received any data, increment the counter
        if not self._has_received_data:
            self._idle_byte_intervals += 1
        else:
            self._idle_byte_intervals = 0

        # Update the counters of bytes sent/received and the frames received
        self._update_counters()

        # Send a heartbeat frame
        self._send_heartbeat_frame()

        # Update the timer to fire again
        self._start_timer()

    def stop(self):
        """Stop the heartbeat checker"""
        if self._timer:
            LOGGER.debug('Removing timeout for next heartbeat interval')
            self._connection.remove_timeout(self._timer)
            self._timer = None

    def _close_connection(self):
        """Close the connection with the AMQP Connection-Forced value."""
        LOGGER.info('Connection is idle, %i stale byte intervals',
                    self._idle_byte_intervals)
        duration = self._max_idle_count * self._interval
        text = HeartbeatChecker._STALE_CONNECTION % duration

        # NOTE: this won't achieve the perceived effect of sending
        # Connection.Close to broker, because the frame will only get buffered
        # in memory before the next statement terminates the connection.
        self._connection.close(HeartbeatChecker._CONNECTION_FORCED, text)

        self._connection._on_terminate(HeartbeatChecker._CONNECTION_FORCED,
                                       text)

    @property
    def _has_received_data(self):
        """Returns True if the connection has received data on the connection.

        :rtype: bool

        """
        return not self._bytes_received == self.bytes_received_on_connection

    @staticmethod
    def _new_heartbeat_frame():
        """Return a new heartbeat frame.

        :rtype pika.frame.Heartbeat

        """
        return frame.Heartbeat()

    def _send_heartbeat_frame(self):
        """Send a heartbeat frame on the connection.

        """
        LOGGER.debug('Sending heartbeat frame')
        self._connection._send_frame(self._new_heartbeat_frame())
        self._heartbeat_frames_sent += 1

    def _setup_timer(self):
        """Use the connection objects delayed_call function which is
        implemented by the Adapter for calling the check_heartbeats function
        every interval seconds.

        """
        self._timer = self._connection.add_timeout(self._interval,
                                                   self.send_and_check)

    def _start_timer(self):
        """If the connection still has this object set for heartbeats, add a
        new timer.

        """
        if self.active:
            self._setup_timer()

    def _update_counters(self):
        """Update the internal counters for bytes sent and received and the
        number of frames received

        """
        self._bytes_sent = self._connection.bytes_sent
        self._bytes_received = self._connection.bytes_received