Skip to content

Commit

Permalink
[INLONG-4961][DataProxy] Add dataproxy-sdk-golang (#8111)
Browse files Browse the repository at this point in the history
Co-authored-by: gunli <[email protected]>
  • Loading branch information
gunli and gunli authored Jun 1, 2023
1 parent b3ba359 commit 5552019
Show file tree
Hide file tree
Showing 31 changed files with 5,033 additions and 0 deletions.
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,6 @@ header:

# Go sum
- '**/tubemq-client-go/go.sum'
- '**/dataproxy-sdk-golang/go.sum'

comment: on-failure
40 changes: 40 additions & 0 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/.golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

run:
timeout: 10m
tests: false
linters:
disable-all: true
enable:
- asciicheck
- bodyclose
- durationcheck
- errcheck
- exportloopref
- gofmt
- goimports
- gosimple
- ineffassign
- makezero
- prealloc
- predeclared
- rowserrcheck
- staticcheck
- stylecheck
- typecheck
- unused
30 changes: 30 additions & 0 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


.PHONY : clean all lint

all :
gofmt -w .
goimports -w .
go vet ./... || exit 1
golint -set_exit_status ./... || exit 2

go build dataproxy/*
clean :

lint:
golangci-lint run
189 changes: 189 additions & 0 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
## Overview

dataproxy-sdk-golang is the golang version of InLong DataProxy client SDK.

## Features

- Service discovery;
- Connection pool, buffer pool, byte pool;
- Backoff retry;
- Concurrently batch send;
- Send synchronously;
- Send asynchronously;
- Close gracefully;
- Hookable debug log;
- Heartbeat;
- Metrics;
- Snappy compress;
- Additional column;
- Server offline re-balance;

## Usage

### example

refer: cli/main.go

``` go
package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"strings"
"time"

"go.uber.org/atomic"

"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy"
)

var (
set string
url string
groupID string
streamID string
payload string
count int
addCols mapFlag
async bool
succeed atomic.Int32
failed atomic.Int32
)

type mapFlag map[string]string

func (f mapFlag) String() string {
return fmt.Sprintf("%v", map[string]string(f))
}

func (f mapFlag) Set(value string) error {
split := strings.SplitN(value, "=", 2)
if len(split) < 2 {
return errors.New("invalid map flag")
}

f[split[0]] = split[1]
return nil
}

func main() {
addCols = make(map[string]string)
flag.StringVar(&set, "set", "SH_IEG", "dataproxy set")
flag.StringVar(&url, "url", dataproxy.DefaultURL, "dataproxy URL")
flag.StringVar(&groupID, "group-id", "b_ieg_tglogv3_test", "dataproxy group ID")
flag.StringVar(&streamID, "stream-id", "GameSvrState", "dataproxy stream ID")
flag.StringVar(&payload, "payload", "GameSvrState|GameSvrId-Test|2023-01-11 10:08:30|127.0.0.1|1", "message payload")
flag.IntVar(&count, "count", 10, "send count")
flag.Var(&addCols, "col", "add columns, for example: -col k1=v1 -col k2=v2")
flag.BoolVar(&async, "async", false, "send asynchronously")
flag.Parse()

var err error
client, err := dataproxy.NewClient(
dataproxy.WithSet(set),
dataproxy.WithGroupID(groupID),
dataproxy.WithURL(url),
dataproxy.WithMetricsName("clit"),
dataproxy.WithAddColumns(addCols),
)

if err != nil {
log.Fatal(err)
}

msg := dataproxy.Message{GroupID: groupID, StreamID: streamID, Payload: []byte(payload)}
for i := 0; i < count; i++ {
if !async {
err = client.Send(context.Background(), msg)
if err != nil {
fmt.Println(err)
}
} else {
client.SendAsync(context.Background(), msg, onResult)
}
}

if async {
wait()
}
}

func onResult(msg dataproxy.Message, err error) {
if err != nil {
fmt.Println("error message, streamID = " + msg.StreamID + ", Payload = " + string(msg.Payload))
failed.Add(1)
} else {
succeed.Add(1)
}
}

func wait() {
for {
if int(succeed.Load()+failed.Load()) >= count {
fmt.Println("succeed:", succeed.Load())
fmt.Println("failed:", failed.Load())
return
}
time.Sleep(1 * time.Second)
}
}

```

### Options

refer: dataproxy/options.go

``` go
// Options is the DataProxy go client configs
type Options struct {
Set string // the set name of the server
GroupID string // InLong group ID
URL string // URL where the discoverer to get the endpoint list of the server
UpdateInterval time.Duration // interval to refresh the endpoint list, default: 5m
ConnTimeout time.Duration // connection timeout: default: 3000ms
WriteBufferSize int // write buffer size in bytes, default: 16M
ReadBufferSize int // read buffer size in bytes, default: 16M
SocketSendBufferSize int // socket send buffer size in bytes, default: 16M
SocketRecvBufferSize int // socket receive buffer size in bytes, default: 16M
BufferPool bufferpool.BufferPool // encoding/decoding buffer pool, if not given, SDK will init a new one
BytePool bufferpool.BytePool // encoding/decoding byte pool, if not given, SDK will init a new one
BufferPoolSize int // buffer pool size, default: 409600
BytePoolSize int // byte pool size, default: 409600
BytePoolWidth int // byte pool width, default: equals to BatchingMaxSize
Logger logger.Logger // debug logger, default: stdout
MetricsName string // the unique metrics name of this SDK, used to isolate metrics in the case that more than 1 client are initialized in one process
MetricsRegistry prometheus.Registerer // metrics registry, default: prometheus.DefaultRegisterer
WorkerNum int // worker number, default: 8
SendTimeout time.Duration // send timeout, default: 20000ms
MaxRetries int // max retry count, default: 2
BatchingMaxPublishDelay time.Duration // the time period within which the messages sent will be batched, default: 10ms
BatchingMaxMessages int // the maximum number of messages permitted in a batch, default: 10
BatchingMaxSize int // the maximum number of bytes permitted in a batch, default: 4K
MaxPendingMessages int // the max size of the queue holding the messages pending to receive an acknowledgment from the broker, default: 409600
BlockIfQueueIsFull bool // whether Send and SendAsync block if producer's message queue is full, default: false
AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy
addColumnStr string // the string format of the AddColumns, just a cache, used internal
}
```

## FAQ

Q: Why should I provide a MetricsName option?

A: It is used to isolate the prometheus metrics in the case you initialize more than one client in a process.

Q: What is the purpose of the "AddColumns" option?

A: In some case, you may need to add some meta/headers to you message, AddColumns can help you to do that. AddColumns
can add some fix columns and values to your message. For example: \_\_addcol1\_\_worldid=xxx&\_\_addcol2\_\_ip=yyy, all
the messages will be updated with 2 more columns with worldid=xxx and ip=yyy.

Q: How to hook the debug logger?

A: The debug logger is defined as an interface, while logrus logger and zap sugar logger are compatible with that
interface, so you can pass a logrus logger or zap sugar logger as the debug logger.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bufferpool

import (
"bytes"

"github.com/oxtoacart/bpool"
)

// BufferPool buffer pool interface
type BufferPool interface {
Get() *bytes.Buffer
Put(buffer *bytes.Buffer)
}

// BytePool byte pool interface
type BytePool interface {
Get() []byte
Put(b []byte)
}

// NewSizedBuffer news a buffer pool with specific buffer size
func NewSizedBuffer(poolSize, bufferSize int) BufferPool {
return bpool.NewSizedBufferPool(poolSize, bufferSize)
}

// NewBuffer news a buffer pool
func NewBuffer(poolSize int) BufferPool {
return bpool.NewBufferPool(poolSize)
}

// NewBytePool news a byte pool
func NewBytePool(poolSize, length int) BytePool {
return bpool.NewBytePool(poolSize, length)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bytecloser

import (
"bytes"

"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
)

// ByteCloser is the interface of a byte Buffer that can be closed
type ByteCloser interface {
// Bytes return the bytes
Bytes() []byte
// Close closes the bytes
Close()
}

// Bytes is the raw byte implementations of ByteCloser
type Bytes []byte

// Bytes return the bytes
func (b Bytes) Bytes() []byte {
return b
}

// Close closes the bytes
func (b Bytes) Close() {
}

// BufferBytes is the buffered byte implementations of ByteCloser
type BufferBytes struct {
BufferPool bufferpool.BufferPool
Buffer *bytes.Buffer
}

// Bytes return the bytes
func (b *BufferBytes) Bytes() []byte {
if b.Buffer == nil {
return nil
}

return b.Buffer.Bytes()
}

// Close closes the bytes
func (b *BufferBytes) Close() {
if b.Buffer == nil {
return
}

if b.BufferPool == nil {
b.Buffer.Reset()
return
}

b.BufferPool.Put(b.Buffer)
b.Buffer = nil
}
Loading

0 comments on commit 5552019

Please sign in to comment.