/usr/share/gocode/src/github.com/influxdata/influxdb/stress/run.go is in golang-github-influxdb-influxdb-dev 1.1.1+dfsg1-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package stress // import "github.com/influxdata/influxdb/stress"
import (
"bytes"
"fmt"
"net/http"
"sync"
"time"
)
// Point is an interface that is used to represent
// the abstract idea of a point in InfluxDB.
// Each method returns the point serialized as bytes
// for one write protocol.
type Point interface {
// Line returns the point in line-protocol format.
Line() []byte
// Graphite returns the point in graphite-protocol format.
Graphite() []byte
// OpenJSON returns the point in JSON format.
OpenJSON() []byte
// OpenTelnet returns the point in OpenTSDB-telnet format.
OpenTelnet() []byte
}
///////////////////////////////////////////////////
// Example Implementation of the Point Interface //
///////////////////////////////////////////////////
// KeyValue is an intermediate type that is used
// to express Tag and Field similarly.
type KeyValue struct {
	Key   string
	Value string
}

// Tag is a struct for a tag in influxdb.
type Tag KeyValue

// Field is a struct for a field in influxdb.
type Field KeyValue

// Tags is a slice of all the tags for a point.
type Tags []Tag

// Fields is a slice of all the fields for a point.
type Fields []Field

// tagset returns the point's tagset as comma-separated key=value pairs,
// e.g. "host=a,dc=b". Keys and values are written verbatim; no escaping
// is applied. An empty or nil Tags yields an empty slice.
func (t Tags) tagset() []byte {
	var buf bytes.Buffer
	for i, tag := range t {
		// Separate pairs up front instead of trimming a trailing comma,
		// so that an empty tag list cannot produce a negative slice bound.
		if i > 0 {
			buf.WriteByte(',')
		}
		buf.WriteString(tag.Key)
		buf.WriteByte('=')
		buf.WriteString(tag.Value)
	}
	return buf.Bytes()
}

// fieldset returns the point's fieldset as comma-separated key=value
// pairs, e.g. "value=1,count=2". Keys and values are written verbatim;
// no escaping is applied. An empty or nil Fields yields an empty slice.
func (f Fields) fieldset() []byte {
	var buf bytes.Buffer
	for i, field := range f {
		// Same separator-first approach as tagset: safe for empty input.
		if i > 0 {
			buf.WriteByte(',')
		}
		buf.WriteString(field.Key)
		buf.WriteByte('=')
		buf.WriteString(field.Value)
	}
	return buf.Bytes()
}

// StdPoint represents a point in InfluxDB.
type StdPoint struct {
	// Measurement is the measurement name, written verbatim by Line.
	Measurement string
	// Tags are the point's tags; they may be empty.
	Tags Tags
	// Fields are the point's fields.
	Fields Fields
	// Timestamp is written by Line as a decimal integer.
	// NOTE(review): the time unit is not fixed here; it depends on the
	// precision the receiving endpoint is told to use. Confirm with the client.
	Timestamp int64
}

