Skip to content

Commit

Permalink
Develop
Browse files Browse the repository at this point in the history
* Added watcher tests
* Added test reporting and test summary to ci
* Replaced Uber rate limiter with Golang rate limiter
* Added buffer fill support to windows and linux
  • Loading branch information
danlapid authored Sep 1, 2022
1 parent f9023ad commit 641b6bf
Show file tree
Hide file tree
Showing 29 changed files with 896 additions and 244 deletions.
39 changes: 39 additions & 0 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Coverage
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
coverage:
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
go:
- "1.19"
runs-on: ${{ matrix.os }}
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 1

- name: Install Go
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}

- name: Calc coverage
run: |
go test -v -covermode=count -coverprofile=coverage.out -coverpkg ./pkg/... ./...
- name: Convert coverage.out to coverage.lcov
uses: jandelgado/gcov2lcov-action@master
- name: Coveralls
uses: coverallsapp/github-action@master
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: coverage.lcov
46 changes: 19 additions & 27 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,28 @@ jobs:
go-version: ${{ matrix.go }}

- name: test
if: runner.os != 'Windows'
run: |
go test -v -race ./...
go install github.com/jstemmer/go-junit-report/v2@latest
go test -v -race ./... 2>&1 | go-junit-report -set-exit-code -iocopy -out report.xml
coverage:
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
go:
- "1.19"
runs-on: ${{ matrix.os }}
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 1
- name: test-windows
if: runner.os == 'Windows'
run: |
go install github.com/jstemmer/go-junit-report/v2@latest
go test -v ./... 2>&1 | go-junit-report -set-exit-code -iocopy -out report.xml
- name: Install Go
uses: actions/setup-go@v3
- name: Test Report
uses: dorny/test-reporter@v1
with:
go-version: ${{ matrix.go }}
name: ${{ matrix.os }} Tests
path: report.xml
reporter: java-junit
fail-on-error: "false"
if: always()

- name: Calc coverage
run: |
go test -v -covermode=count -coverprofile=coverage.out -coverpkg ./pkg/... ./...
- name: Convert coverage.out to coverage.lcov
uses: jandelgado/gcov2lcov-action@master
- name: Coveralls
uses: coverallsapp/github-action@master
- name: Test Summary
uses: test-summary/action@v1
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: coverage.lcov
paths: report.xml
if: always()
12 changes: 1 addition & 11 deletions cmd/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
"oneway-filesync/pkg/database"
"oneway-filesync/pkg/receiver"
"oneway-filesync/pkg/utils"
"os"
"os/signal"
"syscall"

"github.com/sirupsen/logrus"
)
Expand All @@ -28,16 +25,9 @@ func main() {
return
}

if err = database.ConfigureDatabase(db); err != nil {
logrus.Errorf("Failed setting up db with err %v", err)
return
}

ctx, cancel := context.WithCancel(context.Background()) // Create a cancelable context and pass it to all goroutines, allows us to gracefully shut down the program
receiver.Receiver(ctx, db, conf)

done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
<-done
<-utils.CtrlC()
cancel() // Gracefully shutdown and stop all goroutines
}
12 changes: 1 addition & 11 deletions cmd/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
"oneway-filesync/pkg/database"
"oneway-filesync/pkg/sender"
"oneway-filesync/pkg/utils"
"os"
"os/signal"
"syscall"

"github.com/sirupsen/logrus"
)
Expand All @@ -27,16 +24,9 @@ func main() {
return
}

if err = database.ConfigureDatabase(db); err != nil {
logrus.Errorf("Failed setting up db with err %v", err)
return
}

ctx, cancel := context.WithCancel(context.Background()) // Create a cancelable context and pass it to all goroutines, allows us to gracefully shut down the program
sender.Sender(ctx, db, conf)

done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
<-done
<-utils.CtrlC()
cancel() // Gracefully shutdown and stop all goroutines
}
5 changes: 0 additions & 5 deletions cmd/sendfiles/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ func main() {
return
}

if err = database.ConfigureDatabase(db); err != nil {
fmt.Printf("Failed setting up db with err %v\n", err)
return
}

