/usr/include/boost/graph/distributed/queue.hpp is in libboost1.46-dev 1.46.1-7ubuntu3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
// Copyright (C) 2004-2006 The Trustees of Indiana University.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// Authors: Douglas Gregor
// Andrew Lumsdaine
#ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
#define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif
#include <boost/graph/parallel/process_group.hpp>
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <vector>
namespace boost { namespace graph { namespace distributed {
/// Function object usable as a unary predicate: it accepts a single
/// argument of any type, ignores it, and always answers "true".
struct always_push
{
  template<typename Value>
  bool operator()(const Value& /*ignored*/) const
  {
    return true;
  }
};
/** A distributed queue adaptor.
*
* Class template @c distributed_queue implements a distributed queue
* across a process group. The distributed queue is an adaptor over an
* existing (local) queue, which must model the @ref Buffer
* concept. Each process stores a distinct copy of the local queue,
* from which it draws or removes elements via the @ref pop and @ref
* top members.
*
* The value type of the local queue must be a model of the @ref
* GlobalDescriptor concept. The @ref push operation of the
* distributed queue passes (via a message) the value to its owning
* processor. Thus, the elements within a particular local queue are
* guaranteed to have the process owning that local queue as an owner.
*
* Synchronization of distributed queues occurs in the @ref empty and
* @ref size functions, which will only return "empty" values (true or
* 0, respectively) when the entire distributed queue is empty. If the
* local queue is empty but the distributed queue is not, the
* operation will block until either condition changes. When the @ref
* size function of a nonempty queue returns, it returns the size of
* the local queue. These semantics were selected so that sequential
* code that processes elements in the queue via the following idiom
* can be parallelized via introduction of a distributed queue:
*
* distributed_queue<...> Q;
* Q.push(x);
* while (!Q.empty()) {
* // do something, that may push a value onto Q
* }
*
* In the parallel version, the initial @ref push operation will place
* the value @c x onto its owner's queue. All processes will
* synchronize at the call to empty, and only the process owning @c x
* will be allowed to execute the loop (@ref Q.empty() returns
* false). This iteration may in turn push values onto other remote
* queues, so when that process finishes execution of the loop body
* and all processes synchronize again in @ref empty, more processes
* may have nonempty local queues to execute. Once all local queues
* are empty, @ref Q.empty() returns @c true for all processes.
*
* The distributed queue can receive messages at two different times:
* during synchronization and when polling @ref empty. Messages are
* always received during synchronization, to ensure that accurate
* local queue sizes can be determined. However, whether @ref empty
* should poll for messages is specified as an option to the
* constructor. Polling may be desired when the order in which
* elements in the queue are processed is not important, because it
* permits fewer synchronization steps and less communication
* overhead. However, when more strict ordering guarantees are
* required, polling may be semantically incorrect. By disabling
* polling, one ensures that parallel execution using the idiom above
* will not process an element at a later "level" before an earlier
* "level".
*
* The distributed queue nearly models the @ref Buffer
* concept. However, the @ref push routine does not necessarily
* increase the result of @c size() by one (although the size of the
* global queue does increase by one).
*/
template<typename ProcessGroup, typename OwnerMap, typename Buffer,
         typename UnaryPredicate = always_push>
class distributed_queue
{
  typedef distributed_queue self_type;

  /// Tags for the messages exchanged between the per-process queues.
  enum {
    /** Message indicating a remote push. The message contains a
     * single value x of type value_type that is to be pushed on the
     * receiver's queue.
     */
    msg_push,
    /** Push many elements at once. */
    msg_multipush
  };

public:
  typedef ProcessGroup process_group_type;
  typedef Buffer buffer_type;
  typedef typename buffer_type::value_type value_type;
  typedef typename buffer_type::size_type size_type;

  /** Construct a new distributed queue.
   *
   * Build a new distributed queue that communicates over the given @p
   * process_group, whose local queue is initialized via @p buffer and
   * which polls for messages only if @p polling is true.
   *
   * @p polling deliberately has no default argument in this overload.
   * With a default, a three-argument call (process_group, owner, buffer)
   * would match both this constructor and the predicate-taking
   * constructor below equally well and be rejected as ambiguous.
   * Three-argument construction now selects the overload below, which
   * supplies a default-constructed predicate and @c polling == false.
   */
  explicit
  distributed_queue(const ProcessGroup& process_group,
                    const OwnerMap& owner,
                    const Buffer& buffer,
                    bool polling);

  /** Construct a new distributed queue.
   *
   * Build a new distributed queue that communicates over the given @p
   * process_group, whose local queue is initialized via @p buffer and
   * which may or may not poll for messages. Every argument is optional,
   * so this overload also serves as the default constructor.
   *
   * @param pred  Unary predicate stored in the queue.
   *              NOTE(review): presumably consulted to decide whether a
   *              value is actually pushed (cf. @c always_push) -- confirm
   *              against detail/queue.ipp.
   */
  explicit
  distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
                    const OwnerMap& owner = OwnerMap(),
                    const Buffer& buffer = Buffer(),
                    const UnaryPredicate& pred = UnaryPredicate(),
                    bool polling = false);

  /** Construct a new distributed queue.
   *
   * Build a new distributed queue that communicates over the given @p
   * process_group, whose local queue is default-initialized and which
   * may or may not poll for messages.
   */
  distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
                    const UnaryPredicate& pred, bool polling = false);

