-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathinfluxdb-collectd-proxy.go
272 lines (232 loc) · 7.07 KB
/
influxdb-collectd-proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package main
import (
"flag"
"log"
"math"
"os"
"os/signal"
"strings"
"time"
influxdb "github.com/influxdb/influxdb/client"
collectd "github.com/paulhammond/gocollectd"
)
const (
appName = "influxdb-collectd-proxy"
influxWriteInterval = time.Second
influxWriteLimit = 50
influxDbPassword = "INFLUXDB_PROXY_PASSWORD"
influxDbUsername = "INFLUXDB_PROXY_USERNAME"
influxDbName = "INFLUXDB_PROXY_DATABASE"
)
var (
proxyHost *string
proxyPort *string
typesdbPath *string
logPath *string
verbose *bool
// influxdb options
host *string
username *string
password *string
database *string
normalize *bool
storeRates *bool
// Format
hostnameAsColumn *bool
pluginnameAsColumn *bool
types Types
client *influxdb.Client
beforeCache map[string]CacheEntry
)
// point cache to perform data normalization for COUNTER and DERIVE types
type CacheEntry struct {
Timestamp int64
Value float64
Hostname string
}
// signal handler
func handleSignals(c chan os.Signal) {
// block until a signal is received
sig := <-c
log.Printf("exit with a signal: %v\n", sig)
os.Exit(1)
}
func getenvOrDefault(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}
func init() {
// log options
log.SetPrefix("[" + appName + "] ")
// proxy options
proxyHost = flag.String("proxyhost", "0.0.0.0", "host for proxy")
proxyPort = flag.String("proxyport", "8096", "port for proxy")
typesdbPath = flag.String("typesdb", "types.db", "path to Collectd's types.db")
logPath = flag.String("logfile", "", "path to log file (log to stderr if empty)")
verbose = flag.Bool("verbose", false, "true if you need to trace the requests")
// influxdb options
host = flag.String("influxdb", "localhost:8086", "host:port for influxdb")
username = flag.String("username", getenvOrDefault(influxDbUsername, "root"), "username for influxdb or $INFLUXDB_PROXY_USERNAME env")
password = flag.String("password", getenvOrDefault(influxDbPassword, "root"), "password for influxdb or $INFLUXDB_PROXY_PASSWORD env")
database = flag.String("database", getenvOrDefault(influxDbName, ""), "database for influxdb or $INFLUXDB_PROXY_DATABASE env")
normalize = flag.Bool("normalize", true, "true if you need to normalize data for COUNTER types (over time)")
storeRates = flag.Bool("storerates", true, "true if you need to derive rates from DERIVE types")
// format options
hostnameAsColumn = flag.Bool("hostname-as-column", false, "true if you want the hostname as column, not in series name")
pluginnameAsColumn = flag.Bool("pluginname-as-column", false, "true if you want the plugin name as column")
flag.Parse()
beforeCache = make(map[string]CacheEntry)
// read types.db
var err error
types, err = ParseTypesDB(*typesdbPath)
if err != nil {
log.Fatalf("failed to read types.db: %v\n", err)
}
}
func main() {
var err error
if *logPath != "" {
logFile, err := os.OpenFile(*logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("failed to open file: %v\n", err)
}
log.SetOutput(logFile)
defer logFile.Close()
}
// make influxdb client
client, err = influxdb.NewClient(&influxdb.ClientConfig{
Host: *host,
Username: *username,
Password: *password,
Database: *database,
})
if err != nil {
log.Fatalf("failed to make a influxdb client: %v\n", err)
}
// register a signal handler
sc := make(chan os.Signal, 1)
signal.Notify(sc, os.Interrupt, os.Kill)
go handleSignals(sc)
// make channel for collectd
c := make(chan collectd.Packet)
// then start to listen
go collectd.Listen(*proxyHost+":"+*proxyPort, c)
log.Printf("proxy started on %s:%s\n", *proxyHost, *proxyPort)
timer := time.Now()
var seriesGroup []*influxdb.Series
for {
packet := <-c
seriesGroup = append(seriesGroup, processPacket(packet)...)
if time.Since(timer) < influxWriteInterval && len(seriesGroup) < influxWriteLimit {
continue
} else {
if len(seriesGroup) > 0 {
if err := client.WriteSeries(seriesGroup); err != nil {
log.Printf("failed to write series group to influxdb: %s\n", err)
}
if *verbose {
log.Printf("[TRACE] wrote %d series\n", len(seriesGroup))
}
seriesGroup = make([]*influxdb.Series, 0)
}
timer = time.Now()
}
}
}
func processPacket(packet collectd.Packet) []*influxdb.Series {
if *verbose {
log.Printf("[TRACE] got a packet: %v\n", packet)
}
var seriesGroup []*influxdb.Series
// for all metrics in the packet
for i, _ := range packet.ValueNames() {
values, _ := packet.ValueNumbers()
// get a type for this packet
t := types[packet.Type]
// pass the unknowns
if t == nil && packet.TypeInstance == "" {
log.Printf("unknown type instance on %s\n", packet.Plugin)
continue
}
// as hostname contains commas, let's replace them
hostName := strings.Replace(packet.Hostname, ".", "_", -1)
// if there's a PluginInstance, use it
pluginName := packet.Plugin
if packet.PluginInstance != "" {
pluginName += "-" + packet.PluginInstance
}
// if there's a TypeInstance, use it
typeName := packet.Type
if packet.TypeInstance != "" {
typeName += "-" + packet.TypeInstance
} else if t != nil {
typeName += "-" + t[i][0]
}
// Append "-rx" or "-tx" for Plugin:Interface - by linyanzhong
if packet.Plugin == "interface" {
if i == 0 {
typeName += "-tx"
} else if i == 1 {
typeName += "-rx"
}
}
name := hostName + "." + pluginName + "." + typeName
nameNoHostname := pluginName + "." + typeName
// influxdb stuffs
timestamp := packet.Time().UnixNano() / 1000000
value := values[i].Float64()
dataType := packet.DataTypes[i]
readyToSend := true
normalizedValue := value
if *normalize && dataType == collectd.TypeCounter || *storeRates && dataType == collectd.TypeDerive {
if before, ok := beforeCache[name]; ok && before.Value != math.NaN() {
// normalize over time
if timestamp-before.Timestamp > 0 {
normalizedValue = (value - before.Value) / float64((timestamp-before.Timestamp)/1000)
} else {
normalizedValue = value - before.Value
}
} else {
// skip current data if there's no initial entry
readyToSend = false
}
entry := CacheEntry{
Timestamp: timestamp,
Value: value,
Hostname: hostName,
}
beforeCache[name] = entry
}
if readyToSend {
columns := []string{"time", "value"}
points_values := []interface{}{timestamp, normalizedValue}
name_value := name
// option hostname-as-column is true
if *hostnameAsColumn {
name_value = nameNoHostname
columns = append(columns, "hostname")
points_values = append(points_values, hostName)
}
// option pluginname-as-column is true
if *pluginnameAsColumn {
columns = append(columns, "plugin")
points_values = append(points_values, pluginName)
}
series := &influxdb.Series{
Name: name_value,
Columns: columns,
Points: [][]interface{}{
points_values,
},
}
if *verbose {
log.Printf("[TRACE] ready to send series: %v\n", series)
}
seriesGroup = append(seriesGroup, series)
}
}
return seriesGroup
}