This file is indexed.

/usr/share/rostest/nodes/hztest is in python-rostest 1.13.5+ds1-3.

This file is owned by root:root, with mode 0o755.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$

## Integration test node that subscribes to any topic and verifies
## the publishing rate to be within a specified bounds. The following
## parameters must be set:
##
##  * ~/hz: expected hz
##  * ~/hzerror: errors bound for hz
##  * ~/test_duration: time (in secs) to run test
## 

from __future__ import print_function

import sys
import threading
import time
import unittest

import rospy
import rostest

NAME = 'hztest'

from threading import Thread

class HzTest(unittest.TestCase):
    def __init__(self, *args):
        super(HzTest, self).__init__(*args)
        rospy.init_node(NAME)

        self.lock = threading.Lock()
        self.message_received = False

    def setUp(self):
        self.errors = []
        # Count of all messages received
        self.msg_count = 0
        # Time of first message received
        self.msg_t0 = -1.0
        # Time of last message received
        self.msg_tn = -1.0

    ## performs two tests of a node, first with /rostime off, then with /rostime on
    def test_hz(self):
        # Fetch parameters
        try:
            # expected publishing rate
            hz = float(rospy.get_param('~hz'))
            # length of test
            test_duration = float(rospy.get_param('~test_duration'))
            # topic to test
            topic = rospy.get_param('~topic')
            # time to wait before
            wait_time = rospy.get_param('~wait_time', 20.)            
        except KeyError as e:
            self.fail('hztest not initialized properly. Parameter [%s] not set. debug[%s] debug[%s]'%(str(e), rospy.get_caller_id(), rospy.resolve_name(e.args[0])))

        # We only require hzerror if hz is non-zero
        hzerror = 0.0
        if hz != 0.0:
            try:
                # margin of error allowed
                hzerror = float(rospy.get_param('~hzerror'))
            except KeyError as e:
                self.fail('hztest not initialized properly. Parameter [%s] not set. debug[%s] debug[%s]'%(str(e), rospy.get_caller_id(), rospy.resolve_name(e.args[0])))

        # We optionally check each inter-message interval
        try:
            self.check_intervals = bool(rospy.get_param('~check_intervals'))
        except KeyError:
            self.check_intervals = False

        # We optionally measure wall clock time
        try:
            self.wall_clock = bool(rospy.get_param('~wall_clock'))
        except KeyError:
            self.wall_clock = False

        print("""Hz: %s
Hz Error: %s
Topic: %s
Test Duration: %s"""%(hz, hzerror, topic, test_duration))
        
        self._test_hz(hz, hzerror, topic, test_duration, wait_time)        
            
    def _test_hz(self, hz, hzerror, topic, test_duration, wait_time): 
        self.assert_(hz >= 0.0, "bad parameter (hz)")
        self.assert_(hzerror >= 0.0, "bad parameter (hzerror)")
        self.assert_(test_duration > 0.0, "bad parameter (test_duration)")
        self.assert_(len(topic), "bad parameter (topic)")

        if hz == 0:
            self.min_rate = 0.0
            self.max_rate = 0.0
            self.min_interval = 0.0
            self.max_interval = 0.0
        else:
            self.min_rate = hz - hzerror
            self.max_rate = hz + hzerror
            self.min_interval = 1.0 / self.max_rate
            if self.min_rate <= 0.0:
                self.max_interval = 0.0
            else:
                self.max_interval = 1.0 / self.min_rate

        # Start actual test
        sub = rospy.Subscriber(topic, rospy.AnyMsg, self.callback)
        self.assert_(not self.errors, "bad initialization state (errors)")
        
        print("Waiting for messages")
        # we have to wait until the first message is received before measuring the rate
        # as time can advance too much before publisher is up
        
        # - give the test 20 seconds to start, may parameterize this in the future
        wallclock_timeout_t = time.time() + wait_time
        while not self.message_received and time.time() < wallclock_timeout_t:
            time.sleep(0.1)
        if hz > 0.:
            self.assert_(self.message_received, "no messages before timeout")
        else:
            self.failIf(self.message_received, "message received")
            
        print("Starting rate measurement")
        if self.wall_clock:
            timeout_t = time.time() + test_duration
            while time.time() < timeout_t:
                time.sleep(0.1)
        else:
            timeout_t = rospy.get_time() + test_duration
            while rospy.get_time() < timeout_t:
                rospy.sleep(0.1)
        print("Done waiting, validating results")
        sub.unregister()

        # Check that we got at least one message
        if hz > 0:
            self.assert_(self.msg_count > 0, "no messages received")
        else:
            self.assertEquals(0, self.msg_count)
        # Check whether inter-message intervals were violated (if we were
        # checking them)
        self.assert_(not self.errors, '\n'.join(self.errors))

        # If we have a non-zero rate target, make sure that we hit it on
        # average
        if hz > 0.0:
          self.assert_(self.msg_t0 >= 0.0, "no first message received")
          self.assert_(self.msg_tn >= 0.0, "no last message received")
          dt = self.msg_tn - self.msg_t0
          self.assert_(dt > 0.0, "only one message received")
          rate = ( self.msg_count - 1) / dt
          self.assert_(rate >= self.min_rate, 
                       "average rate (%.3fHz) exceeded minimum (%.3fHz)" %
                       (rate, self.min_rate))
          self.assert_(rate <= self.max_rate, 
                       "average rate (%.3fHz) exceeded maximum (%.3fHz)" %
                       (rate, self.max_rate))
        
    def callback(self, msg):
        # flag that message has been received
        self.message_received = True         
        try:
            self.lock.acquire()

            if self.wall_clock:
                curr = time.time()
            else:
                curr_rostime = rospy.get_rostime()

                #print "CURR ROSTIME", curr_rostime.to_sec()
                
                if curr_rostime.is_zero():
                    return
                curr = curr_rostime.to_sec()
 
            if self.msg_t0 <= 0.0 or self.msg_t0 > curr:
                self.msg_t0 = curr
                self.msg_count = 1
                last = 0
            else:
                self.msg_count += 1
                last = self.msg_tn

            self.msg_tn = curr

            # If we're instructed to check each inter-message interval, do
            # so
            if self.check_intervals and last > 0:
                interval = curr - last
                if interval < self.min_interval:
                    print("CURR", str(curr), file=sys.stderr)
                    print("LAST", str(last), file=sys.stderr)
                    print("msg_count", str(self.msg_count), file=sys.stderr)
                    print("msg_tn", str(self.msg_tn), file=sys.stderr)
                    self.errors.append(
                        'min_interval exceeded: %s [actual] vs. %s [min]'%\
                        (interval, self.min_interval))
                # If max_interval is <= 0.0, then we have no max
                elif self.max_interval > 0.0 and interval > self.max_interval:
                    self.errors.append(
                        'max_interval exceeded: %s [actual] vs. %s [max]'%\
                        (interval, self.max_interval))

        finally:
            self.lock.release()
    
        
if __name__ == '__main__':
    # A dirty hack to work around an apparent race condition at startup
    # that causes some hztests to fail.  Most evident in the tests of
    # rosstage.
    time.sleep(0.75)
    try:
        rostest.run('rostest', NAME, HzTest, sys.argv)
    except KeyboardInterrupt:
        pass
    print("exiting")