This file is indexed.

/usr/include/hpptools/pfor.hpp is in libhpptools-dev 1.1.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
/// @author    Matei David, Ontario Institute for Cancer Research
/// @version   2.0
/// @date      2013-2015
/// @copyright MIT Public License
///
/// A parallel for loop.
///
/// A parallel for loop that can re-sort its output, implemented in C++11.

#ifndef __PFOR_HPP
#define __PFOR_HPP

#include <algorithm>
#include <cassert>
#include <chrono>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <vector>

namespace pfor
{

/// C++11 implementation of a parallel for loop.
///
/// Given a number of threads and a chunk size, the total work is split into
/// chunks, each chunk is processed independently by a thread. Optionally, the
/// outputs can be reordered in the same order they were read. More
/// specifically:
///   - The master thread starts `num_threads` worker threads, then waits for
///     all workers. Each worker proceeds as follows:
///   - Allocate a vector of `chunk_size` objects of type `Input` which are
///     default-initialized.
///   - Execute `do_before()`.
///   - Repeatedly, until done:
///     - Inside an input critical section: call `get_item()` repeatedly,
///       getting up to `chunk_size` items. The `get_item()` function should
///       save an item in the `Input` location given as parameter.
///     - If `Chunk_Output` is given:
///       - Allocate a new default-initialized Chunk_Output object on the heap.
///     - Repeatedly call `process_item()` in the order the items appear in the
///       thread local buffer. If `Chunk_Output` is given, the calls are passed
///       the same `Chunk_Output` object.
///     - If `Chunk_Output` is given: Inside an output critical section, the
///       `Chunk_Output` object is added to a heap. As long as the chunk that
///       must be output next is found in the heap, `output_chunk` is called
///       on that object, after which it is destroyed.
///   - Execute `do_after()`.
///
/// To ENABLE output sorting:
/// - `Chunk_Output` should contain one (or more) ostringstream object(s);
/// - `process_item()` should take 2 arguments, the second being a
/// `Chunk_Output` object; the ostringstream inside that `Chunk_Output` should
/// be used for output inside `process_item()`;
/// - `output_chunk()` should flush the ostringstream to output.
///
/// To DISABLE output sorting:
/// - `Chunk_Output` should not be specified;
/// - `process_item()` should take 1 argument only;
/// - `output_chunk()` should not be specified.
///
/// @tparam Input Type of the input items.
/// @tparam Chunk_Output (optional) Type of an object created during the
/// processing of each chunk. Each such object is tagged with its chunk number,
/// and placed in a priority queue. The objects are destroyed inside an output
/// critical section in the order of their chunk numbers.
/// @param num_threads Number of worker threads to spawn.
/// @param chunk_size Number of items each thread should process in one chunk.
/// @param get_item Function executed inside an input critical section. If an
/// item is available, the function should save it in the `Input` location given
/// as parameter, and return true. If the items have been exhausted, the
/// function should return false.
/// @param process_item Function executed to process the given `Input` item. If
/// output sorting is enabled, the function takes as second argument a
/// `Chunk_Output` object that will be the same for all items in that chunk.
/// @param output_chunk (optional) Function called on a `Chunk_Output`
/// object inside an output critical section before it is destroyed.
/// @param do_before Function executed by each thread before any actual work.
/// @param do_after Function executed by each thread after all work.
/// @param progress_report (optional) Function that will be called to report
/// progress with 2 arguments: the number of items processed, and the number of
/// seconds elapsed so far.
/// @param progress_count (optional) Number of items that should be processed
/// between calls to `progress_report`. (This will be rounded down to be
/// divisible by `chunk_size`.) If 0, `10 * num_threads * chunk_size` is used.

// Overload without a `Chunk_Output` type: `process_item` receives only the
// input item and no `output_chunk` callback is taken.
template < typename Input >
void pfor(unsigned num_threads,
          size_t chunk_size,
          std::function< bool(Input&) > get_item,
          std::function< void(Input&) > process_item,
          std::function< void(size_t, size_t) > progress_report = nullptr,
          size_t progress_count = 0);

/// Overload taking a `Chunk_Output` type but no per-thread hooks:
/// `process_item` receives the chunk's `Chunk_Output` object as its second
/// argument, and `output_chunk` is called on that object. No `do_before` or
/// `do_after` functions are run.
template < typename Input, typename Chunk_Output >
void pfor(unsigned num_threads,
          size_t chunk_size,
          std::function< bool(Input&) > get_item,
          std::function< void(Input&, Chunk_Output&) > process_item,
          std::function< void(Chunk_Output&) > output_chunk,
          std::function< void(size_t, size_t) > progress_report = nullptr,
          size_t progress_count = 0);

/// Most general overload: takes a `Chunk_Output` type, an `output_chunk`
/// callback, and the per-thread `do_before` / `do_after` hooks. The other
/// overloads forward to this one.
template < typename Input, typename Chunk_Output >
void pfor(unsigned num_threads,
          size_t chunk_size,
          std::function< bool(Input&) > get_item,
          std::function< void(Input&, Chunk_Output&) > process_item,
          std::function< void(Chunk_Output&) > output_chunk,
          std::function< void(void) > do_before,
          std::function< void(void) > do_after,
          std::function< void(size_t, size_t) > progress_report = nullptr,
          size_t progress_count = 0);

/// @cond
namespace detail
{

/// Placeholder `Chunk_Output` type used by the `pfor` overload that takes no
/// `Chunk_Output`; `pfor` disables output sorting when it sees this type.
struct empty {};

/// A user-supplied `Chunk_Output` object tagged with bookkeeping data.
///
/// The wrapper derives from `Chunk_Output`, so it can be passed wherever a
/// `Chunk_Output&` is expected. The base sub-object is value-initialized.
///
/// @tparam Chunk_Output User type; must be default-constructible.
template < typename Chunk_Output >
struct Chunk_Output_Wrapper
    : public Chunk_Output
{
    /// @param tid_ Id of the worker thread creating the wrapper.
    /// @param cid_ Sequence number of the chunk this object belongs to.
    Chunk_Output_Wrapper(unsigned tid_, size_t cid_)
        : Chunk_Output(), tid(tid_), cid(cid_) {}

    /// Id of the worker thread that created this object.
    unsigned tid;
    /// Chunk sequence number. Kept as `size_t` because it is filled from, and
    /// compared against, `size_t` chunk counters; a narrower type would wrap
    /// around on very long runs and break the output ordering.
    size_t cid;
}; // struct Chunk_Output_Wrapper

/// Orders pointers to chunk wrappers by chunk id, larger id first.
///
/// `std::priority_queue` keeps on top the element that compares largest under
/// its comparator. Because this comparator returns true when `lhs_p` has the
/// larger `cid`, the wrapper with the smallest `cid` ends up on top.
template < typename Chunk_Output >
struct Chunk_Output_Wrapper_Ptr_Comparator
{
    /// @return true iff the chunk id of `*lhs_p` is greater than that of
    /// `*rhs_p`. Declared `const` so the comparator is usable through const
    /// objects as well.
    bool operator () (const Chunk_Output_Wrapper< Chunk_Output >* lhs_p,
                      const Chunk_Output_Wrapper< Chunk_Output >* rhs_p) const
    {
        return lhs_p->cid > rhs_p->cid;
    }
};

/// Priority queue of heap-allocated chunk wrappers, ordered by
/// `Chunk_Output_Wrapper_Ptr_Comparator`. It stores raw pointers; the code
/// that pops an element is responsible for deleting it.
template < typename Chunk_Output >
using Output_Heap = std::priority_queue< Chunk_Output_Wrapper< Chunk_Output >*,
                                    std::vector< Chunk_Output_Wrapper< Chunk_Output >* >,
                                    Chunk_Output_Wrapper_Ptr_Comparator< Chunk_Output > >;

/// State shared by all worker threads of a single `pfor` call.
///
/// `pfor` fills in the callbacks and counters before starting the workers and
/// passes each worker a reference to the same object. The scalar counters
/// carry default initializers so that a default-constructed object never holds
/// indeterminate values.
///
/// @tparam Input Type of the input items.
/// @tparam Chunk_Output Type of the per-chunk output object.
template < typename Input, typename Chunk_Output >
struct Common_Storage
{
    /// Fetches the next input item; returns false when there is none.
    std::function< bool(Input&) > get_item;
    /// Processes one item, given the output object of its chunk.
    std::function< void(Input&, Chunk_Output&) > process_item;
    /// Called on a chunk's output object when it is that chunk's turn (may be empty).
    std::function< void(Chunk_Output&) > output_chunk;
    /// Run by each worker before its work loop (may be empty).
    std::function< void(void) > do_before;
    /// Run by each worker after its work loop (may be empty).
    std::function< void(void) > do_after;
    /// Progress callback: (items processed, seconds elapsed) (may be empty).
    std::function< void(size_t, size_t) > progress_report;
    /// Maximum number of items a worker reads per chunk.
    size_t chunk_size = 0;
    /// A progress report is issued when `item_count` is a multiple of this.
    size_t progress_count = 0;
    /// Number of items processed so far; workers update it under `output_mutex`.
    size_t item_count = 0;
    /// Id given to the next chunk read; workers update it under `input_mutex`.
    size_t cid_in = 0;
    /// Id of the next chunk to be output; workers update it under `output_mutex`.
    size_t cid_out = 0;
    /// Time against which elapsed seconds are measured for progress reports.
    std::chrono::system_clock::time_point start_time;
    /// Serializes reading of input items.
    std::mutex input_mutex;
    /// Serializes progress accounting and ordered output.
    std::mutex output_mutex;
    /// Finished chunks waiting for their turn to be output.
    Output_Heap< Chunk_Output > h;
}; // struct Common_Storage

template < typename Input, typename Chunk_Output, bool sort_output >
void do_work(unsigned tid, std::reference_wrapper< Common_Storage< Input, Chunk_Output > > cs_wrap)
{
    Common_Storage< Input, Chunk_Output >& cs = cs_wrap;
    Chunk_Output_Wrapper< Chunk_Output > _empty_wrapper(tid, 0);
    std::vector< Input > buff(cs.chunk_size);
    size_t load = 0;
    size_t cid;
    bool done = false;
    if (cs.do_before) cs.do_before();
    while (not done)
    {
        load = 0;
        // input critical section
        {
            std::lock_guard< std::mutex > input_lock(cs.input_mutex);
            cid = cs.cid_in++;
            while (load < cs.chunk_size and cs.get_item(buff[load]))
            {
                ++load;
            }
            done = (load < cs.chunk_size);
        }
        // parallel work
        if (load == 0)
        {
            break;
        }
        Chunk_Output_Wrapper< Chunk_Output >* cow_p = &_empty_wrapper;
        if (sort_output)
        {
            cow_p = new Chunk_Output_Wrapper< Chunk_Output >(tid, cid);
        }
        for (size_t i = 0; i < load; ++i)
        {
            cs.process_item(buff[i], *cow_p);
        }
        // output critical section
        {
            std::lock_guard< std::mutex > output_lock(cs.output_mutex);
            cs.item_count += load;
            if (cs.progress_report and cs.item_count % cs.progress_count == 0)
            {
                auto crt_time = std::chrono::system_clock::now();
                auto elapsed = std::chrono::duration_cast< std::chrono::seconds >(crt_time - cs.start_time);
                cs.progress_report(cs.item_count, elapsed.count());
            }
            if (sort_output)
            {
                cs.h.push(cow_p);
                while (cs.h.size() > 0)
                {
                    cow_p = cs.h.top();
                    assert(cow_p->cid >= cs.cid_out);
                    if (cow_p->cid > cs.cid_out)
                    {
                        break;
                    }
                    cs.h.pop();
                    if (cs.output_chunk)
                    {
                        cs.output_chunk(*cow_p);
                    }
                    delete cow_p;
                    cs.cid_out++;
                }
            }
        }
    }
    if (cs.do_after) cs.do_after();
} // do_work()

} // namespace detail
/// @endcond

/// Definition of the most general `pfor` overload.
///
/// Sets up the state shared by the workers, starts `num_threads` threads
/// running `detail::do_work`, joins them all, and issues one last progress
/// report if the final item count is not a multiple of the report interval.
/// Output sorting is enabled unless `Chunk_Output` is `detail::empty`.
///
/// A `chunk_size` of 0 is treated as 1.
template < typename Input, typename Chunk_Output >
void pfor(unsigned num_threads,
          size_t chunk_size,
          std::function< bool(Input&) > get_item,
          std::function< void(Input&, Chunk_Output&) > process_item,
          std::function< void(Chunk_Output&) > output_chunk,
          std::function< void(void) > do_before,
          std::function< void(void) > do_after,
          std::function< void(size_t, size_t) > progress_report,
          size_t progress_count)
{
    // A chunk size of 0 would make the rounding below divide by zero, so fall
    // back to one item per chunk.
    if (chunk_size == 0) chunk_size = 1;
    std::vector< std::thread > thread_v;
    thread_v.reserve(num_threads);
    detail::Common_Storage< Input, Chunk_Output > cs;
    cs.get_item = get_item;
    cs.process_item = process_item;
    cs.output_chunk = output_chunk;
    cs.do_before = do_before;
    cs.do_after = do_after;
    cs.progress_report = progress_report;
    cs.chunk_size = chunk_size;
    if (progress_count == 0) progress_count = 10 * num_threads * chunk_size;
    // Round down to a multiple of chunk_size, but never below one chunk.
    cs.progress_count = std::max< size_t >(progress_count / chunk_size, 1) * chunk_size;
    cs.item_count = 0;
    cs.cid_in = 0;
    cs.cid_out = 0;
    cs.start_time = std::chrono::system_clock::now();
    static const bool sort_output = not std::is_same< Chunk_Output, detail::empty >::value;
    for (unsigned i = 0; i < num_threads; ++i)
    {
        thread_v.emplace_back(detail::do_work< Input, Chunk_Output, sort_output >, i, std::ref(cs));
    }
    for (auto& t : thread_v)
    {
        t.join();
    }
    // The workers report only at exact multiples of progress_count; cover the
    // remainder here.
    if (cs.progress_report and cs.item_count % cs.progress_count != 0)
    {
        auto crt_time = std::chrono::system_clock::now();
        auto elapsed = std::chrono::duration_cast< std::chrono::seconds >(crt_time - cs.start_time);
        cs.progress_report(cs.item_count, elapsed.count());
    }
} // pfor()

template < typename Input >
void pfor(unsigned num_threads,
          size_t chunk_size,
          std::function< bool(Input&) > get_item,
          std::function< void(Input&) > process_item,
          std::function< void(size_t, size_t) > progress_report,
          size_t progress_count)
{
    pfor< Input, detail::empty >(
        num_threads,
        chunk_size,
        get_item,
        [&] (Input& in, detail::empty&) { process_item(in); },
        nullptr,
        nullptr,
        nullptr,
        progress_report,
        progress_count);
} // pfor()

/// Definition of the overload with a `Chunk_Output` type but no per-thread
/// hooks.
///
/// Forwards every argument to the general overload, passing empty functions
/// for `do_before` and `do_after`.
template < typename Input, typename Chunk_Output >
void pfor(unsigned num_threads,
          size_t chunk_size,
          std::function< bool(Input&) > get_item,
          std::function< void(Input&, Chunk_Output&) > process_item,
          std::function< void(Chunk_Output&) > output_chunk,
          std::function< void(size_t, size_t) > progress_report,
          size_t progress_count)
{
    // A default-constructed std::function is empty.
    const std::function< void(void) > no_hook;

    pfor< Input, Chunk_Output >(num_threads, chunk_size,
                                get_item, process_item, output_chunk,
                                no_hook, no_hook,
                                progress_report, progress_count);
} // pfor()

} // namespace pfor

