Skip to content

Commit

Permalink
update batching/sending logic, pull out logger methods to logger.go
Browse files Browse the repository at this point in the history
  • Loading branch information
samjsong committed Aug 15, 2017
1 parent 520b663 commit b3e9b12
Show file tree
Hide file tree
Showing 6 changed files with 345 additions and 234 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ To specify additional logging driver options, you can use the `--log-opt NAME=VA
| `sumo-insecure-skip-verify` | No | false | Ignore server certificate validation. Boolean.
| `sumo-root-ca-path` | No | | Set the path to a custom root certificate.
| `sumo-server-name` | No | | Name used to validate the server certificate. By default, uses hostname of the `sumo-url`.
| `sumo-queue-size` | No | 100 | The maximum number of log batches of size sumo-batch-size we can store in memory in the event of network failure, before we begin dropping batches.
| `sumo-queue-size` | No | 500 | The maximum number of log batches of size sumo-batch-size we can store in memory in the event of network failure, before we begin dropping batches.

```bash
$ docker run --log-driver=sumologic \
Expand Down
152 changes: 28 additions & 124 deletions driver.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
package main

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"net/http"
"sync"
"syscall"
"time"

"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"
protoio "github.com/gogo/protobuf/io"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tonistiigi/fifo"
)

const (
defaultSendingFrequency = 2 * time.Second
defaultStreamSize = 4000
defaultBatchSize = 1000
logOptUrl = "sumo-url"

defaultSendingIntervalMs = 2000 * time.Millisecond
defaultQueueSizeItems = 500
defaultBatchSizeBytes = 1000000

fileMode = 0700
fileReaderMaxSize = 1e6
)

