This file is indexed.

/usr/include/kvutils/kvu_message_queue.h is in libkvutils-dev 2.8.1-5build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#ifndef INCLUDE_KVU_MESSAGE_QUEUE_H
#define INCLUDE_KVU_MESSAGE_QUEUE_H

#include <deque>

#include <errno.h>
#include <pthread.h>
#include <sys/time.h>

#include "kvu_timestamp.h"

/**
 * Default upper bound on the number of queued messages for which
 * MESSAGE_QUEUE_RT_C operates in bounded execution time mode.
 * Used when the queue is constructed without an explicit
 * 'max_rt_size' argument.
 */
static const size_t msg_queue_rt_max_size_const = 1024;

/**
 * A queue implementation for sending generic messages between
 * threads.
 * 
 * All the consumer operations are real-time safe, i.e. they have 
 * bounded execution time. However, the bounded execution is 
 * guaranteed only until queue size reaches max_rt_size() items. 
 * 
 * Once the queue size reaches max_rt_size(), consumer operation is 
 * switched to blocking behaviour. This is done as a safety 
 * measure in real-time use.
 *
 * @author Kai Vehmanen
 */
template<class T>
class MESSAGE_QUEUE_RT_C {

public:

  /**
   * Class constructor.
   *
   * @param max_rt_size Change queue behaviour to be non-deterministic
   *                    if the number of queued messages reaches this
   *                    limit. This can be used as a safety measure
   *                    in real-time applications. Any negative value
   *                    (including the default -1) selects
   *                    msg_queue_rt_max_size_const.
   *
   * Execution note: may block, may allocate memory
   */
  MESSAGE_QUEUE_RT_C(int max_rt_size = -1)
    : max_rt_size_rep(max_rt_size < 0 ?
                      msg_queue_rt_max_size_const :
                      static_cast<size_t>(max_rt_size)),
      pending_pops_rep(0) {
    pthread_mutex_init(&lock_rep, NULL);
    pthread_cond_init(&cond_rep, NULL);
  }

  /**
   * Class destructor. Releases the mutex and the condition
   * variable that were initialized in the constructor.
   *
   * No other thread may be using the queue when it is destroyed.
   *
   * NOTE(review): objects of this class should not be copied, as
   * POSIX does not define the behaviour of copied mutexes and
   * condition variables. Copying is not disabled here to keep the
   * existing interface intact.
   */
  ~MESSAGE_QUEUE_RT_C(void) {
    pthread_cond_destroy(&cond_rep);
    pthread_mutex_destroy(&lock_rep);
  }

  /**
   * Adds a new item to the end of the queue and wakes up all
   * threads waiting in poll().
   *
   * Execution note: may block, may allocate memory
   */
  void push_back(const T& arg) {
    pthread_mutex_lock(&lock_rep);
    msgs_rep.push_back(arg);
    pthread_cond_broadcast(&cond_rep);
    pthread_mutex_unlock(&lock_rep);
  }

  /**
   * Fetches, and removes, the front item in the queue.
   *
   * The queue lock is only tried, never waited for. If another
   * thread holds the lock, the call returns immediately with -1
   * and the queue is left untouched.
   *
   * Execution note: rt-safe, does not block
   *
   * @param front_msg Destination for a copy of the front item.
   *                  May be null, in which case the front item is
   *                  removed without being copied.
   *
   * @return 1 on success, -1 if busy, 0 if empty
   */
  int pop_front(T* front_msg) {
    if (pthread_mutex_trylock(&lock_rep) != 0) {
      return -1;
    }

    int res = 0;
    if (!msgs_rep.empty()) {
      if (front_msg != 0)
        *front_msg = msgs_rep.front();
      msgs_rep.pop_front();
      res = 1;
    }
    pthread_mutex_unlock(&lock_rep);

    return res;
  }

