Skip to content

Commit

Permalink
feat: -randomize-fields argument instead of every field having the sa…
Browse files Browse the repository at this point in the history
…me value
  • Loading branch information
lesam committed Nov 26, 2021
1 parent 81620ce commit cf2f47e
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 27 deletions.
20 changes: 11 additions & 9 deletions cmd/inch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (m *Main) ParseFlags(args []string) error {
fs.IntVar(&m.inch.PointsPerSeries, "p", 100, "Points per series")
fs.StringVar(&m.inch.FieldPrefix, "field-prefix", "v0", "Field key prefix")
fs.IntVar(&m.inch.FieldsPerPoint, "f", 1, "Fields per point")
fs.BoolVar(&m.inch.RandomizeFields, "randomize-fields", false, "Randomize field values")
fs.IntVar(&m.inch.BatchSize, "b", 5000, "Batch size")
fs.StringVar(&m.inch.Database, "db", "stress", "Database to write to")
fs.StringVar(&m.inch.ShardDuration, "shard-duration", "7d", "Set shard duration (default 7d)")
Expand Down Expand Up @@ -109,15 +110,16 @@ func (m *Main) ParseFlags(args []string) error {

// Basic report tags.
m.inch.ReportTags = map[string]string{
"stress_tool": "inch",
"t": *tags,
"batch_size": fmt.Sprint(m.inch.BatchSize),
"p": fmt.Sprint(m.inch.PointsPerSeries),
"c": fmt.Sprint(m.inch.Concurrency),
"m": fmt.Sprint(m.inch.Measurements),
"f": fmt.Sprint(m.inch.FieldsPerPoint),
"virtual_hosts": fmt.Sprint(m.inch.VHosts),
"sd": m.inch.ShardDuration,
"stress_tool": "inch",
"t": *tags,
"batch_size": fmt.Sprint(m.inch.BatchSize),
"p": fmt.Sprint(m.inch.PointsPerSeries),
"c": fmt.Sprint(m.inch.Concurrency),
"m": fmt.Sprint(m.inch.Measurements),
"f": fmt.Sprint(m.inch.FieldsPerPoint),
"randomize_fields": fmt.Sprint(m.inch.RandomizeFields),
"virtual_hosts": fmt.Sprint(m.inch.VHosts),
"sd": m.inch.ShardDuration,
}

// Parse report tags.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/influxdata/inch

go 1.12
go 1.17

require github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
57 changes: 40 additions & 17 deletions inch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"sort"
Expand All @@ -21,7 +22,7 @@ import (
"time"

"github.com/influxdata/influxdb1-client/models"
"github.com/influxdata/influxdb1-client/v2"
client "github.com/influxdata/influxdb1-client/v2"
)

// ErrConnectionRefused indicates that the connection to the remote server was refused.
Expand Down Expand Up @@ -91,6 +92,7 @@ type Simulator struct {
VHosts uint64 // Simulate multiple virtual hosts
PointsPerSeries int
FieldsPerPoint int
RandomizeFields bool
FieldPrefix string
BatchSize int
TargetMaxLatency time.Duration
Expand Down Expand Up @@ -125,6 +127,7 @@ func NewSimulator() *Simulator {
VHosts: 0,
PointsPerSeries: 100,
FieldsPerPoint: 1,
RandomizeFields: false,
FieldPrefix: "v0",
BatchSize: 5000,
Database: "db",
Expand Down Expand Up @@ -194,6 +197,7 @@ func (s *Simulator) Run(ctx context.Context) error {
fmt.Fprintf(s.Stdout, "Total series: %d\n", s.SeriesN())
fmt.Fprintf(s.Stdout, "Total points: %d\n", s.PointN())
fmt.Fprintf(s.Stdout, "Total fields per point: %d\n", s.FieldsPerPoint)
fmt.Fprintf(s.Stdout, "Randomized field values: %t\n", s.RandomizeFields)
fmt.Fprintf(s.Stdout, "Batch Size: %d\n", s.BatchSize)
fmt.Fprintf(s.Stdout, "Database: %s (Shard duration: %s)\n", s.Database, s.ShardDuration)
fmt.Fprintf(s.Stdout, "Write Consistency: %s\n", s.Consistency)
Expand Down Expand Up @@ -319,6 +323,24 @@ func (s *Simulator) BatchN() int {
return n
}

func (s *Simulator) makeField(val int) []byte {
var fields []byte
for i := 0; i < s.FieldsPerPoint; i++ {
var delim string
if i < s.FieldsPerPoint-1 {
delim = ","
}

// First field doesn't have a number incremented.
pair := fmt.Sprintf("%s=%d%s", s.FieldPrefix, val, delim)
if i > 0 {
pair = fmt.Sprintf("%s%d=%d%s", s.FieldPrefix, i, val, delim)
}
fields = append(fields, []byte(pair)...)
}
return fields
}

// generateBatches returns a channel for streaming batches.
func (s *Simulator) generateBatches() <-chan []byte {
ch := make(chan []byte, 10)
Expand All @@ -333,20 +355,14 @@ func (s *Simulator) generateBatches() <-chan []byte {
// For writing space between tags and field.
space := []byte(" ")

// Generate field string.
var fields []byte
for i := 0; i < s.FieldsPerPoint; i++ {
var delim string
if i < s.FieldsPerPoint-1 {
delim = ","
}

// First field doesn't have a number incremented.
pair := fmt.Sprintf("%s=1%s", s.FieldPrefix, delim)
if i > 0 {
pair = fmt.Sprintf("%s%d=1%s", s.FieldPrefix, i, delim)
}
fields = append(fields, []byte(pair)...)
// Generate field strings
var fields [][]byte
maxFieldVal := 1
if s.RandomizeFields {
maxFieldVal = 10000
}
for i := 0; i < maxFieldVal; i++ {
fields = append(fields, s.makeField(i))
}

// Size internal buffer to consider mx+tags+ +fields.
Expand All @@ -355,6 +371,7 @@ func (s *Simulator) generateBatches() <-chan []byte {
// Write points.
var lastMN int
lastM := []byte("m0")
fieldRandomize := rand.New(rand.NewSource(1234))
for i := 0; i < s.PointN(); i++ {
lastMN = i % s.Measurements
lastM = append(lastM[:1], []byte(strconv.Itoa(lastMN))...)
Expand All @@ -366,8 +383,14 @@ func (s *Simulator) generateBatches() <-chan []byte {
buf.Write(tags)
tags = tags[:0] // Reset slice but use backing array.

buf.Write(space) // Write fields.
buf.Write(fields) // Write a space.
buf.Write(space) // Write a space.

// Write all fields.
if s.RandomizeFields {
buf.Write(fields[fieldRandomize.Intn(maxFieldVal)])
} else {
buf.Write(fields[0])
}

if s.timePerSeries != 0 {
delta := time.Duration(int64(lastWrittenTotal+i) * s.timePerSeries)
Expand Down

0 comments on commit cf2f47e

Please sign in to comment.