path := os.Args[1]
err = filepath.Walk(path, func(filepath string, info os.FileInfo, e error) error {
if !info.IsDir() {
Expand Down
23 changes: 6 additions & 17 deletions cmd/watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,31 @@ package main

import (
"context"
"oneway-filesync/pkg/config"
"oneway-filesync/pkg/database"
"oneway-filesync/pkg/utils"
"oneway-filesync/pkg/watcher"
"os"
"os/signal"
"syscall"

"github.com/rjeczalik/notify"
"github.com/sirupsen/logrus"
)

func main() {
utils.InitializeLogging("watcher.log")
if len(os.Args) < 2 {
logrus.Errorf("Usage: %s <dir_path>", os.Args[0])
conf, err := config.GetConfig("config.toml")
if err != nil {
logrus.Errorf("Failed reading config with err %v", err)
return
}
path := os.Args[1]

db, err := database.OpenDatabase("s_")
if err != nil {
logrus.Errorf("%v", err)
return
}

if err = database.ConfigureDatabase(db); err != nil {
logrus.Errorf("Failed setting up db with err %v", err)
return
}

events := make(chan notify.EventInfo, 20)
ctx, cancel := context.WithCancel(context.Background()) // Create a cancelable context and pass it to all goroutines, allows us to gracefully shut down the program
watcher.CreateWatcher(ctx, db, path, events)
watcher.Watcher(ctx, db, conf)

done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
<-done
<-utils.CtrlC()
cancel() // Gracefully shutdown and stop all goroutines
}
1 change: 1 addition & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ ChunkSize = 8192
ChunkFecRequired = 5
ChunkFecTotal = 10
OutDir = "./out"
WatchDir = "./tmp"
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ go 1.19
require (
github.com/BurntSushi/toml v1.2.0
github.com/klauspost/reedsolomon v1.10.0
github.com/rjeczalik/notify v0.9.2
github.com/rjeczalik/notify v0.9.3-0.20210809113154-3472d85e95cd
github.com/sirupsen/logrus v1.9.0
github.com/zhuangsirui/binpacker v2.0.0+incompatible
go.uber.org/ratelimit v0.2.0
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9
gorm.io/driver/sqlite v1.3.6
gorm.io/gorm v1.23.8
)

require (
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/cpuid/v2 v2.0.14 // indirect
Expand Down
16 changes: 6 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -20,23 +18,21 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rjeczalik/notify v0.9.2 h1:MiTWrPj55mNDHEiIX5YUSKefw/+lCQVoAFmD6oQm5w8=
github.com/rjeczalik/notify v0.9.2/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM=
github.com/rjeczalik/notify v0.9.3-0.20210809113154-3472d85e95cd h1:LHLg0gdpRUCvujg2Zol6e2Uknq5vHycLxqEzYwxt1vY=
github.com/rjeczalik/notify v0.9.3-0.20210809113154-3472d85e95cd/go.mod h1:gF3zSOrafR9DQEWSE8TjfI9NkooDxbyT4UgRGKZA0lc=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/zhuangsirui/binpacker v2.0.0+incompatible h1:s2wDYWXT4IznT7NUFzn5gJHqjtWz/zIwUxdiFGNomdk=
github.com/zhuangsirui/binpacker v2.0.0+incompatible/go.mod h1:TdE7uEZ8Q7sMzbCpk2Y+ksFB8yA5AErPz0meDB612rU=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA=
go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 h1:UiNENfZ8gDvpiWw7IpOMQ27spWmThO1RwwdQVbJahJM=
golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ=
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
16 changes: 10 additions & 6 deletions pkg/bandwidthlimiter/bandwidthlimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"oneway-filesync/pkg/structs"

"go.uber.org/ratelimit"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)

type bandwidthLimiterConfig struct {
rl ratelimit.Limiter
rl *rate.Limiter
input chan *structs.Chunk
output chan *structs.Chunk
}
Expand All @@ -19,15 +20,18 @@ func worker(ctx context.Context, conf *bandwidthLimiterConfig) {
case <-ctx.Done():
return
case buf := <-conf.input:
conf.rl.Take()
conf.output <- buf
if err := conf.rl.WaitN(ctx, len(buf.Data)); err != nil {
logrus.Error(err)
} else {
conf.output <- buf
}
}
}
}

func CreateBandwidthLimiter(ctx context.Context, chunks_per_sec int, input chan *structs.Chunk, output chan *structs.Chunk) {
func CreateBandwidthLimiter(ctx context.Context, bandwidth int, chunksize int, input chan *structs.Chunk, output chan *structs.Chunk) {
conf := bandwidthLimiterConfig{
rl: ratelimit.New(chunks_per_sec),
rl: rate.NewLimiter(rate.Limit(bandwidth), chunksize),
input: input,
output: output,
}
Expand Down
28 changes: 17 additions & 11 deletions pkg/bandwidthlimiter/bandwidthlimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,37 @@ import (

func TestCreateBandwidthLimiter(t *testing.T) {
type args struct {
chunks int
chunks_per_sec int
chunk_count int
chunk_size int
bytes_per_sec int
}
tests := []struct {
name string
args args
}{
{name: "test1", args: args{chunks: 100, chunks_per_sec: 10}},
{name: "test1", args: args{chunk_count: 100, chunk_size: 8000, bytes_per_sec: 240000}},
{name: "test2", args: args{chunk_count: 300, chunk_size: 8000, bytes_per_sec: 240000}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
expected := float64(tt.args.chunks) / float64(tt.args.chunks_per_sec)
ch_in := make(chan *structs.Chunk, tt.args.chunks)
ch_out := make(chan *structs.Chunk, tt.args.chunks)
for i := 0; i < tt.args.chunks; i++ {
ch_in <- &structs.Chunk{}
expected := (float64(tt.args.chunk_size) / float64(tt.args.bytes_per_sec)) * float64(tt.args.chunk_count)
ch_in := make(chan *structs.Chunk, tt.args.chunk_count)
ch_out := make(chan *structs.Chunk, tt.args.chunk_count)

chunk := structs.Chunk{Data: make([]byte, tt.args.chunk_size)}
for i := 0; i < tt.args.chunk_count; i++ {
ch_in <- &chunk
}

ctx, cancel := context.WithCancel(context.Background())
start := time.Now()
bandwidthlimiter.CreateBandwidthLimiter(ctx, tt.args.chunks_per_sec, ch_in, ch_out)
for i := 0; i < tt.args.chunks; i++ {
bandwidthlimiter.CreateBandwidthLimiter(ctx, tt.args.bytes_per_sec, tt.args.chunk_size, ch_in, ch_out)
for i := 0; i < tt.args.chunk_count; i++ {
<-ch_out
}
timepast := time.Since(start)
if timepast > time.Duration(expected+1)*time.Second || timepast < time.Duration(expected-1)*time.Second {

if timepast > time.Duration(expected*1.2)*time.Second || timepast < time.Duration(expected*0.8)*time.Second {
t.Fatalf("Bandwidthlimiter took %f seconds instead of %f", timepast.Seconds(), expected)
}
cancel()
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Config struct {
ChunkFecRequired int
ChunkFecTotal int
OutDir string
WatchDir string
}

func GetConfig(file string) (Config, error) {
Expand Down
Loading

0 comments on commit 641b6bf

Please sign in to comment.