/usr/share/gocode/src/github.com/influxdata/influxdb/tsdb/batcher.go is in golang-github-influxdb-influxdb-dev 1.1.1+dfsg1-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | package tsdb
import (
"sync"
"sync/atomic"
"time"
"github.com/influxdata/influxdb/models"
)
// PointBatcher accepts Points and will emit a batch of those points when either
// a) the batch reaches a certain size, or b) a certain time passes.
type PointBatcher struct {
stats PointBatcherStats
size int
duration time.Duration
stop chan struct{}
in chan models.Point
out chan []models.Point
flush chan struct{}
wg *sync.WaitGroup
}
// NewPointBatcher returns a new PointBatcher. sz is the batching size,
// bp is the maximum number of batches that may be pending. d is the time
// after which a batch will be emitted after the first point is received
// for the batch, regardless of its size.
func NewPointBatcher(sz int, bp int, d time.Duration) *PointBatcher {
return &PointBatcher{
size: sz,
duration: d,
stop: make(chan struct{}),
in: make(chan models.Point, bp*sz),
out: make(chan []models.Point),
flush: make(chan struct{}),
}
}
// PointBatcherStats are the statistics each batcher tracks.
type PointBatcherStats struct {
BatchTotal uint64 // Total count of batches transmitted.
PointTotal uint64 // Total count of points processed.
SizeTotal uint64 // Number of batches that reached size threshold.
TimeoutTotal uint64 // Number of timeouts that occurred.
}
// Start starts the batching process. Returns the in and out channels for points
// and point-batches respectively.
func (b *PointBatcher) Start() {
// Already running?
if b.wg != nil {
return
}
var timer *time.Timer
var batch []models.Point
var timerCh <-chan time.Time
emit := func() {
b.out <- batch
atomic.AddUint64(&b.stats.BatchTotal, 1)
batch = nil
}
b.wg = &sync.WaitGroup{}
b.wg.Add(1)
go func() {
defer b.wg.Done()
for {
select {
case <-b.stop:
if len(batch) > 0 {
emit()
timerCh = nil
}
return
case p := <-b.in:
atomic.AddUint64(&b.stats.PointTotal, 1)
if batch == nil {
batch = make([]models.Point, 0, b.size)
if b.duration > 0 {
timer = time.NewTimer(b.duration)
timerCh = timer.C
}
}
batch = append(batch, p)
if len(batch) >= b.size { // 0 means send immediately.
atomic.AddUint64(&b.stats.SizeTotal, 1)
emit()
timerCh = nil
}
case <-b.flush:
if len(batch) > 0 {
emit()
timerCh = nil
}
case <-timerCh:
atomic.AddUint64(&b.stats.TimeoutTotal, 1)
emit()
}
}
}()
}
// Stop stops the batching process. Stop waits for the batching routine
// to stop before returning.
func (b *PointBatcher) Stop() {
// If not running, nothing to stop.
if b.wg == nil {
return
}
close(b.stop)
b.wg.Wait()
}
// In returns the channel to which points should be written.
func (b *PointBatcher) In() chan<- models.Point {
return b.in
}
// Out returns the channel from which batches should be read.
func (b *PointBatcher) Out() <-chan []models.Point {
return b.out
}
// Flush instructs the batcher to emit any pending points in a batch, regardless of batch size.
// If there are no pending points, no batch is emitted.
func (b *PointBatcher) Flush() {
b.flush <- struct{}{}
}
// Stats returns a PointBatcherStats object for the PointBatcher. While the each statistic should be
// closely correlated with each other statistic, it is not guaranteed.
func (b *PointBatcher) Stats() *PointBatcherStats {
stats := PointBatcherStats{}
stats.BatchTotal = atomic.LoadUint64(&b.stats.BatchTotal)
stats.PointTotal = atomic.LoadUint64(&b.stats.PointTotal)
stats.SizeTotal = atomic.LoadUint64(&b.stats.SizeTotal)
stats.TimeoutTotal = atomic.LoadUint64(&b.stats.TimeoutTotal)
return &stats
}
|