This file is indexed.

/usr/share/gocode/src/github.com/influxdata/influxdb/stress/v2/statement/insert.go is in golang-github-influxdb-influxdb-dev 1.1.1+dfsg1-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package statement

import (
	"bytes"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/influxdb/stress/v2/stress_client"
)

// InsertStatement is a Statement Implementation that creates points to be written to the target InfluxDB instance
type InsertStatement struct {
	TestID      string
	StatementID string

	// Statement Name
	Name string

	// Template string for points. Filled by the output of stringers
	TemplateString string

	// TagCount is used to find the number of series in the dataset
	TagCount int

	// The Tracer prevents InsertStatement.Run() from returning early
	Tracer *stressClient.Tracer

	// Timestamp is #points to write and percision
	Timestamp *Timestamp

	// Templates turn into stringers
	Templates Templates
	stringers Stringers

	// Number of series in this insert Statement
	series int

	// Returns the proper time for the next point
	time func() int64

	// Concurrency utiliities
	sync.WaitGroup
	sync.Mutex

	// Timer for runtime and pps calculation
	runtime time.Duration
}

func (i *InsertStatement) tags() map[string]string {
	tags := map[string]string{
		"number_fields":       i.numFields(),
		"number_series":       fmtInt(i.series),
		"number_points_write": fmtInt(i.Timestamp.Count),
	}
	return tags
}

// SetID statisfies the Statement Interface
func (i *InsertStatement) SetID(s string) {
	i.StatementID = s
}

// SetVars sets up the environment for InsertStatement to call it's Run function
func (i *InsertStatement) SetVars(s *stressClient.StressTest) chan<- string {
	// Set the #series at 1 to start
	i.series = 1

	// Num series is the product of the cardinality of the tags
	for _, tmpl := range i.Templates[0:i.TagCount] {
		i.series *= tmpl.numSeries()
	}

	// make stringers from the templates
	i.stringers = i.Templates.Init(i.series)

	// Set the time function, keeps track of 'time' of the points being created
	i.time = i.Timestamp.Time(s.StartDate, i.series, s.Precision)

	// Set a commune on the StressTest
	s.Lock()
	comCh := s.SetCommune(i.Name)
	s.Unlock()

	// Set the tracer
	i.Tracer = stressClient.NewTracer(i.tags())

	return comCh
}

// Run statisfies the Statement Interface
func (i *InsertStatement) Run(s *stressClient.StressTest) {

	// Set variables on the InsertStatement and make the comCh
	comCh := i.SetVars(s)

	// TODO: Refactor to eleminate the ctr
	// Start the counter
	ctr := 0

	// Create the first bytes buffer
	buf := bytes.NewBuffer([]byte{})

	runtime := time.Now()

	for k := 0; k < i.Timestamp.Count; k++ {

		// Increment the counter. ctr == k + 1?
		ctr++

		// Make the point from the template string and the stringers
		point := fmt.Sprintf(i.TemplateString, i.stringers.Eval(i.time)...)

		// Add the string to the buffer
		buf.WriteString(point)
		// Add a newline char to seperate the points
		buf.WriteString("\n")

		// If len(batch) == batchSize then send it
		if ctr%s.BatchSize == 0 && ctr != 0 {
			b := buf.Bytes()
			// Trimming the trailing newline character
			b = b[0 : len(b)-1]

			// Create the package
			p := stressClient.NewPackage(stressClient.Write, b, i.StatementID, i.Tracer)

			// Use Tracer to wait for all operations to finish
			i.Tracer.Add(1)

			// Send the package
			s.SendPackage(p)

			// Reset the bytes Buffer
			temp := bytes.NewBuffer([]byte{})
			buf = temp
		}

		// TODO: Racy
		// Has to do with InsertStatement and QueryStatement communication
		if len(comCh) < cap(comCh) {
			select {
			case comCh <- point:
				break
			default:
				break
			}
		}

	}

	// If There are additional points remaining in the buffer send them before exiting
	if buf.Len() != 0 {
		b := buf.Bytes()
		// Trimming the trailing newline character
		b = b[0 : len(b)-1]

		// Create the package
		p := stressClient.NewPackage(stressClient.Write, b, i.StatementID, i.Tracer)

		// Use Tracer to wait for all operations to finish
		i.Tracer.Add(1)

		// Send the package
		s.SendPackage(p)
	}

	// Wait for all tracers to decrement
	i.Tracer.Wait()

	// Stop the timer
	i.runtime = time.Since(runtime)
}

// Report statisfies the Statement Interface
func (i *InsertStatement) Report(s *stressClient.StressTest) string {
	// Pull data via StressTest client
	allData := s.GetStatementResults(i.StatementID, "write")

	if allData == nil || allData[0].Series == nil {
		log.Fatalf("No data returned for write report\n  Statement Name: %v\n  Statement ID: %v\n", i.Name, i.StatementID)
	}

	ir := &insertReport{
		name:    i.Name,
		columns: allData[0].Series[0].Columns,
		values:  allData[0].Series[0].Values,
	}

	responseTimes := responseTimes(ir.columns, ir.values)

	ir.percentile = percentile(responseTimes)
	ir.avgResponseTime = avgDuration(responseTimes)
	ir.stdDevResponseTime = stddevDuration(responseTimes)
	ir.pointsPerSecond = int(float64(i.Timestamp.Count) / i.runtime.Seconds())
	ir.numRetries = countRetries(ir.columns, ir.values)
	ir.successfulWrites = countSuccesses(ir.columns, ir.values)
	ir.avgRequestBytes = numberBytes(ir.columns, ir.values)

	return ir.String()
}

func (i *InsertStatement) numFields() string {
	pt := strings.Split(i.TemplateString, " ")
	fields := strings.Split(pt[1], ",")
	return fmtInt(len(fields))
}

func fmtInt(i int) string {
	return strconv.FormatInt(int64(i), 10)
}