  /** Virtual destructor required with virtual functions.
   *
   */
  virtual ~distributed_queue() {}

  /** Push an element onto the distributed queue.
   *
   * The element will be sent to its owner process to be added to that
   * process's local queue. If polling is enabled for this queue and
   * the owner process is the current process, the value will be
   * immediately pushed onto the local queue.
   *
   * Complexity: O(1) messages of size O(sizeof(value_type)) will be
   * transmitted.
   */
  void push(const value_type& x);

  /** Pop an element off the local queue.
   *
   * @pre @c !empty()
   */
  void pop() { buffer.pop(); }

  /**
   * Return the element at the top of the local queue.
   *
   * @pre @c !empty()
   */
  value_type& top() { return buffer.top(); }

  /**
   * \overload
   */
  const value_type& top() const { return buffer.top(); }

  /** Determine if the queue is empty.
   *
   * When the local queue is nonempty, returns @c false. If the local
   * queue is empty, synchronizes with all other processes in the
   * process group until either (1) the local queue is nonempty
   * (returns @c false) or (2) the entire distributed queue is empty
   * (returns @c true).
   */
  bool empty() const;

  /** Determine the size of the local queue.
   *
   * The behavior of this routine is equivalent to the behavior of
   * @ref empty, except that when @ref empty returns false this
   * function returns the size of the local queue and when @ref empty
   * returns true this function returns zero.
   */
  size_type size() const;

  // private:
  // (Left public on purpose: the access specifier above is commented
  // out, and the free function synchronize() calls do_synchronize().)

  /** Synchronize the distributed queue and determine if all queues
   * are empty.
   *
   * \returns \c true when all local queues are empty, or false if at least
   * one of the local queues is nonempty.
   * Defined as virtual for derived classes like depth_limited_distributed_queue.
   */
  virtual bool do_synchronize() const;

private:
  // Setup triggers
  void setup_triggers();

  // Message handlers
  // NOTE(review): by name these correspond to the msg_push and
  // msg_multipush tags above; the wiring lives in detail/queue.ipp.
  void
  handle_push(int source, int tag, const value_type& value,
              trigger_receive_context);

  void
  handle_multipush(int source, int tag, const std::vector<value_type>& values,
                   trigger_receive_context);

  // Data members. Their declaration order is kept exactly as in the
  // original header because the constructors are defined out of line.
  // process_group and buffer are mutable so that the const members
  // (empty, size, do_synchronize) are able to modify them.
  mutable ProcessGroup process_group;
  OwnerMap owner;
  mutable Buffer buffer;
  UnaryPredicate pred;
  bool polling;

  typedef std::vector<value_type> outgoing_buffer_t;
  typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
  // NOTE(review): presumably one outgoing buffer per destination process,
  // shared between copies of the queue -- confirm in detail/queue.ipp.
  shared_ptr<outgoing_buffers_t> outgoing_buffers;
};
/// Helper macro containing the normal names for the template
/// parameters to distributed_queue, written as a template parameter
/// list (no default arguments) for use inside `template<...>`.
/// It is only a local convenience: it is #undef'd at the end of this
/// header, after detail/queue.ipp has been included.
#define BOOST_DISTRIBUTED_QUEUE_PARMS \
typename ProcessGroup, typename OwnerMap, typename Buffer, \
typename UnaryPredicate
/// Helper macro containing the normal template-id for
/// distributed_queue, spelled with the parameter names that
/// BOOST_DISTRIBUTED_QUEUE_PARMS introduces.
/// Like that macro, it is #undef'd at the end of this header, after
/// detail/queue.ipp has been included.
#define BOOST_DISTRIBUTED_QUEUE_TYPE \
distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
/** Run one synchronization step on a distributed queue.
 *
 * Forwards to the queue's @c do_synchronize member, which synchronizes
 * the local queues of all processes taking part in the distributed
 * queue so that no additional messages remain in transit. User code
 * seldom needs to call this directly, since the @c empty and @c size
 * members already perform most synchronization.
 *
 * @param Q  The distributed queue to synchronize.
 */
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
inline void synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
{
  // do_synchronize() reports whether every local queue is empty; that
  // result is of no interest here and is explicitly discarded.
  static_cast<void>(Q.do_synchronize());
}
/** Factory for distributed queues.
 *
 * Deduces the process group, owner map and buffer types from the
 * arguments and returns a distributed_queue built from them; the
 * predicate type is left at the class template's default.
 *
 * @param process_group  Process group the queue communicates over.
 * @param owner          Owner map handed to the queue.
 * @param buffer         Buffer used to initialize the local queue.
 * @param polling        Whether the queue polls for messages
 *                       (defaults to false).
 */
template<typename ProcessGroup, typename OwnerMap, typename Buffer>
inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
make_distributed_queue(const ProcessGroup& process_group,
                       const OwnerMap& owner,
                       const Buffer& buffer,
                       bool polling = false)
{
  return distributed_queue<ProcessGroup, OwnerMap, Buffer>(
      process_group, owner, buffer, polling);
}
} } } // end namespace boost::graph::distributed
#include <boost/graph/distributed/detail/queue.ipp>
#undef BOOST_DISTRIBUTED_QUEUE_TYPE
#undef BOOST_DISTRIBUTED_QUEUE_PARMS
#endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
|