  /**
   * Clears the queue. Threads waiting in poll() are woken up
   * (they will find the queue empty and continue waiting until
   * their timeout expires).
   *
   * Execution note: may block
   */
  void clear(void) {
    pthread_mutex_lock(&lock_rep);
    msgs_rep.clear();
    pthread_cond_broadcast(&cond_rep);
    pthread_mutex_unlock(&lock_rep);
  }

  /**
   * Blocks until the queue is non-empty, or until the given
   * timeout has elapsed. 'timeout_sec' and 'timeout_usec' are
   * added together and specify the upper time limit for blocking,
   * relative to the current time.
   *
   * The call also returns if waiting on the condition variable
   * fails for any other reason than a timeout, so the caller
   * must check the queue state after the call.
   *
   * Execution: may block, may allocate memory
   */
  void poll(int timeout_sec, long int timeout_usec) {
    struct timeval nowtmp;
    struct timespec now, timeout;
    int retcode = 0;

    gettimeofday(&nowtmp, NULL);

    now.tv_sec = nowtmp.tv_sec;
    now.tv_nsec = nowtmp.tv_usec * 1000;

    /* note: split 'timeout_usec' into whole seconds and a
     *       sub-second remainder before converting to nanoseconds,
     *       so that tv_nsec stays within [0, 1e9) and the
     *       multiplication cannot overflow */
    long int extra_sec = timeout_usec / 1000000;
    long int rem_usec = timeout_usec % 1000000;
    if (rem_usec < 0) {
      rem_usec += 1000000;
      --extra_sec;
    }
    timeout.tv_sec = timeout_sec + extra_sec;
    timeout.tv_nsec = rem_usec * 1000;
    kvu_timespec_add(&now, &timeout, &timeout);

    pthread_mutex_lock(&lock_rep);
    /* note: stop waiting on any error, not just ETIMEDOUT; otherwise
     *       a persistent failure (e.g. EINVAL) would turn this
     *       loop into an endless busy-loop */
    while (msgs_rep.empty() == true && retcode == 0) {
      retcode = pthread_cond_timedwait(&cond_rep, &lock_rep, &timeout);
    }
    pthread_mutex_unlock(&lock_rep);
  }

  /**
   * Is queue empty?
   *
   * The queue lock is first tried without blocking. If the lock
   * is busy and the queue holds fewer than max_rt_size() items,
   * the call gives up and reports the queue as non-empty (false).
   * If the lock is busy and the queue has reached max_rt_size()
   * items, the call blocks until the lock is acquired.
   *
   * Execution note: rt-safe if queue size within 'max_rt_size'
   *
   * @return true only if the lock was acquired and the queue
   *         contains no items
   */
  bool is_empty(void) const {
    bool emptyres = false;
    int ret = pthread_mutex_trylock(&lock_rep);

    /* note: msgs_rep.size() is accessed without holding
     *       a lock, but that's safe as in the worst case
     *       caller blocks unnecessarily */
    if (ret != 0 &&
        msgs_rep.size() >= max_rt_size_rep) {

      /* note: queue has grown beyond the rt-safe maximum size,
       *       change to non-bounded mode to force synchronization
       *       between the producer and consumer threads
       */
      ret = pthread_mutex_lock(&lock_rep);
    }

    if (ret == 0) {
      emptyres = msgs_rep.empty();
      pthread_mutex_unlock(&lock_rep);
    }

    return emptyres;
  }

  /**
   * Returns the queue size limit after which is_empty() may
   * block.
   */
  size_t max_rt_size(void) const {
    return max_rt_size_rep;
  }

private:

  mutable pthread_mutex_t lock_rep; // mutex ensuring exclusive access to buffer
  mutable pthread_cond_t cond_rep;  // signaled when queue contents change

  size_t max_rt_size_rep;   // only modified in constructor
  size_t pending_pops_rep;  // set to zero in constructor, not otherwise used here

  T invalid_rep;            // default-constructed, not otherwise used here
  std::deque<T> msgs_rep;   // the queued messages, protected by lock_rep

};

#endif /* INCLUDE_KVU_MESSAGE_QUEUE_H */