This file is indexed.

/usr/share/gocode/src/github.com/influxdata/influxdb/tsdb/batcher.go is in golang-github-influxdb-influxdb-dev 1.1.1+dfsg1-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package tsdb

import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/influxdata/influxdb/models"
)

// PointBatcher accepts Points and will emit a batch of those points when either
// a) the batch reaches a certain size, or b) a certain time passes.
//
// Points are written to the channel returned by In and batches are read from
// the channel returned by Out. Batching only happens after Start has been
// called; Stop ends it.
type PointBatcher struct {
	// stats holds the counters that are read and written with sync/atomic.
	// NOTE(review): presumably kept as the first field so that its uint64
	// counters are 64-bit aligned on 32-bit platforms — confirm before
	// reordering fields.
	stats PointBatcherStats

	// size is the number of points at which a batch is emitted; duration is
	// how long a batch may wait after its first point before being emitted
	// (a timer is only armed when duration > 0).
	size     int
	duration time.Duration

	// stop is closed by Stop to end the batching goroutine, in carries
	// incoming points, out carries emitted batches, and flush carries
	// requests made through Flush.
	stop  chan struct{}
	in    chan models.Point
	out   chan []models.Point
	flush chan struct{}

	// wg is nil until Start is called; Stop waits on it for the batching
	// goroutine to exit.
	wg *sync.WaitGroup
}

// NewPointBatcher builds a PointBatcher that has not yet been started.
//
// sz is the number of points per batch and bp is the maximum number of
// batches that may be pending; the input channel is buffered to hold bp*sz
// points. d is how long a batch may wait, measured from the arrival of its
// first point, before it is emitted regardless of how many points it holds.
func NewPointBatcher(sz int, bp int, d time.Duration) *PointBatcher {
	pb := new(PointBatcher)
	pb.size, pb.duration = sz, d

	// Signalling channels and the batch output are unbuffered; only the
	// point input has capacity.
	pb.stop = make(chan struct{})
	pb.in = make(chan models.Point, bp*sz)
	pb.out = make(chan []models.Point)
	pb.flush = make(chan struct{})
	return pb
}

// PointBatcherStats are the statistics each batcher tracks.
//
// Inside a PointBatcher the counters are updated with sync/atomic by the
// batching goroutine; the Stats method returns a copy of them loaded
// atomically, one field at a time.
type PointBatcherStats struct {
	BatchTotal   uint64 // Total count of batches transmitted.
	PointTotal   uint64 // Total count of points processed.
	SizeTotal    uint64 // Number of batches that reached size threshold.
	TimeoutTotal uint64 // Number of timeouts that occurred.
}

// Start starts the batching process in a background goroutine. It returns
// nothing; use In and Out to obtain the point and batch channels. Calling
// Start on a batcher that has already been started is a no-op.
//
// While running, a batch is sent on the out channel when any of the
// following happens:
//   - the batch holds at least size points (a size of 0 sends every point
//     immediately),
//   - duration has elapsed since the first point of the batch arrived
//     (only when duration > 0),
//   - Flush is called while points are pending,
//   - the stop channel is closed while points are pending, after which the
//     goroutine exits.
func (b *PointBatcher) Start() {
	// Already running?
	if b.wg != nil {
		return
	}

	var timer *time.Timer
	var batch []models.Point
	var timerCh <-chan time.Time

	// emit hands the current batch to the out channel, counts it and resets
	// the batching state so that the next point starts a fresh batch.
	emit := func() {
		// Release the per-batch timer. Without this, a batch emitted before
		// its deadline (size threshold, flush or stop) would leave its timer
		// armed until it eventually fired. A value that already fired but
		// was not received is harmless: timerCh is cleared here and the next
		// batch gets a brand-new timer and channel.
		if timer != nil {
			timer.Stop()
			timer = nil
		}
		timerCh = nil

		b.out <- batch
		atomic.AddUint64(&b.stats.BatchTotal, 1)
		batch = nil
	}

	b.wg = &sync.WaitGroup{}
	b.wg.Add(1)

	go func() {
		defer b.wg.Done()
		for {
			select {
			case <-b.stop:
				// Hand over whatever is pending before shutting down.
				if len(batch) > 0 {
					emit()
				}
				return

			case p := <-b.in:
				atomic.AddUint64(&b.stats.PointTotal, 1)
				if batch == nil {
					// First point of a new batch: allocate it and arm the
					// deadline timer, if a duration is configured.
					batch = make([]models.Point, 0, b.size)
					if b.duration > 0 {
						timer = time.NewTimer(b.duration)
						timerCh = timer.C
					}
				}

				batch = append(batch, p)
				if len(batch) >= b.size { // 0 means send immediately.
					atomic.AddUint64(&b.stats.SizeTotal, 1)
					emit()
				}

			case <-b.flush:
				if len(batch) > 0 {
					emit()
				}

			case <-timerCh:
				// timerCh is only non-nil while a batch is pending, so there
				// is always something to emit here.
				atomic.AddUint64(&b.stats.TimeoutTotal, 1)
				emit()
			}
		}
	}()
}

// Stop stops the batching process. Stop waits for the batching routine
// to stop before returning.
//
// Stop is a no-op if Start was never called. Calling Stop again after it has
// returned is safe and simply returns once more; it is not designed to be
// called from several goroutines at the same time.
func (b *PointBatcher) Stop() {
	// If not running, nothing to stop.
	if b.wg == nil {
		return
	}

	// Closing an already-closed channel panics, so only close the stop
	// channel if an earlier Stop has not done so already.
	select {
	case <-b.stop:
		// Already closed by a previous Stop call.
	default:
		close(b.stop)
	}
	b.wg.Wait()
}

// In exposes the send-only side of the batcher's input channel; callers
// write the points to be batched to it.
func (b *PointBatcher) In() chan<- models.Point {
	var points chan<- models.Point = b.in
	return points
}

// Out exposes the receive-only side of the batcher's output channel; callers
// read emitted batches from it.
func (b *PointBatcher) Out() <-chan []models.Point {
	var batches <-chan []models.Point = b.out
	return batches
}

// Flush asks the batcher to emit whatever points are currently pending as a
// batch, even if the batch size has not been reached. When nothing is
// pending, no batch is emitted. The call blocks until the request has been
// received on the flush channel.
func (b *PointBatcher) Flush() {
	var request struct{}
	b.flush <- request
}

// Stats returns a snapshot of the PointBatcher's statistics as a new
// PointBatcherStats. Each counter is loaded atomically but independently, so
// while the statistics should be closely correlated with one another, they
// are not guaranteed to be mutually consistent.
func (b *PointBatcher) Stats() *PointBatcherStats {
	return &PointBatcherStats{
		BatchTotal:   atomic.LoadUint64(&b.stats.BatchTotal),
		PointTotal:   atomic.LoadUint64(&b.stats.PointTotal),
		SizeTotal:    atomic.LoadUint64(&b.stats.SizeTotal),
		TimeoutTotal: atomic.LoadUint64(&b.stats.TimeoutTotal),
	}
}