This file is indexed.

/usr/share/gocode/src/github.com/influxdata/influxdb/stress/v2/stress_client/stress_client_write.go is in golang-github-influxdb-influxdb-dev 1.1.1+dfsg1-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
package stressClient

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"time"
)

// ###############################################
// A selection of methods to manage the write path
// ###############################################

// Packages up Package from channel in goroutine
func (sc *stressClient) spinOffWritePackage(p Package, serv int) {
	sc.Add(1)
	sc.wc.Increment()
	go func() {
		sc.retry(p, time.Duration(time.Nanosecond), serv)
		sc.Done()
		sc.wc.Decrement()
	}()
}

// Implements backoff and retry logic for 500 responses
func (sc *stressClient) retry(p Package, backoff time.Duration, serv int) {

	// Set Backoff Interval to 500ms
	backoffInterval := time.Duration(500 * time.Millisecond)

	// Arithmetic backoff for kicks
	bo := backoff + backoffInterval

	// Make the write request
	resp, elapsed, err := sc.prepareWrite(p.Body, serv)

	// Find number of times request has been retried
	numBackoffs := int(bo/backoffInterval) - 1

	// On 500 responses, resp == nil. This logic keeps program for panicing
	var statusCode int

	if resp == nil {
		statusCode = 500
	} else {
		statusCode = resp.StatusCode
	}

	// Make a point for reporting
	point := sc.writePoint(numBackoffs, p.StatementID, statusCode, elapsed, p.Tracer.Tags, len(p.Body))

	// Send the Response(point, tracer)
	sc.responseChan <- NewResponse(point, p.Tracer)

	// BatchInterval enforcement
	bi, _ := time.ParseDuration(sc.wdelay)
	time.Sleep(bi)

	// Retry if the statusCode was not 204 or the err != nil
	if !(statusCode == 204) || err != nil {
		// Increment the *Tracer waitgroup if we are going to retry the request
		p.Tracer.Add(1)
		// Log the error if there is one
		fmt.Println(err)
		// Backoff enforcement
		time.Sleep(bo)
		sc.retry(p, bo, serv)
	}

}

// Prepares to send the POST request
func (sc *stressClient) prepareWrite(points []byte, serv int) (*http.Response, time.Duration, error) {

	// Construct address string
	var writeTemplate string
	if sc.ssl {
		writeTemplate = "https://%v/write?db=%v&precision=%v&u=%v&p=%v"
	} else {
		writeTemplate = "http://%v/write?db=%v&precision=%v&u=%v&p=%v"
	}
	address := fmt.Sprintf(writeTemplate, sc.addresses[serv], sc.database, sc.precision, sc.username, sc.password)

	// Start timer
	t := time.Now()
	resp, err := makePost(address, bytes.NewBuffer(points))
	elapsed := time.Since(t)

	return resp, elapsed, err
}

// Send POST request, read it, and handle errors
func makePost(url string, points io.Reader) (*http.Response, error) {

	resp, err := http.Post(url, "text/plain", points)

	if err != nil {
		return resp, fmt.Errorf("Error making write POST request\n  error: %v\n  url: %v\n", err, url)
	}

	body, _ := ioutil.ReadAll(resp.Body)

	if resp.StatusCode != 204 {
		return resp, fmt.Errorf("Write returned non-204 status code\n  StatusCode: %v\n  InfluxDB Error: %v\n", resp.StatusCode, string(body))
	}

	resp.Body.Close()

	return resp, nil
}