Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle octet-stream for passthru and credentials in Reqest X-Headers #67

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 71 additions & 23 deletions clickhouse.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"crypto/tls"
"errors"
"fmt"
Expand All @@ -21,6 +22,7 @@ type ClickhouseServer struct {
Bad bool
Client *http.Client
LogQueries bool
Credentials *Credentials
}

// Clickhouse - main clickhouse sender object
Expand All @@ -33,15 +35,15 @@ type Clickhouse struct {
Dumper Dumper
wg sync.WaitGroup
Transport *http.Transport
Credentials *Credentials
}

// ClickhouseRequest - request struct for queue
type ClickhouseRequest struct {
Params string
Query string
Content string
Count int
isInsert bool
Params string
Query string
Content string
Count int
}

// ErrServerIsDown - signals about server is down
Expand All @@ -52,29 +54,40 @@ var ErrNoServers = errors.New("No working clickhouse servers")

// NewClickhouse - get clickhouse object
func NewClickhouse(downTimeout int, connectTimeout int, tlsServerName string, tlsSkipVerify bool) (c *Clickhouse) {
tlsConfig := &tls.Config{}
tlsConfig := new(tls.Config)
if tlsServerName != "" {
tlsConfig.ServerName = tlsServerName
}
if tlsSkipVerify == true {
tlsConfig.InsecureSkipVerify = tlsSkipVerify
}

c = new(Clickhouse)
c.DownTimeout = downTimeout
c.ConnectTimeout = connectTimeout
if c.ConnectTimeout <= 0 {
c.ConnectTimeout = 10
if connectTimeout <= 0 {
connectTimeout = 10
}
c.Servers = make([]*ClickhouseServer, 0)
c.Queue = queue.New(1000)
c.Transport = &http.Transport{
TLSClientConfig: tlsConfig,

c = &Clickhouse{
DownTimeout: downTimeout,
ConnectTimeout: connectTimeout,
Servers: make([]*ClickhouseServer, 0),
Queue: queue.New(1000),
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
Credentials: &Credentials{
User: "default",
Pass: "",
},
}

go c.Run()
return c
}

func (click *Clickhouse) SetCreds(creds *Credentials) {
click.Credentials = creds
}

// AddServer - add clickhouse server url
func (c *Clickhouse) AddServer(url string, logQueries bool) {
c.mu.Lock()
Expand Down Expand Up @@ -125,6 +138,8 @@ func (c *Clickhouse) GetNextServer() (srv *ClickhouseServer) {
if srv != nil {
srv.LastRequest = time.Now()
}

srv.Credentials = c.Credentials
return srv

}
Expand Down Expand Up @@ -190,29 +205,39 @@ func (c *Clickhouse) WaitFlush() (err error) {
// SendQuery - sends query to server and return result
func (srv *ClickhouseServer) SendQuery(r *ClickhouseRequest) (response string, status int, err error) {
if srv.URL != "" {
log.Printf("INFO: sending %+v rows to %+v\n", r.Count, srv.URL)
if cnf.Debug {
log.Printf("DEBUG: query %+v\n", r.Query)
}

url := srv.URL
if r.Params != "" {
url += "?" + r.Params
}
if r.isInsert && srv.LogQueries {
log.Printf("INFO: sending %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query)
}
resp, err := srv.Client.Post(url, "text/plain", strings.NewReader(r.Content))

conn := srv.Client
req, _ := http.NewRequest("POST", url, strings.NewReader(r.Content))
req.Header.Add("X-ClickHouse-User", srv.Credentials.User)
req.Header.Add("X-ClickHouse-Key", srv.Credentials.Pass)
resp, err := conn.Do(req)
if err != nil {
srv.Bad = true
return err.Error(), http.StatusBadGateway, ErrServerIsDown
}
if r.isInsert && srv.LogQueries {
log.Printf("INFO: sent %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query)
}
defer resp.Body.Close()
buf, _ := ioutil.ReadAll(resp.Body)
s := string(buf)
if resp.StatusCode >= 502 {
srv.Bad = true
err = ErrServerIsDown
} else if resp.StatusCode >= 400 {
err = fmt.Errorf("Wrong server status %+v:\nresponse: %+v\nrequest: %#v", resp.StatusCode, s, r.Content)
err = fmt.Errorf("ERROR: Wrong server status %+v:\nresponse: %+v\n", resp.StatusCode, s)
if cnf.Debug {
err = fmt.Errorf("ERROR: Wrong server status %+v:\nresponse: %+v\nRequest: %#v\n", resp.StatusCode, s, r.Content)
}
}

log.Printf("INFO: sent %+v rows to %+v\n", r.Count, srv.URL)
return s, resp.StatusCode, err
}

Expand All @@ -234,3 +259,26 @@ func (c *Clickhouse) SendQuery(r *ClickhouseRequest) (response string, status in
return "", http.StatusServiceUnavailable, ErrNoServers
}
}

func (c *Clickhouse) PassThru(req *http.Request, clientReqBody []byte) (res *http.Response, buf *bytes.Buffer) {
for {
s := c.GetNextServer()
if s != nil {
reqBuf := bytes.NewBuffer(clientReqBody)

clickReq, _ := http.NewRequest(req.Method, s.URL, reqBuf)

CopyHeader(clickReq.Header, req.Header)
res, err := s.Client.Do(clickReq)
if errors.Is(err, ErrServerIsDown) {
log.Printf("ERROR: server down (%+v): %+v\n", res.Status, res)
continue
}

resBody, _ := ioutil.ReadAll(res.Body)
defer res.Body.Close()

return res, bytes.NewBuffer(resBody)
}
}
}
11 changes: 5 additions & 6 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewCollector(sender Sender, count int, interval int, cleanInterval int, rem
// Content - get text content of rowsfor query
func (t *Table) Content() string {
rowDelimiter := "\n"
if t.Format == "RowBinary" {
if strings.HasPrefix(t.Format, "RowBinary") {
rowDelimiter = ""
}
return t.Query + "\n" + strings.Join(t.Rows, rowDelimiter)
Expand All @@ -81,11 +81,10 @@ func (t *Table) Content() string {
// Flush - sends collected data in table to clickhouse
func (t *Table) Flush() {
req := ClickhouseRequest{
Params: t.Params,
Query: t.Query,
Content: t.Content(),
Count: len(t.Rows),
isInsert: true,
Params: t.Params,
Query: t.Query,
Content: t.Content(),
Count: len(t.Rows),
}
t.Sender.Send(&req)
t.Rows = make([]string, 0, t.FlushCount)
Expand Down
8 changes: 7 additions & 1 deletion dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,13 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error {
query = lines[1]
data = strings.Join(lines[1:], "\n")
}
_, status, err := sender.SendQuery(&ClickhouseRequest{Params: params, Content: data, Query: query, Count: len(lines[2:]), isInsert: true})
cr := &ClickhouseRequest{
Params: params,
Content: data,
Query: query,
Count: len(lines[2:]),
}
_, status, err := sender.SendQuery(cr)
if err != nil {
return fmt.Errorf("server error (%+v) %+v", status, err)
}
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func main() {
return
}

cnf, err := ReadConfig(*configFile)
err := ReadConfig(*configFile)
if err != nil {
log.Fatalf("ERROR: %+v\n", err)
}
RunServer(cnf)
RunServer()
}
9 changes: 9 additions & 0 deletions sender.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"log"
"net/http"
"sync"
Expand All @@ -10,9 +11,11 @@ import (
type Sender interface {
Send(r *ClickhouseRequest)
SendQuery(r *ClickhouseRequest) (response string, status int, err error)
PassThru(req *http.Request, clientReqBody []byte) (res *http.Response, buf *bytes.Buffer)
Len() int64
Empty() bool
WaitFlush() (err error)
SetCreds(c *Credentials)
}

type fakeSender struct {
Expand All @@ -32,6 +35,9 @@ func (s *fakeSender) SendQuery(r *ClickhouseRequest) (response string, status in
log.Printf("DEBUG: send query: %+v\n", s.sendQueryHistory)
return "", http.StatusOK, nil
}
func (c *fakeSender) PassThru(req *http.Request, clientReqBody []byte) (res *http.Response, buf *bytes.Buffer) {
return
}

func (s *fakeSender) Len() int64 {
return 0
Expand All @@ -44,3 +50,6 @@ func (s *fakeSender) Empty() bool {
func (s *fakeSender) WaitFlush() error {
return nil
}

func (s *fakeSender) SetCreds(c *Credentials) {
}
29 changes: 15 additions & 14 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,17 @@ func NewServer(listen string, collector *Collector, debug bool, logQueries bool)
}

func (server *Server) writeHandler(c echo.Context) error {
q, _ := ioutil.ReadAll(c.Request().Body)
req := c.Request()
q, _ := ioutil.ReadAll(req.Body)
s := string(q)

if server.Debug {
log.Printf("DEBUG: query %+v %+v\n", c.QueryString(), s)
}

qs := c.QueryString()
user, password, ok := c.Request().BasicAuth()
if ok {
if qs == "" {
qs = "user=" + user + "&password=" + password
} else {
qs = "user=" + user + "&password=" + password + "&" + qs
}
}
server.Collector.Sender.SetCreds(getAuth(req))

params, content, insert := server.Collector.ParseQuery(qs, s)
if insert {
if len(content) == 0 {
Expand All @@ -67,8 +62,14 @@ func (server *Server) writeHandler(c echo.Context) error {
go server.Collector.Push(params, content)
return c.String(http.StatusOK, "")
}
resp, status, _ := server.Collector.Sender.SendQuery(&ClickhouseRequest{Params: qs, Content: s, isInsert: false})
return c.String(status, resp)

res, buf := server.Collector.Sender.PassThru(req, q)

defer res.Body.Close()
CopyHeader(c.Response().Header(), res.Header)
c.Response().WriteHeader(res.StatusCode)
c.Response().Header().Set("Collection", "close")
return c.Stream(200, "application/octet-stream", buf)
}

func (server *Server) statusHandler(c echo.Context) error {
Expand Down Expand Up @@ -100,7 +101,7 @@ func (server *Server) tablesCleanHandler(c echo.Context) error {
}

// Start - start http server
func (server *Server) Start(cnf Config) error {
func (server *Server) Start() error {
if cnf.UseTLS {
return server.echo.StartTLS(server.Listen, cnf.TLSCertFile, cnf.TLSKeyFile)
} else {
Expand Down Expand Up @@ -141,7 +142,7 @@ func SafeQuit(collect *Collector, sender Sender) {
}

// RunServer - run all
func RunServer(cnf Config) {
func RunServer() {
InitMetrics(cnf.MetricsPrefix)
dumper := NewDumper(cnf.DumpDir)
sender := NewClickhouse(cnf.Clickhouse.DownTimeout, cnf.Clickhouse.ConnectTimeout, cnf.Clickhouse.tlsServerName, cnf.Clickhouse.tlsSkipVerify)
Expand Down Expand Up @@ -176,7 +177,7 @@ func RunServer(cnf Config) {
dumper.Listen(sender, cnf.DumpCheckInterval)
}

err := srv.Start(cnf)
err := srv.Start()
if err != nil {
log.Printf("ListenAndServe: %+v\n", err)
SafeQuit(collect, sender)
Expand Down
8 changes: 4 additions & 4 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
)

func TestRunServer(t *testing.T) {
cnf, _ := ReadConfig("wrong_config.json")
_ = ReadConfig("wrong_config.json")
collector := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
server := InitServer("", collector, false, true)
go server.Start(cnf)
go server.Start()
server.echo.POST("/", server.writeHandler)

status, resp := request("POST", "/", "", server.echo)
Expand Down Expand Up @@ -125,11 +125,11 @@ func TestServer_MultiServer(t *testing.T) {
assert.True(t, sender.Empty())

os.Setenv("DUMP_CHECK_INTERVAL", "10")
cnf, err := ReadConfig("wrong_config.json")
err := ReadConfig("wrong_config.json")
os.Unsetenv("DUMP_CHECK_INTERVAL")
assert.Nil(t, err)
assert.Equal(t, 10, cnf.DumpCheckInterval)
go RunServer(cnf)
go RunServer()
time.Sleep(1000)
}

Expand Down
Loading