diff --git a/clickhouse.go b/clickhouse.go index 72e01ad..12e0772 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -34,10 +34,11 @@ type Clickhouse struct { // ClickhouseRequest - request struct for queue type ClickhouseRequest struct { - Params string - Query string - Content string - Count int + Params string + Query string + Content string + Count int + isInsert bool } // ErrServerIsDown - signals about server is down @@ -179,11 +180,17 @@ func (srv *ClickhouseServer) SendQuery(r *ClickhouseRequest) (response string, s if r.Params != "" { url += "?" + r.Params } - log.Printf("INFO: send %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query) + if r.isInsert { + log.Printf("INFO: sending %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query) + } resp, err := srv.Client.Post(url, "", strings.NewReader(r.Content)) if err != nil { srv.Bad = true return err.Error(), http.StatusBadGateway, ErrServerIsDown + } else { + if r.isInsert { + log.Printf("INFO: sent %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query) + } } buf, _ := ioutil.ReadAll(resp.Body) s := string(buf) @@ -211,6 +218,6 @@ func (c *Clickhouse) SendQuery(r *ClickhouseRequest) (response string, status in } return response, status, err } - return response, status, ErrNoServers + return "", http.StatusServiceUnavailable, ErrNoServers } } diff --git a/clickhouse_test.go b/clickhouse_test.go index 6239743..b19d0f9 100644 --- a/clickhouse_test.go +++ b/clickhouse_test.go @@ -43,7 +43,7 @@ func TestClickhouse_SendQuery(t *testing.T) { c.GetNextServer() c.Servers[0].Bad = true _, status, err := c.SendQuery(&ClickhouseRequest{}) - assert.Equal(t, 0, status) + assert.Equal(t, 503, status) assert.True(t, errors.Is(err, ErrNoServers)) } diff --git a/collector.go b/collector.go index 359be1e..21c6fdb 100644 --- a/collector.go +++ b/collector.go @@ -27,6 +27,8 @@ type Table struct { FlushInterval int mu sync.Mutex Sender Sender + TickerChan *chan struct{} + lastUpdate time.Time // todo add Last Error } @@ -36,7 +38,10 @@ type Collector struct { mu sync.RWMutex Count int FlushInterval int + CleanInterval int + RemoveQueryID bool Sender Sender + TickerChan *chan struct{} } // NewTable - default table constructor @@ -50,12 +55,17 @@ func NewTable(name string, sender Sender, count int, interval int) (t *Table) { } // NewCollector - default collector constructor -func NewCollector(sender Sender, count int, interval int) (c *Collector) { +func NewCollector(sender Sender, count int, interval int, cleanInterval int, removeQueryID bool) (c *Collector) { c = new(Collector) c.Sender = sender c.Tables = make(map[string]*Table) c.Count = count c.FlushInterval = interval + c.CleanInterval = cleanInterval + c.RemoveQueryID = removeQueryID + if cleanInterval > 0 { + c.TickerChan = c.RunTimer() + } return c } @@ -71,10 +81,11 @@ func (t *Table) Content() string { // Flush - sends collected data in table to clickhouse func (t *Table) Flush() { req := ClickhouseRequest{ - Params: t.Params, - Query: t.Query, - Content: t.Content(), - Count: t.count, + Params: t.Params, + Query: t.Query, + Content: t.Content(), + Count: len(t.Rows), + isInsert: true, } t.Sender.Send(&req) t.Rows = make([]string, 0, t.FlushCount) @@ -105,13 +116,21 @@ func (t *Table) GetCount() int { } // RunTimer - timer for periodical savings data -func (t *Table) RunTimer() { - ticker := time.NewTicker(time.Millisecond * time.Duration(t.FlushInterval)) +func (t *Table) RunTimer() *chan struct{} { + done := make(chan struct{}) go func() { - for range ticker.C { - t.CheckFlush() + ticker := time.NewTicker(time.Millisecond * time.Duration(t.FlushInterval)) + defer ticker.Stop() + for { + select { + case <-ticker.C: + t.CheckFlush() + case <-done: + return + } } }() + return &done } // Add - Adding query to table @@ -119,10 +138,54 @@ func (t *Table) Add(text string) { t.mu.Lock() defer t.mu.Unlock() t.count++ - t.Rows = append(t.Rows, text) + if t.Format == "TabSeparated" { + t.Rows = append(t.Rows, strings.Split(text, "\n")...) + } else { + t.Rows = append(t.Rows, text) + } if len(t.Rows) >= t.FlushCount { t.Flush() } + t.lastUpdate = time.Now() +} + +// CleanTable - delete table from map +func (t *Table) CleanTable() { + t.mu.Lock() + close(*t.TickerChan) + t = nil +} + +// CleanTables - clean unsused tables +func (c *Collector) CleanTables() { + c.mu.Lock() + defer c.mu.Unlock() + for k, t := range c.Tables { + if t.lastUpdate.Add(time.Duration(c.CleanInterval) * time.Millisecond).Before(time.Now()) { + // table was not updated for CleanInterval - delete that table - otherwise it can cause memLeak + t.CleanTable() + defer delete(c.Tables, k) + } + + } +} + +// RunTimer - timer for periodical cleaning unused tables +func (c *Collector) RunTimer() *chan struct{} { + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(time.Duration(c.CleanInterval) * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.CleanTables() + case <-done: + return + } + } + }() + return &done } // Empty - check if all tables are empty @@ -198,12 +261,29 @@ func (c *Collector) addTable(name string) *Table { t.Params = params t.Format = c.getFormat(query) c.Tables[name] = t - t.RunTimer() + t.TickerChan = t.RunTimer() + t.lastUpdate = time.Now() return t } // Push - adding query to collector with query params (with query) and rows -func (c *Collector) Push(params string, content string) { +func (c *Collector) Push(paramsIn string, content string) { + // as we are using all params as a table key, we have to remove query_id + // otherwise every query will be threated as unique thus it will consume more memory + params := "" + if c.RemoveQueryID { + items := strings.Split(paramsIn, "&") + for _, p := range items { + if !HasPrefix(p, "query_id=") { + params += "&" + p + } + } + if len(params) > 0 { + params = strings.TrimSpace(params[1:]) + } + } else { + params = paramsIn + } c.mu.RLock() table, ok := c.Tables[params] if ok { diff --git a/collector_test.go b/collector_test.go index c19200a..2910558 100644 --- a/collector_test.go +++ b/collector_test.go @@ -30,14 +30,14 @@ var escSelect = url.QueryEscape(qSelect) var escParamsAndSelect = qParams + "&query=" + escSelect func BenchmarkCollector_Push(t *testing.B) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) for i := 0; i < 30000; i++ { c.Push(escTitle, qContent) } } func TestCollector_Push(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) for i := 0; i < 10400; i++ { c.Push(escTitle, qContent) } @@ -45,7 +45,7 @@ func TestCollector_Push(t *testing.T) { } func BenchmarkCollector_ParseQuery(b *testing.B) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) c.ParseQuery("", qTitle+" "+qContent) c.ParseQuery(qParams, qTitle+" "+qContent) c.ParseQuery("query="+escTitle, qContent) @@ -53,7 +53,7 @@ func BenchmarkCollector_ParseQuery(b *testing.B) { } func TestCollector_ParseQuery(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) var params string var content string var insert bool @@ -139,20 +139,20 @@ func TestCollector_ParseQuery(t *testing.T) { } func TestCollector_separateQuery(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) query, params := c.separateQuery(escParamsAndSelect) assert.Equal(t, qSelect, query) assert.Equal(t, qParams, params) } func TestTable_getFormat(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) f := c.getFormat(qTitle) assert.Equal(t, "TabSeparated", f) } func TestTable_CheckFlush(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) c.Push(qTitle, qContent) count := 0 for !c.Tables[qTitle].Empty() { @@ -163,7 +163,7 @@ func TestTable_CheckFlush(t *testing.T) { } func TestCollector_FlushAll(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1000) + c := NewCollector(&fakeSender{}, 1000, 1000, 0, true) c.Push(qTitle, qContent) c.FlushAll() } diff --git a/config.sample.json b/config.sample.json index 518167e..9974419 100644 --- a/config.sample.json +++ b/config.sample.json @@ -2,6 +2,8 @@ "listen": ":8124", "flush_count": 10000, "flush_interval": 1000, + "clean_interval": 0, + "remove_query_id": true, "dump_check_interval": 300, "debug": false, "dump_dir": "dumps", @@ -12,4 +14,4 @@ "http://127.0.0.1:8123" ] } -} \ No newline at end of file +} diff --git a/dump.go b/dump.go index 4d9b6a2..3543092 100644 --- a/dump.go +++ b/dump.go @@ -154,16 +154,18 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error { } if data != "" { params := "" + query := "" lines := strings.Split(data, "\n") if !HasPrefix(lines[0], "insert") { params = lines[0] + query = lines[1] data = strings.Join(lines[1:], "\n") } - _, status, err := sender.SendQuery(&ClickhouseRequest{Params: params, Content: data}) + _, status, err := sender.SendQuery(&ClickhouseRequest{Params: params, Content: data, Query: query, Count: len(lines[2:]), isInsert: true}) if err != nil { return fmt.Errorf("server error (%+v) %+v", status, err) } - log.Printf("INFO: dump sended: %+v\n", f) + log.Printf("INFO: dump sent: %+v\n", f) } err = d.DeleteDump(f) if err != nil { diff --git a/go.mod b/go.mod index 513ad30..a0cce61 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,14 @@ require ( github.com/labstack/gommon v0.2.8 // indirect github.com/mattn/go-colorable v0.1.0 // indirect github.com/mattn/go-isatty v0.0.4 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/nikepan/go-datastructures v1.0.32 github.com/prometheus/client_golang v1.1.0 github.com/stretchr/testify v1.3.0 github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 // indirect + golang.org/x/tools v0.0.0-20201105220310-78b158585360 // indirect ) go 1.13 diff --git a/go.sum b/go.sum index 8306b0d..77d9690 100644 --- a/go.sum +++ b/go.sum @@ -67,19 +67,43 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 h1:gKMu1Bf6QINDnvyZuTaACm9ofY+PRh+5vFz4oxBZeF8= github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4/go.mod h1:50wTf68f99/Zt14pr046Tgt3Lp2vLyFZKzbFXTOabXw= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201105220310-78b158585360 h1:/9CzsU8hOpnSUCtem1vfWNgsVeCTgkMdx+VE5YIYxnU= +golang.org/x/tools v0.0.0-20201105220310-78b158585360/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index 5d4385c..62f5e88 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ var date = "unknown" func main() { log.SetOutput(os.Stdout) + log.SetFlags(log.LstdFlags | log.Lmicroseconds) configFile := flag.String("config", "config.json", "config file (json)") diff --git a/server.go b/server.go index 2ba35a2..b821c92 100644 --- a/server.go +++ b/server.go @@ -10,6 +10,11 @@ import ( "syscall" "time" + // debug stuff + _ "net/http/pprof" + "runtime" + "runtime/debug" + "github.com/labstack/echo" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -57,7 +62,7 @@ func (server *Server) writeHandler(c echo.Context) error { go server.Collector.Push(params, content) return c.String(http.StatusOK, "") } - resp, status, _ := server.Collector.Sender.SendQuery(&ClickhouseRequest{Params: qs, Content: s}) + resp, status, _ := server.Collector.Sender.SendQuery(&ClickhouseRequest{Params: qs, Content: s, isInsert: false}) return c.String(status, resp) } @@ -65,6 +70,30 @@ func (server *Server) statusHandler(c echo.Context) error { return c.JSON(200, Status{Status: "ok"}) } +func (server *Server) gcHandler(c echo.Context) error { + runtime.GC() + return c.JSON(200, Status{Status: "GC"}) +} + +func (server *Server) freeMemHandler(c echo.Context) error { + debug.FreeOSMemory() + return c.JSON(200, Status{Status: "freeMem"}) +} + +// manual trigger for cleaning tables +func (server *Server) tablesCleanHandler(c echo.Context) error { + log.Printf("DEBUG: clean tables:\n%+v", server.Collector.Tables) + for k, t := range server.Collector.Tables { + log.Printf("DEBUG: check if table is empty: %+v with key:%+v\n", t, k) + if ok := t.Empty(); ok { + log.Printf("DEBUG: delete empty table: %+v with key:%+v\n", t, k) + server.Collector.Tables[k].CleanTable() + defer delete(server.Collector.Tables, k) + } + } + return c.JSON(200, Status{Status: "cleaned empty tables"}) +} + // Start - start http server func (server *Server) Start() error { return server.echo.Start(server.Listen) @@ -81,6 +110,11 @@ func InitServer(listen string, collector *Collector, debug bool) *Server { server.echo.POST("/", server.writeHandler) server.echo.GET("/status", server.statusHandler) server.echo.GET("/metrics", echo.WrapHandler(promhttp.Handler())) + // debug stuff + server.echo.GET("/debug/gc", server.gcHandler) + server.echo.GET("/debug/freemem", server.freeMemHandler) + server.echo.GET("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux)) + server.echo.GET("/debug/tables-clean", server.tablesCleanHandler) return server } @@ -107,7 +141,7 @@ func RunServer(cnf Config) { sender.AddServer(url) } - collect := NewCollector(sender, cnf.FlushCount, cnf.FlushInterval) + collect := NewCollector(sender, cnf.FlushCount, cnf.FlushInterval, cnf.CleanInterval, cnf.RemoveQueryID) // send collected data on SIGTERM and exit signals := make(chan os.Signal) diff --git a/server_test.go b/server_test.go index 120ba53..ea941c1 100644 --- a/server_test.go +++ b/server_test.go @@ -18,7 +18,7 @@ import ( ) func TestRunServer(t *testing.T) { - collector := NewCollector(&fakeSender{}, 1000, 1000) + collector := NewCollector(&fakeSender{}, 1000, 1000, 0, true) server := InitServer("", collector, false) go server.Start() server.echo.POST("/", server.writeHandler) @@ -58,7 +58,7 @@ func TestRunServer(t *testing.T) { func TestServer_SafeQuit(t *testing.T) { sender := &fakeSender{} - collect := NewCollector(sender, 1000, 1000) + collect := NewCollector(sender, 1000, 1000, 0, true) collect.AddTable("test") collect.Push("sss", "sss") @@ -96,7 +96,7 @@ func TestServer_MultiServer(t *testing.T) { sender := NewClickhouse(10, 10) sender.AddServer(s1.URL) sender.AddServer(s2.URL) - collect := NewCollector(sender, 1000, 1000) + collect := NewCollector(sender, 1000, 1000, 0, true) collect.AddTable("test") collect.Push("eee", "eee") collect.Push("fff", "fff") diff --git a/utils.go b/utils.go index bfc0028..9de428c 100644 --- a/utils.go +++ b/utils.go @@ -22,6 +22,8 @@ type Config struct { Clickhouse clickhouseConfig `json:"clickhouse"` FlushCount int `json:"flush_count"` FlushInterval int `json:"flush_interval"` + CleanInterval int `json:"clean_interval"` + RemoveQueryID bool `json:"remove_query_id"` DumpCheckInterval int `json:"dump_check_interval"` DumpDir string `json:"dump_dir"` Debug bool `json:"debug"` @@ -54,6 +56,17 @@ func readEnvInt(name string, value *int) { } } +func readEnvBool(name string, value *bool) { + s := os.Getenv(name) + if s != "" { + v, err := strconv.ParseBool(s) + if err != nil { + log.Printf("ERROR: Wrong %+v env: %+v\n", name, err) + } + *value = v + } +} + // ReadConfig init config data func ReadConfig(configFile string) (Config, error) { cnf := Config{} @@ -66,8 +79,11 @@ func ReadConfig(configFile string) (Config, error) { } } + readEnvBool("CLICKHOUSE_BULK_DEBUG", &cnf.Debug) readEnvInt("CLICKHOUSE_FLUSH_COUNT", &cnf.FlushCount) readEnvInt("CLICKHOUSE_FLUSH_INTERVAL", &cnf.FlushInterval) + readEnvInt("CLICKHOUSE_CLEAN_INTERVAL", &cnf.CleanInterval) + readEnvBool("CLICKHOUSE_REMOVE_QUERY_ID", &cnf.RemoveQueryID) readEnvInt("DUMP_CHECK_INTERVAL", &cnf.DumpCheckInterval) readEnvInt("CLICKHOUSE_DOWN_TIMEOUT", &cnf.Clickhouse.DownTimeout) readEnvInt("CLICKHOUSE_CONNECT_TIMEOUT", &cnf.Clickhouse.ConnectTimeout)