// Line returns a byte array for a point in
// line-protocol format:
//
//	measurement[,tag=value...] field=value[,field=value...] timestamp
//
// The comma after the measurement is only written when the point has at
// least one tag, so a tag-less point serializes as "measurement fields ts"
// rather than with a dangling comma.
func (p StdPoint) Line() []byte {
	var buf bytes.Buffer
	buf.WriteString(p.Measurement)
	if len(p.Tags) > 0 {
		buf.WriteByte(',')
		buf.Write(p.Tags.tagset())
	}
	buf.WriteByte(' ')
	buf.Write(p.Fields.fieldset())
	buf.WriteByte(' ')
	fmt.Fprintf(&buf, "%d", p.Timestamp)
	return buf.Bytes()
}
// Graphite is intended to return the point in graphite-protocol format.
//
// TODO: implement. For now a fixed template string describing the target
// layout is returned and the point's own data is ignored. When implementing,
// note that the timestamp is at second-level resolution but can be specified
// as a float to get nanosecond-level precision.
func (p StdPoint) Graphite() []byte {
	return []byte("tag_1.tag_2.measurement[.field] acutal_value timestamp")
}
// OpenJSON is intended to return the point in JSON format.
//
// TODO: implement. For now a fixed placeholder is returned and the
// point's own data is ignored.
func (p StdPoint) OpenJSON() []byte {
	// Target shape of the eventual output:
	//[
	//    {
	//        "metric": "sys.cpu.nice",
	//        "timestamp": 1346846400,
	//        "value": 18,
	//        "tags": {
	//            "host": "web01",
	//            "dc": "lga"
	//        }
	//    },
	//    {
	//        "metric": "sys.cpu.nice",
	//        "timestamp": 1346846400,
	//        "value": 9,
	//        "tags": {
	//            "host": "web02",
	//            "dc": "lga"
	//        }
	//    }
	//]
	const placeholder = "hello"
	return []byte(placeholder)
}
// OpenTelnet is intended to return the point in OpenTSDB-telnet format.
//
// TODO: implement. For now a fixed placeholder is returned and the
// point's own data is ignored. Target layout (the timestamp can be
// 13 digits at most):
//
//	sys.cpu.nice timestamp value tag_key_1=tag_value_1 tag_key_2=tag_value_2
func (p StdPoint) OpenTelnet() []byte {
	const placeholder = "hello"
	return []byte(placeholder)
}
////////////////////////////////////////
// response is the result of making
// a request to influxdb.
type response struct {
// Resp is the HTTP response. Success treats a nil Resp as a failure.
Resp *http.Response
// Time is a timestamp associated with the request.
// NOTE(review): whether this is the send or the receive time is not
// visible here — confirm against the client that populates it.
Time time.Time
// Timer is a timer associated with the request.
// NOTE(review): presumably measures the request's duration — verify
// against the client that populates it.
Timer *Timer
}
// Success reports whether the request succeeded: there must be an HTTP
// response and its status code must be 204 (No Content). Any other
// status, or a missing response, counts as a failure.
//
// TODO: add success criteria for tcp, udp, etc.
func (r response) Success() bool {
	if r.Resp == nil {
		return false
	}
	return r.Resp.StatusCode == http.StatusNoContent
}
// WriteResponse is a response for a Writer.
// It is a distinct defined type whose underlying type is response, so it
// has the same fields as response but does not inherit response's methods.
type WriteResponse response
// QueryResponse is a response for a Querier.
type QueryResponse struct {
// response is embedded, so its fields and its Success method are
// promoted onto QueryResponse.
response
// Body holds a response body as a string.
// NOTE(review): presumably the body of the query's HTTP response —
// confirm against the client that fills it in.
Body string
}
///////////////////////////////
// Definition of the Writer ///
///////////////////////////////
// PointGenerator is an interface for generating points.
type PointGenerator interface {
// Generate returns a receive-only channel of points, or an error.
Generate() (<-chan Point, error)
// Time returns a time.Time. StressTest.Start passes this method to
// QueryGenerator.QueryGenerate.
// NOTE(review): what the returned time represents is up to the
// implementation — confirm.
Time() time.Time
}
// InfluxClient is an interface for writing data to the database.
type InfluxClient interface {
// Batch takes a receive-only channel of points and a send-only channel
// for responses, and returns an error.
// NOTE(review): batch size and flush policy are implementation-defined —
// confirm against the concrete client.
Batch(ps <-chan Point, r chan<- response) error
// send takes a byte payload and returns a response or an error. Because
// it is unexported, only types declared in this package can implement
// InfluxClient.
send(b []byte) (response, error)
//ResponseHandler
}
// Writer is a PointGenerator and an InfluxClient.
// Both interfaces are embedded, so their methods are promoted
// and can be called directly on a Writer.
type Writer struct {
PointGenerator
InfluxClient
}
// NewWriter returns a Writer that pairs the point generator p
// with the client i.
func NewWriter(p PointGenerator, i InfluxClient) Writer {
	return Writer{PointGenerator: p, InfluxClient: i}
}
////////////////////////////////
// Definition of the Querier ///
////////////////////////////////
// Query is a query, represented as a string.
// NOTE(review): presumably the query text sent to InfluxDB — confirm
// against the QueryClient implementation.
type Query string
// QueryGenerator is an interface that is used
// to define queries that will be run on the DB.
type QueryGenerator interface {
// QueryGenerate returns a receive-only channel of queries, or an error.
// f is a function returning a time.Time; StressTest.Start passes the
// PointGenerator's Time method here.
QueryGenerate(f func() time.Time) (<-chan Query, error)
// SetTime takes a time.Time.
// NOTE(review): how the time is used is up to the implementation — confirm.
SetTime(t time.Time)
}
// QueryClient is an interface that can write a query
// to an InfluxDB instance.
type QueryClient interface {
// Query takes a single query and returns a response or an error.
Query(q Query) (response, error)
// Exec takes a receive-only channel of queries and a send-only channel
// for responses, and returns an error.
Exec(qs <-chan Query, r chan<- response) error
}
// Querier queries the database.
// It embeds a QueryGenerator and a QueryClient, so their methods are
// promoted and can be called directly on a Querier.
type Querier struct {
QueryGenerator
QueryClient
}
// NewQuerier returns a Querier that pairs the query generator q
// with the query client c.
func NewQuerier(q QueryGenerator, c QueryClient) Querier {
	return Querier{QueryGenerator: q, QueryClient: c}
}
///////////////////////////////////
// Definition of the Provisioner //
///////////////////////////////////
// Provisioner is an interface that provisions an
// InfluxDB instance.
type Provisioner interface {
// Provision performs the provisioning and returns an error on failure.
// StressTest.Start calls it once, before any writing or querying begins.
Provision() error
}
/////////////////////////////////
// Definition of StressTest /////
/////////////////////////////////
// StressTest is a struct that contains all of
// the logic required to execute a Stress Test.
// It embeds a Provisioner, a Writer and a Querier, so their methods are
// promoted and can be called directly on a StressTest.
type StressTest struct {
Provisioner
Writer
Querier
}
// responseHandler is a function that consumes the responses arriving on r.
// StressTest.Start also hands it the Timer t, which Start stops after the
// producing side has finished and r has been closed.
type responseHandler func(r <-chan response, t *Timer)
// Start executes the Stress Test.
//
// It provisions the instance first. If provisioning fails, the error is
// printed and Start returns without writing or querying, since the test
// would otherwise run against an instance that was never set up.
//
// After provisioning, the write stage and the query stage run concurrently.
// wHandle consumes the write responses and rHandle consumes the query
// responses; each handler is also given the Timer for its stage. Start
// blocks until both handlers have returned.
//
// Errors from the generators or clients are printed to standard output and
// end that stage's producer (which closes its response channel); they are
// not returned to the caller.
func (s *StressTest) Start(wHandle responseHandler, rHandle responseHandler) {
	// Provision the Instance
	if err := s.Provision(); err != nil {
		fmt.Println(err)
		return
	}

	var wg sync.WaitGroup

	// stage runs one half of the test: it starts produce in its own
	// goroutine, feeding an unbuffered response channel, and runs handle
	// on the receiving end until handle returns.
	stage := func(handle responseHandler, produce func(r chan<- response) error) {
		defer wg.Done()

		r := make(chan response)
		t := NewTimer()

		go func() {
			// Deferred calls run last-in-first-out: the channel is
			// closed first, then the timer is stopped.
			defer t.StopTimer()
			defer close(r)

			if err := produce(r); err != nil {
				fmt.Println(err)
			}
		}()

		// Results Handler
		handle(r, t)
	}

	wg.Add(2)

	// Starts Writing
	go stage(wHandle, func(r chan<- response) error {
		p, err := s.Generate()
		if err != nil {
			return err
		}
		return s.Batch(p, r)
	})

	// Starts Querying
	go stage(rHandle, func(r chan<- response) error {
		q, err := s.QueryGenerate(s.Time)
		if err != nil {
			return err
		}
		return s.Exec(q, r)
	})

	wg.Wait()
}
// NewStressTest returns a StressTest assembled from the provisioner p,
// the writer w and the querier r.
func NewStressTest(p Provisioner, w Writer, r Querier) StressTest {
	return StressTest{Provisioner: p, Writer: w, Querier: r}
}
|