#endif

#ifdef SAMPLE_PFOR
/*

Compile with:

g++ -std=c++11 -pthread -D SAMPLE_PFOR -x c++ pfor.hpp -o pfor

*/

#include <iostream>
#include <sstream>
#include "logger.hpp"

using namespace std;

// Sample program: runs `pfor::pfor` twice, first without and then with
// output sorting.
int main()
{
    logger::Logger::set_default_level(logger::info);
    //
    // pfor loop WITHOUT output sorting
    //
    // Items are indices into `v`; processing item i stores i*i in v[i].
    vector< unsigned > v(1000);
    // Next index to hand out. It is only modified inside the get_item
    // callbacks, which pfor invokes while holding its input mutex.
    unsigned crt_idx = 0;
    pfor::pfor< unsigned >(
        // num_threads
        4,
        // chunk_size
        10,
        // get_item
        [&] (unsigned& i) {
            if (crt_idx >= v.size()) return false;
            i = crt_idx++;
            return true;
        },
        // process_item
        [&] (unsigned& i) {
            thread::id this_id = this_thread::get_id();
            LOG("main", info) << "process_item thread=" << this_id << " i=" << i << endl;
            v[i] = i*i;
        },
        //
        // The following 2 parameters are optional
        //
        // progress_report
        [&] (size_t items, size_t seconds) {
            cout << "processed " << items << " items in " << seconds << " seconds" << endl;
        },
        // progress_count
        110);
    //
    // pfor loop WITH output sorting
    //
    // Each chunk writes into its own ostringstream, which output_chunk later
    // copies to cout.
    crt_idx = 0;
    pfor::pfor< unsigned, std::ostringstream >(
        // num_threads
        4,
        // chunk_size
        2,
        // get_item
        [&] (unsigned& i) {
            if (crt_idx >= 10) return false;
            i = crt_idx++;
            return true;
        },
        // process_item
        [&] (unsigned& i, std::ostringstream& os) {
            thread::id this_id = this_thread::get_id();
            unsigned long this_id_int;
            // Turn the thread id into an integer by printing it and parsing
            // the text back.
            {
                ostringstream tmp_os;
                tmp_os << this_id << endl;
                istringstream(tmp_os.str()) >> this_id_int;
            }
            // Derive a per-thread sleep time from the id -- presumably so that
            // chunks finish out of order and the sorting becomes visible.
            unsigned sleep_secs = this_id_int % 17; // 0..16
            sleep_secs = (float)sleep_secs * 3.0 / 17.0; // 0..2
            LOG("main", info) << "process_item thread=" << this_id << " i=" << i << " sleep_secs=" << sleep_secs << endl;
            os << "sorted_output thread=" << this_id << " i=" << i << endl;
            this_thread::sleep_for(chrono::duration< int >(sleep_secs));
        },
        // output_chunk
        [&] (std::ostringstream& os) {
            cout << os.str();
        });
}

#endif