type SumoDriver interface {
Expand All @@ -46,59 +41,58 @@ type HttpClient interface {
type sumoLogger struct {
httpSourceUrl string
httpClient HttpClient
inputQueueFile io.ReadWriteCloser

inputFile io.ReadWriteCloser
logQueue chan *sumoLog
sendingFrequency time.Duration
logBatchQueue chan []*sumoLog
sendingInterval time.Duration
batchSize int
}

type sumoLog struct {
line []byte
source string
time string
isPartial bool
}

func newSumoDriver() *sumoDriver {
return &sumoDriver{
loggers: make(map[string]*sumoLogger),
}
}

func (sumoDriver *sumoDriver) StartLogging(file string, info logger.Info) error {
newSumoLogger, err := sumoDriver.startLoggingInternal(file, info)
newSumoLogger, err := sumoDriver.NewSumoLogger(file, info)
if err != nil {
return err
}
go consumeLogsFromFifo(newSumoLogger)
go queueLogsForSending(newSumoLogger)
go newSumoLogger.consumeLogsFromFile()
go newSumoLogger.batchLogs()
go newSumoLogger.handleBatchedLogs()
return nil
}

func (sumoDriver *sumoDriver) startLoggingInternal(file string, info logger.Info) (*sumoLogger, error) {
func (sumoDriver *sumoDriver) NewSumoLogger(file string, info logger.Info) (*sumoLogger, error) {
sumoDriver.mu.Lock()
if _, exists := sumoDriver.loggers[file]; exists {
sumoDriver.mu.Unlock()
return nil, fmt.Errorf("a logger for %q already exists", file)
}
sumoDriver.mu.Unlock()

inputQueueFile, err := fifo.OpenFifo(context.Background(), file, syscall.O_RDONLY, fileMode)
httpClient := &http.Client{}

sendingInterval := defaultSendingIntervalMs
queueSize := defaultQueueSizeItems
batchSize := defaultBatchSizeBytes

/* https://github.com/containerd/fifo */
inputFile, err := fifo.OpenFifo(context.Background(), file, syscall.O_RDONLY, fileMode)
if err != nil {
return nil, errors.Wrapf(err, "error opening logger fifo: %q", file)
}

// TODO: make options configurable through logOpts
sendingFrequency := defaultSendingFrequency
streamSize := defaultStreamSize
batchSize := defaultBatchSize

newSumoLogger := &sumoLogger{
httpSourceUrl: info.Config[logOptUrl],
httpClient: &http.Client{},
inputQueueFile: inputQueueFile,
logQueue: make(chan *sumoLog, streamSize),
sendingFrequency: sendingFrequency,
httpClient: httpClient,
inputFile: inputFile,
logQueue: make(chan *sumoLog, 10 * queueSize),
logBatchQueue: make(chan []*sumoLog, queueSize),
sendingInterval: sendingInterval,
batchSize: batchSize,
}

Expand All @@ -113,99 +107,9 @@ func (sumoDriver *sumoDriver) StopLogging(file string) error {
sumoDriver.mu.Lock()
sumoLogger, exists := sumoDriver.loggers[file]
if exists {
sumoLogger.inputQueueFile.Close()
sumoLogger.inputFile.Close()
delete(sumoDriver.loggers, file)
}
sumoDriver.mu.Unlock()
return nil
}

func consumeLogsFromFifo(sumoLogger *sumoLogger) {
dec := protoio.NewUint32DelimitedReader(sumoLogger.inputQueueFile, binary.BigEndian, fileReaderMaxSize)
defer dec.Close()
var buf logdriver.LogEntry
for {
if err := dec.ReadMsg(&buf); err != nil {
if err == io.EOF {
sumoLogger.inputQueueFile.Close()
close(sumoLogger.logQueue)
return
}
logrus.Error(err)
dec = protoio.NewUint32DelimitedReader(sumoLogger.inputQueueFile, binary.BigEndian, fileReaderMaxSize)
}

// TODO: handle multi-line detection via Partial
log := &sumoLog{
line: buf.Line,
source: buf.Source,
time: time.Unix(0, buf.TimeNano).String(),
isPartial: buf.Partial,
}
sumoLogger.logQueue <- log
buf.Reset()
}
}

func queueLogsForSending(sumoLogger *sumoLogger) {
timer := time.NewTicker(sumoLogger.sendingFrequency)
var logs []*sumoLog
for {
select {
case <-timer.C:
if err := sumoLogger.sendLogs(logs); err != nil {
logrus.Error(err)
} else {
logs = logs[:0]
}
case log, open := <-sumoLogger.logQueue:
if !open {
if err := sumoLogger.sendLogs(logs); err != nil {
logrus.Error(err)
}
return
}
logs = append(logs, log)
if len(logs) % sumoLogger.batchSize == 0 {
if err := sumoLogger.sendLogs(logs); err != nil {
logrus.Error(err)
} else {
logs = logs[:0]
}
}
}
}
}

func (sumoLogger *sumoLogger) sendLogs(logs []*sumoLog) error {
logsCount := len(logs)
if logsCount == 0 {
return nil
}
var logsBatch bytes.Buffer
for _, log := range logs {
if _, err := logsBatch.Write(log.line); err != nil {
return err
}
}

// TODO: error handling, retries and exponential backoff
request, err := http.NewRequest("POST", sumoLogger.httpSourceUrl, bytes.NewBuffer(logsBatch.Bytes()))
if err != nil {
return err
}
response, err := sumoLogger.httpClient.Do(request)
if err != nil {
return err
}

defer response.Body.Close()
if response.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return err
}
return fmt.Errorf("%s: Failed to send event: %s - %s", pluginName, response.Status, body)
}
return nil
}
18 changes: 9 additions & 9 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
filePath1 = "/tmp/test1"
filePath2 = "/tmp/test2"

httpSourceUrl = "https://example.org"
testHttpSourceUrl = "https://example.org"

testSource = "sumo-test"
testTime = 1234567890
Expand All @@ -40,26 +40,26 @@ func TestDrivers (t *testing.T) {

info := logger.Info{
Config: map[string]string{
logOptUrl: httpSourceUrl,
logOptUrl: testHttpSourceUrl,
},
ContainerID: "containeriid",
}

t.Run("startLoggingInternal", func(t *testing.T) {
t.Run("NewSumoLogger", func(t *testing.T) {
testSumoDriver := newSumoDriver()
assert.Equal(t, 0, len(testSumoDriver.loggers), "there should be no loggers when the driver is initialized")

testSumoLogger1, err := testSumoDriver.startLoggingInternal(filePath1, info)
testSumoLogger1, err := testSumoDriver.NewSumoLogger(filePath1, info)
assert.Nil(t, err)
assert.Equal(t, 1, len(testSumoDriver.loggers), "there should be one logger after calling StartLogging on driver")
assert.Equal(t, info.Config[logOptUrl], testSumoLogger1.httpSourceUrl, "http source url should be configured correctly")

_, err = testSumoDriver.startLoggingInternal(filePath1, info)
_, err = testSumoDriver.NewSumoLogger(filePath1, info)
assert.Error(t, err, "trying to call StartLogging for filepath that already exists should return error")
assert.Equal(t, 1, len(testSumoDriver.loggers),
"there should still be one logger after calling StartLogging for filepath that already exists")

testSumoLogger2, err := testSumoDriver.startLoggingInternal(filePath2, info)
testSumoLogger2, err := testSumoDriver.NewSumoLogger(filePath2, info)
assert.Nil(t, err)
assert.Equal(t, 2, len(testSumoDriver.loggers),
"there should be two loggers now after calling StartLogging on driver for different filepaths")
Expand All @@ -81,7 +81,7 @@ func TestDrivers (t *testing.T) {
assert.Nil(t, err, "trying to call StopLogging for nonexistent logger should NOT return error")
assert.Equal(t, 0, len(testSumoDriver.loggers), "no loggers should be changed after calling StopLogging for nonexistent logger")

_, err = testSumoDriver.startLoggingInternal(filePath1, info)
_, err = testSumoDriver.NewSumoLogger(filePath1, info)
assert.Nil(t, err)
assert.Equal(t, 1, len(testSumoDriver.loggers), "there should be one logger after calling StartLogging on driver")

Expand All @@ -94,14 +94,14 @@ func TestDrivers (t *testing.T) {
assert.Equal(t, 0, len(testSumoDriver.loggers), "calling StopLogging on existing logger should remove the logger")
})

t.Run("startLoggingInternal, concurrently", func(t *testing.T) {
t.Run("NewSumoLogger, concurrently", func(t *testing.T) {
testSumoDriver := newSumoDriver()
assert.Equal(t, 0, len(testSumoDriver.loggers), "there should be no loggers when the driver is initialized")

waitForAllLoggers := make(chan int)
for i := 0; i < testLoggersCount; i++ {
go func(i int) {
_, err := testSumoDriver.startLoggingInternal(filePath + strconv.Itoa(i + 1), info)
_, err := testSumoDriver.NewSumoLogger(filePath + strconv.Itoa(i + 1), info)
assert.Nil(t, err)
waitForAllLoggers <- i
}(i)
Expand Down
Loading

0 comments on commit b3e9b12

Please sign in to comment.