From 46a4cbbd03cb53a306336f92f7c78ac5a3f1f51c Mon Sep 17 00:00:00 2001 From: Dan Lapid Date: Sat, 27 Aug 2022 20:44:34 +0300 Subject: [PATCH] Develop (#2) Added watcher and a lot of ci stuff --- .devcontainer/.env | 4 - .devcontainer/Dockerfile | 19 -- .devcontainer/devcontainer.json | 36 ---- .devcontainer/docker-compose.yml | 55 ------ .gitattributes | 1 + .github/workflows/build.yml | 30 ++++ .github/workflows/go.yml | 24 --- .github/workflows/lint.yml | 39 +++++ .github/workflows/release.yml | 35 ++++ .github/workflows/test.yml | 65 +++++++ .goreleaser.yml | 95 ++++++++++ cmd/receiver/main.go | 9 +- cmd/sender/main.go | 9 +- cmd/sendfiles/main.go | 7 +- cmd/watcher/main.go | 43 +++++ go.mod | 3 +- go.sum | 6 +- pkg/bandwidthlimiter/bandwidthlimiter.go | 8 +- pkg/bandwidthlimiter/bandwidthlimiter_test.go | 43 +++++ pkg/database/database.go | 5 +- pkg/fecdecoder/fecdecoder.go | 23 +-- pkg/fecencoder/fecencoder.go | 8 +- pkg/filecloser/filecloser.go | 30 +++- pkg/filereader/filereader.go | 8 +- pkg/filewriter/filewriter.go | 37 ++-- pkg/queuereader/queuereader.go | 8 +- pkg/receiver/receiver.go | 2 +- pkg/sender/sender.go | 2 +- pkg/shareassembler/shareassembler.go | 55 +++--- pkg/udpreceiver/udpreceiver.go | 43 ++--- pkg/udpsender/udpsender.go | 19 +- pkg/utils/unix_ioctl.go | 50 ++++++ pkg/utils/windows_ioctl.go | 16 ++ pkg/watcher/watcher.go | 58 +++++++ tests/system_test.go | 164 ++++++++++-------- 35 files changed, 715 insertions(+), 344 deletions(-) delete mode 100644 .devcontainer/.env delete mode 100644 .devcontainer/Dockerfile delete mode 100644 .devcontainer/devcontainer.json delete mode 100644 .devcontainer/docker-compose.yml create mode 100644 .gitattributes create mode 100644 .github/workflows/build.yml delete mode 100644 .github/workflows/go.yml create mode 100644 .github/workflows/lint.yml create mode 100644 .github/workflows/release.yml create mode 100644 .github/workflows/test.yml create mode 100644 .goreleaser.yml create mode 100644 
cmd/watcher/main.go create mode 100644 pkg/bandwidthlimiter/bandwidthlimiter_test.go create mode 100644 pkg/utils/unix_ioctl.go create mode 100644 pkg/utils/windows_ioctl.go create mode 100644 pkg/watcher/watcher.go diff --git a/.devcontainer/.env b/.devcontainer/.env deleted file mode 100644 index 15181a6..0000000 --- a/.devcontainer/.env +++ /dev/null @@ -1,4 +0,0 @@ -POSTGRES_USER=postgres -POSTGRES_PASSWORD=postgres -POSTGRES_DB=postgres -POSTGRES_HOSTNAME=localhost diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile deleted file mode 100644 index 4feec86..0000000 --- a/.devcontainer/Dockerfile +++ /dev/null @@ -1,19 +0,0 @@ -# [Choice] Go version (use -bullseye variants on local arm64/Apple Silicon): 1, 1.18, 1.17, 1-bullseye, 1.18-bullseye, 1.17-bullseye, 1-buster, 1.18-buster, 1.17-buster -ARG VARIANT=1-bullseye -FROM mcr.microsoft.com/vscode/devcontainers/go:0-${VARIANT} - -# [Choice] Node.js version: none, lts/*, 16, 14, 12, 10 -ARG NODE_VERSION="none" -RUN if [ "${NODE_VERSION}" != "none" ]; then su vscode -c "umask 0002 && . /usr/local/share/nvm/nvm.sh && nvm install ${NODE_VERSION} 2>&1"; fi - -# [Optional] Uncomment this section to install additional OS packages. -# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ -# && apt-get -y install --no-install-recommends - -# [Optional] Uncomment the next lines to use go get to install anything else you need -# USER vscode -# RUN go get -x -# USER root - -# [Optional] Uncomment this line to install global node packages. -# RUN su vscode -c "source /usr/local/share/nvm/nvm.sh && npm install -g " 2>&1 diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json deleted file mode 100644 index b27c0aa..0000000 --- a/.devcontainer/devcontainer.json +++ /dev/null @@ -1,36 +0,0 @@ -// For format details, see https://aka.ms/devcontainer.json. 
For config options, see the README at: -// https://github.com/microsoft/vscode-dev-containers/tree/v0.245.0/containers/go-postgres -{ - "name": "Go & PostgreSQL", - "dockerComposeFile": "docker-compose.yml", - "service": "app", - "workspaceFolder": "/workspace", - - // Configure tool-specific properties. - "customizations": { - // Configure properties specific to VS Code. - "vscode": { - // Set *default* container specific settings.json values on container create. - "settings": { - "go.toolsManagement.checkForUpdates": "local", - "go.useLanguageServer": true, - "go.gopath": "/go", - "go.goroot": "/usr/local/go" - }, - - // Add the IDs of extensions you want installed when the container is created. - "extensions": [ - "golang.Go" - ] - } - }, - - // Use 'forwardPorts' to make a list of ports inside the container available locally. - // "forwardPorts": [5432], - - // Use 'postCreateCommand' to run commands after the container is created. - // "postCreateCommand": "go version", - - // Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. - "remoteUser": "vscode" -} diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml deleted file mode 100644 index 48ec27b..0000000 --- a/.devcontainer/docker-compose.yml +++ /dev/null @@ -1,55 +0,0 @@ -version: '3.8' - -volumes: - postgres-data: - -services: - app: - build: - context: . - dockerfile: Dockerfile - args: - # [Choice] Go version 1, 1.18, 1.17 - # Append -bullseye or -buster to pin to an OS version. - # Use -bullseye variants on local arm64/Apple Silicon. - VARIANT: 1-bullseye - # Options - NODE_VERSION: "lts/*" - env_file: - # Ensure that the variables in .env match the same variables in devcontainer.json - - .env - - # Security Opt and cap_add allow for C++ based debuggers to work. 
- # See `runArgs`: https://github.com/Microsoft/vscode-docs/blob/main/docs/remote/devcontainerjson-reference.md -# security_opt: -# - seccomp:unconfined -# cap_add: -# - SYS_PTRACE - - volumes: - - ..:/workspace:cached - - # Overrides default command so things don't shut down after the process ends. - command: sleep infinity - - # Runs app on the same network as the database container, allows "forwardPorts" in devcontainer.json function. - network_mode: service:db - - # Uncomment the next line to use a non-root user for all processes. - # user: vscode - - # Use "forwardPorts" in **devcontainer.json** to forward an app port locally. - # (Adding the "ports" property to this file will not forward from a Codespace.) - - db: - image: postgres:latest - restart: unless-stopped - volumes: - - postgres-data:/var/lib/postgresql/data - env_file: - # Ensure that the variables in .env match the same variables in devcontainer.json - - .env - - - # Add "forwardPorts": ["5432"] to **devcontainer.json** to forward PostgreSQL locally. - # (Adding the "ports" property to this file will not forward from a Codespace.) 
diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..d207b18 --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.go text eol=lf diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..74f3865 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,30 @@ +name: Build + +on: + push: + paths: ["**.go", "go.mod", ".github/workflows/*"] + +jobs: + build: + strategy: + fail-fast: false + matrix: + go: + - "1.19" + runs-on: macos-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + fetch-depth: 1 + + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go }} + + - name: Run GoReleaser + uses: goreleaser/goreleaser-action@v3 + with: + version: latest + args: release --snapshot --rm-dist diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml deleted file mode 100644 index 75bbc75..0000000 --- a/.github/workflows/go.yml +++ /dev/null @@ -1,24 +0,0 @@ -name: Go - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - -jobs: - build: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: 1.19 - - - name: Build - run: go build -v ./... - - - name: Test - run: go test -v ./... 
diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..44593ab --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,39 @@ +name: Lint +on: + push: + paths: ["**.go", "go.mod", ".github/workflows/*"] + +jobs: + lint: + name: "lint" + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + go: + - "1.19" + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + fetch-depth: 1 + + - name: setup Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go }} + + - name: Run linters + uses: golangci/golangci-lint-action@v3 + with: + version: latest + + - name: Go vet + run: "go vet ./..." + + - name: Staticcheck + uses: dominikh/staticcheck-action@master + with: + version: "2022.1" + install-go: false + cache-key: ${{ matrix.go }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..33404fc --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,35 @@ +on: + push: + tags: + - "v*" + +name: Upload release assets after tagging +jobs: + build: + name: Create assets + strategy: + fail-fast: false + matrix: + os: + - macos-latest + go: + - "1.19" + runs-on: ${{ matrix.os }} + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + fetch-depth: 1 + + - name: Install Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go }} + + - name: Run GoReleaser + uses: goreleaser/goreleaser-action@v3 + with: + version: latest + args: release --rm-dist + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..f54905f --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,65 @@ +name: Test +on: + push: + branches: + - main + pull_request: + branches: + - main +jobs: + test: + strategy: + fail-fast: false + matrix: + os: + - ubuntu-latest + - windows-latest + - macos-latest + go: + - "1.19" + runs-on: ${{ matrix.os }} + 
steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + fetch-depth: 1 + + - name: setup Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go }} + + - name: test + run: | + go test -v -race ./... + + coverage: + strategy: + fail-fast: false + matrix: + os: + - ubuntu-latest + go: + - "1.19" + runs-on: ${{ matrix.os }} + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + fetch-depth: 1 + + - name: Install Go + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go }} + + - name: Calc coverage + run: | + go test -v -covermode=count -coverprofile=coverage.out -coverpkg ./pkg/... ./... + - name: Convert coverage.out to coverage.lcov + uses: jandelgado/gcov2lcov-action@master + - name: Coveralls + uses: coverallsapp/github-action@master + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + path-to-lcov: coverage.lcov diff --git a/.goreleaser.yml b/.goreleaser.yml new file mode 100644 index 0000000..3289379 --- /dev/null +++ b/.goreleaser.yml @@ -0,0 +1,95 @@ +before: + hooks: + - go mod tidy +builds: + - id: sender + main: ./cmd/sender/main.go + binary: sender + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + - windows + goarch: + - amd64 + - "386" + - arm64 + ignore: + - goos: darwin + goarch: "386" + - id: receiver + main: ./cmd/receiver/main.go + binary: receiver + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + - windows + goarch: + - amd64 + - "386" + - arm64 + ignore: + - goos: darwin + goarch: "386" + - id: sendfiles + main: ./cmd/sendfiles/main.go + binary: sendfiles + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + - windows + goarch: + - amd64 + - "386" + - arm64 + ignore: + - goos: darwin + goarch: "386" + - id: watcher + main: ./cmd/watcher/main.go + binary: watcher + env: + - CGO_ENABLED=0 + goos: + - linux + - windows + goarch: + - amd64 + - "386" + - arm64 + - id: watcher-mac + main: ./cmd/watcher/main.go + binary: watcher + env: + - CGO_ENABLED=1 + goos: + - darwin + goarch: + - amd64 
+ - arm64 +archives: + - files: + - README.md + - LICENSE + replacements: + darwin: MacOS + linux: Linux + windows: Windows + 386: x86_32 + amd64: x86_64 + arm64: Arm64 +checksum: + name_template: "checksums.txt" +snapshot: + name_template: "{{ incpatch .Version }}" +changelog: + sort: asc + filters: + exclude: + - "^docs:" + - "^test:" diff --git a/cmd/receiver/main.go b/cmd/receiver/main.go index 07058a7..0856643 100644 --- a/cmd/receiver/main.go +++ b/cmd/receiver/main.go @@ -14,26 +14,25 @@ import ( ) func main() { + utils.InitializeLogging("receiver.log") conf, err := config.GetConfig("config.toml") if err != nil { - logrus.Errorf("Failed reading config with err %v\n", err) + logrus.Errorf("Failed reading config with err %v", err) return } db, err := database.OpenDatabase("r_") if err != nil { - logrus.Errorf("Failed connecting to db with err %v\n", err) + logrus.Errorf("Failed connecting to db with err %v", err) return } if err = database.ConfigureDatabase(db); err != nil { - logrus.Errorf("Failed setting up db with err %v\n", err) + logrus.Errorf("Failed setting up db with err %v", err) return } - utils.InitializeLogging("receiver.log") - ctx, cancel := context.WithCancel(context.Background()) // Create a cancelable context and pass it to all goroutines, allows us to gracefully shut down the program receiver.Receiver(ctx, db, conf) diff --git a/cmd/sender/main.go b/cmd/sender/main.go index 442897e..76eabf7 100644 --- a/cmd/sender/main.go +++ b/cmd/sender/main.go @@ -14,25 +14,24 @@ import ( ) func main() { + utils.InitializeLogging("sender.log") conf, err := config.GetConfig("config.toml") if err != nil { - logrus.Errorf("Failed reading config with err %v\n", err) + logrus.Errorf("Failed reading config with err %v", err) return } db, err := database.OpenDatabase("s_") if err != nil { - logrus.Errorf("Failed connecting to db with err %v\n", err) + logrus.Errorf("Failed connecting to db with err %v", err) return } if err = database.ConfigureDatabase(db); err 
!= nil { - logrus.Errorf("Failed setting up db with err %v\n", err) + logrus.Errorf("Failed setting up db with err %v", err) return } - utils.InitializeLogging("sender.log") - ctx, cancel := context.WithCancel(context.Background()) // Create a cancelable context and pass it to all goroutines, allows us to gracefully shut down the program sender.Sender(ctx, db, conf) diff --git a/cmd/sendfiles/main.go b/cmd/sendfiles/main.go index 6939e3a..142109a 100644 --- a/cmd/sendfiles/main.go +++ b/cmd/sendfiles/main.go @@ -16,6 +16,7 @@ func main() { db, err := database.OpenDatabase("s_") if err != nil { fmt.Printf("%v\n", err) + return } if err = database.ConfigureDatabase(db); err != nil { @@ -24,7 +25,7 @@ func main() { } path := os.Args[1] - filepath.Walk(path, func(filepath string, info os.FileInfo, e error) error { + err = filepath.Walk(path, func(filepath string, info os.FileInfo, e error) error { if !info.IsDir() { err := database.QueueFileForSending(db, filepath) if err != nil { @@ -35,4 +36,8 @@ func main() { } return nil }) + if err != nil { + fmt.Printf("Failed walking dir with err %v\n", err) + return + } } diff --git a/cmd/watcher/main.go b/cmd/watcher/main.go new file mode 100644 index 0000000..a0a557a --- /dev/null +++ b/cmd/watcher/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "context" + "oneway-filesync/pkg/database" + "oneway-filesync/pkg/utils" + "oneway-filesync/pkg/watcher" + "os" + "os/signal" + "syscall" + + "github.com/rjeczalik/notify" + "github.com/sirupsen/logrus" +) + +func main() { + utils.InitializeLogging("watcher.log") + if len(os.Args) < 2 { + logrus.Errorf("Usage: %s ", os.Args[0]) + return + } + path := os.Args[1] + + db, err := database.OpenDatabase("s_") + if err != nil { + logrus.Errorf("%v", err) + return + } + + if err = database.ConfigureDatabase(db); err != nil { + logrus.Errorf("Failed setting up db with err %v", err) + return + } + + events := make(chan notify.EventInfo, 20) + ctx, cancel := 
context.WithCancel(context.Background()) // Create a cancelable context and pass it to all goroutines, allows us to gracefully shut down the program + watcher.CreateWatcher(ctx, db, path, events) + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) + <-done + cancel() // Gracefully shutdown and stop all goroutines +} diff --git a/go.mod b/go.mod index c9cb99d..ca29ed0 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,11 @@ go 1.19 require ( github.com/BurntSushi/toml v1.2.0 github.com/klauspost/reedsolomon v1.10.0 + github.com/rjeczalik/notify v0.9.2 github.com/sirupsen/logrus v1.9.0 github.com/zhuangsirui/binpacker v2.0.0+incompatible go.uber.org/ratelimit v0.2.0 - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 + golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 gorm.io/driver/sqlite v1.3.6 gorm.io/gorm v1.23.8 ) diff --git a/go.sum b/go.sum index 1ece9af..459c504 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rjeczalik/notify v0.9.2 h1:MiTWrPj55mNDHEiIX5YUSKefw/+lCQVoAFmD6oQm5w8= +github.com/rjeczalik/notify v0.9.2/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -31,8 +33,10 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/ratelimit v0.2.0 
h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA= go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 h1:UiNENfZ8gDvpiWw7IpOMQ27spWmThO1RwwdQVbJahJM= +golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/bandwidthlimiter/bandwidthlimiter.go b/pkg/bandwidthlimiter/bandwidthlimiter.go index 6cf1adb..29f06f8 100644 --- a/pkg/bandwidthlimiter/bandwidthlimiter.go +++ b/pkg/bandwidthlimiter/bandwidthlimiter.go @@ -7,13 +7,13 @@ import ( "go.uber.org/ratelimit" ) -type BandwidthLimiter struct { +type bandwidthLimiterConfig struct { rl ratelimit.Limiter input chan *structs.Chunk output chan *structs.Chunk } -func Worker(ctx context.Context, conf *BandwidthLimiter) { +func worker(ctx context.Context, conf *bandwidthLimiterConfig) { for { select { case <-ctx.Done(): @@ -26,10 +26,10 @@ func Worker(ctx context.Context, conf *BandwidthLimiter) { } func CreateBandwidthLimiter(ctx context.Context, chunks_per_sec int, input chan *structs.Chunk, output chan *structs.Chunk) { - conf := BandwidthLimiter{ + conf := bandwidthLimiterConfig{ rl: ratelimit.New(chunks_per_sec), input: input, output: output, } - go Worker(ctx, &conf) + go worker(ctx, &conf) } diff --git a/pkg/bandwidthlimiter/bandwidthlimiter_test.go 
b/pkg/bandwidthlimiter/bandwidthlimiter_test.go new file mode 100644 index 0000000..87f6e53 --- /dev/null +++ b/pkg/bandwidthlimiter/bandwidthlimiter_test.go @@ -0,0 +1,43 @@ +package bandwidthlimiter_test + +import ( + "context" + "oneway-filesync/pkg/bandwidthlimiter" + "oneway-filesync/pkg/structs" + "testing" + "time" +) + +func TestCreateBandwidthLimiter(t *testing.T) { + type args struct { + chunks int + chunks_per_sec int + } + tests := []struct { + name string + args args + }{ + {name: "test1", args: args{chunks: 100, chunks_per_sec: 10}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + expected := float64(tt.args.chunks) / float64(tt.args.chunks_per_sec) + ch_in := make(chan *structs.Chunk, tt.args.chunks) + ch_out := make(chan *structs.Chunk, tt.args.chunks) + for i := 0; i < tt.args.chunks; i++ { + ch_in <- &structs.Chunk{} + } + ctx, cancel := context.WithCancel(context.Background()) + start := time.Now() + bandwidthlimiter.CreateBandwidthLimiter(ctx, tt.args.chunks_per_sec, ch_in, ch_out) + for i := 0; i < tt.args.chunks; i++ { + <-ch_out + } + timepast := time.Since(start) + if timepast > time.Duration(expected+1)*time.Second || timepast < time.Duration(expected-1)*time.Second { + t.Fatalf("Bandwidthlimiter took %f seconds instead of %f", timepast.Seconds(), expected) + } + cancel() + }) + } +} diff --git a/pkg/database/database.go b/pkg/database/database.go index 5eb1eab..2ecf5b2 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -44,7 +44,10 @@ func ConfigureDatabase(db *gorm.DB) error { func ClearDatabase(db *gorm.DB) error { stmt := &gorm.Statement{DB: db} - stmt.Parse(&File{}) + err := stmt.Parse(&File{}) + if err != nil { + return err + } tablename := stmt.Schema.Table return db.Exec(fmt.Sprintf("DELETE FROM %s", tablename)).Error } diff --git a/pkg/fecdecoder/fecdecoder.go b/pkg/fecdecoder/fecdecoder.go index 6fe46cd..edaa18d 100644 --- a/pkg/fecdecoder/fecdecoder.go +++ 
b/pkg/fecdecoder/fecdecoder.go @@ -4,34 +4,19 @@ import ( "context" "fmt" "oneway-filesync/pkg/structs" - "time" "github.com/klauspost/reedsolomon" "github.com/sirupsen/logrus" ) -// Cache docs: -// For every (FileHash,FileDataOffset) we save a cache of shares -// Since we need at least shares to create the original data we have to cache them somewhere -// After we get shares we can pull them and create the data but then up to (-) will continue coming in -// The LastUpdated is a field which we can time out based upon and -type CacheKey struct { - Hash [structs.HASHSIZE]byte - DataOffset int64 -} -type CacheValue struct { - Shares chan *structs.Chunk - LastUpdated time.Time -} - -type FecDecoder struct { +type fecDecoderConfig struct { required int total int input chan []*structs.Chunk output chan *structs.Chunk } -func Worker(ctx context.Context, conf *FecDecoder) { +func worker(ctx context.Context, conf *fecDecoderConfig) { fec, err := reedsolomon.New(conf.required, conf.total-conf.required) if err != nil { logrus.Errorf("Error creating fec object: %v", err) @@ -70,13 +55,13 @@ func Worker(ctx context.Context, conf *FecDecoder) { } func CreateFecDecoder(ctx context.Context, required int, total int, input chan []*structs.Chunk, output chan *structs.Chunk, workercount int) { - conf := FecDecoder{ + conf := fecDecoderConfig{ required: required, total: total, input: input, output: output, } for i := 0; i < workercount; i++ { - go Worker(ctx, &conf) + go worker(ctx, &conf) } } diff --git a/pkg/fecencoder/fecencoder.go b/pkg/fecencoder/fecencoder.go index 7402ef8..bc1ef15 100644 --- a/pkg/fecencoder/fecencoder.go +++ b/pkg/fecencoder/fecencoder.go @@ -9,7 +9,7 @@ import ( "github.com/sirupsen/logrus" ) -type FecEncoder struct { +type fecEncoderConfig struct { chunksize int required int total int @@ -22,7 +22,7 @@ type FecEncoder struct { // These are encoding using reed solomon FEC // Then we send each share seperately // At the end they are combined and concatenated to 
form the file. -func Worker(ctx context.Context, conf *FecEncoder) { +func worker(ctx context.Context, conf *fecEncoderConfig) { fec, err := reedsolomon.New(conf.required, conf.total-conf.required) if err != nil { logrus.Errorf("Error creating fec object: %v", err) @@ -72,7 +72,7 @@ func Worker(ctx context.Context, conf *FecEncoder) { } func CreateFecEncoder(ctx context.Context, chunksize int, required int, total int, input chan *structs.Chunk, output chan *structs.Chunk, workercount int) { - conf := FecEncoder{ + conf := fecEncoderConfig{ chunksize: chunksize, required: required, total: total, @@ -80,6 +80,6 @@ func CreateFecEncoder(ctx context.Context, chunksize int, required int, total in output: output, } for i := 0; i < workercount; i++ { - go Worker(ctx, &conf) + go worker(ctx, &conf) } } diff --git a/pkg/filecloser/filecloser.go b/pkg/filecloser/filecloser.go index 2e2386b..ee50fb6 100644 --- a/pkg/filecloser/filecloser.go +++ b/pkg/filecloser/filecloser.go @@ -7,18 +7,29 @@ import ( "oneway-filesync/pkg/structs" "os" "path/filepath" + "strings" "github.com/sirupsen/logrus" "gorm.io/gorm" ) -type FileCloser struct { +func normalizePath(path string) string { + newpath := strings.ReplaceAll(path, ":", "") + if strings.Contains(newpath, "\\") { + return filepath.Join(strings.Split(newpath, "\\")...) + } else { + + return filepath.Join(strings.Split(newpath, "/")...) 
+ } +} + +type fileCloserConfig struct { db *gorm.DB outdir string input chan *structs.OpenTempFile } -func Worker(ctx context.Context, conf *FileCloser) { +func worker(ctx context.Context, conf *fileCloserConfig) { for { select { case <-ctx.Done(): @@ -42,9 +53,9 @@ func Worker(ctx context.Context, conf *FileCloser) { conf.db.Save(&dbentry) continue } - defer f.Close() hash, err := structs.HashFile(f) + err2 := f.Close() if err != nil { logrus.WithFields(logrus.Fields{ "TempFile": file.TempFile, "Path": file.Path, @@ -66,8 +77,15 @@ func Worker(ctx context.Context, conf *FileCloser) { conf.db.Save(&dbentry) continue } + if err2 != nil { + logrus.WithFields(logrus.Fields{ + "TempFile": file.TempFile, + "Path": file.Path, + "Hash": fmt.Sprintf("%x", file.Hash), + }).Errorf("Error closing tempfile: %v", err2) + } - newpath := filepath.Join(conf.outdir, file.Path) + newpath := filepath.Join(conf.outdir, normalizePath(file.Path)) err = os.MkdirAll(filepath.Dir(newpath), os.ModePerm) if err != nil { logrus.WithFields(logrus.Fields{ @@ -106,12 +124,12 @@ func Worker(ctx context.Context, conf *FileCloser) { } func CreateFileCloser(ctx context.Context, db *gorm.DB, outdir string, input chan *structs.OpenTempFile, workercount int) { - conf := FileCloser{ + conf := fileCloserConfig{ db: db, outdir: outdir, input: input, } for i := 0; i < workercount; i++ { - go Worker(ctx, &conf) + go worker(ctx, &conf) } } diff --git a/pkg/filereader/filereader.go b/pkg/filereader/filereader.go index 50455dd..b576265 100644 --- a/pkg/filereader/filereader.go +++ b/pkg/filereader/filereader.go @@ -13,7 +13,7 @@ import ( "gorm.io/gorm" ) -type FileReader struct { +type fileReaderConfig struct { db *gorm.DB chunksize int required int @@ -21,7 +21,7 @@ type FileReader struct { output chan *structs.Chunk } -func Worker(ctx context.Context, conf *FileReader) { +func worker(ctx context.Context, conf *fileReaderConfig) { for { select { case <-ctx.Done(): @@ -91,7 +91,7 @@ func Worker(ctx context.Context, conf 
*FileReader) { } func CreateFileReader(ctx context.Context, db *gorm.DB, chunksize int, required int, input chan database.File, output chan *structs.Chunk, workercount int) { - conf := FileReader{ + conf := fileReaderConfig{ db: db, chunksize: chunksize, required: required, @@ -99,6 +99,6 @@ func CreateFileReader(ctx context.Context, db *gorm.DB, chunksize int, required output: output, } for i := 0; i < workercount; i++ { - go Worker(ctx, &conf) + go worker(ctx, &conf) } } diff --git a/pkg/filewriter/filewriter.go b/pkg/filewriter/filewriter.go index f33d654..6b0f3b5 100644 --- a/pkg/filewriter/filewriter.go +++ b/pkg/filewriter/filewriter.go @@ -13,7 +13,15 @@ import ( "github.com/sirupsen/logrus" ) -type FileWriter struct { +func pathReplace(path string) string { + newpath := path + newpath = strings.ReplaceAll(newpath, "/", "_") + newpath = strings.ReplaceAll(newpath, "\\", "_") + newpath = strings.ReplaceAll(newpath, ":", "_") + return newpath +} + +type fileWriterConfig struct { tempdir string input chan *structs.Chunk output chan *structs.OpenTempFile @@ -24,7 +32,7 @@ type FileWriter struct { // Since we can never really be sure all the chunks arrive // But 30 seconds after no more chunks arrive we can be rather certain // no more chunks will arrive -func Manager(ctx context.Context, conf *FileWriter) { +func manager(ctx context.Context, conf *fileWriterConfig) { ticker := time.NewTicker(15 * time.Second) for { select { @@ -42,18 +50,14 @@ func Manager(ctx context.Context, conf *FileWriter) { } } -func Worker(ctx context.Context, conf *FileWriter) { +func worker(ctx context.Context, conf *fileWriterConfig) { for { select { case <-ctx.Done(): return case chunk := <-conf.input: - tempfilepath := filepath.Join(conf.tempdir, fmt.Sprintf("%s___%x.tmp", strings.ReplaceAll(chunk.Path, "/", "_"), chunk.Hash)) - tempfile, err := os.OpenFile( - tempfilepath, - os.O_RDWR|os.O_CREATE, - 0600, - } + tempfilepath := filepath.Join(conf.tempdir, fmt.Sprintf("%s___%x.tmp", 
pathReplace(chunk.Path), chunk.Hash)) + tempfile, err := os.OpenFile(tempfilepath, os.O_RDWR|os.O_CREATE, 0600) if err != nil { logrus.WithFields(logrus.Fields{ "TempFile": tempfilepath, @@ -64,6 +68,7 @@ func Worker(ctx context.Context, conf *FileWriter) { } _, err = tempfile.WriteAt(chunk.Data, chunk.DataOffset) + err2 := tempfile.Close() // Not using defer because of overhead concerns if err != nil { logrus.WithFields(logrus.Fields{ "TempFile": tempfilepath, @@ -73,6 +78,14 @@ func Worker(ctx context.Context, conf *FileWriter) { continue } + if err2 != nil { + logrus.WithFields(logrus.Fields{ + "TempFile": tempfilepath, + "Path": chunk.Path, + "Hash": fmt.Sprintf("%x", chunk.Hash), + }).Errorf("Error closing tempfile: %v", err2) + } + conf.cache.Store(tempfilepath, &structs.OpenTempFile{ TempFile: tempfilepath, Path: chunk.Path, @@ -84,14 +97,14 @@ func Worker(ctx context.Context, conf *FileWriter) { } func CreateFileWriter(ctx context.Context, tempdir string, input chan *structs.Chunk, output chan *structs.OpenTempFile, workercount int) { - conf := FileWriter{ + conf := fileWriterConfig{ tempdir: tempdir, input: input, output: output, cache: utils.RWMutexMap[string, *structs.OpenTempFile]{}, } for i := 0; i < workercount; i++ { - go Worker(ctx, &conf) + go worker(ctx, &conf) } - go Manager(ctx, &conf) + go manager(ctx, &conf) } diff --git a/pkg/queuereader/queuereader.go b/pkg/queuereader/queuereader.go index 954547a..f38f588 100644 --- a/pkg/queuereader/queuereader.go +++ b/pkg/queuereader/queuereader.go @@ -11,12 +11,12 @@ import ( "gorm.io/gorm" ) -type QueueReader struct { +type queueReaderConfig struct { db *gorm.DB output chan database.File } -func Worker(ctx context.Context, conf *QueueReader) { +func worker(ctx context.Context, conf *queueReaderConfig) { ticker := time.NewTicker(300 * time.Millisecond) for { select { @@ -42,9 +42,9 @@ func Worker(ctx context.Context, conf *QueueReader) { } func CreateQueueReader(ctx context.Context, db *gorm.DB, output 
chan database.File) { - conf := QueueReader{ + conf := queueReaderConfig{ db: db, output: output, } - go Worker(ctx, &conf) + go worker(ctx, &conf) } diff --git a/pkg/receiver/receiver.go b/pkg/receiver/receiver.go index 6383dd4..a9bbba8 100644 --- a/pkg/receiver/receiver.go +++ b/pkg/receiver/receiver.go @@ -22,7 +22,7 @@ func Receiver(ctx context.Context, db *gorm.DB, conf config.Config) { tmpdir := filepath.Join(conf.OutDir, "tempfiles") err := os.MkdirAll(tmpdir, os.ModePerm) if err != nil { - logrus.Errorf("Failed creating tempdir with err %v\n", err) + logrus.Errorf("Failed creating tempdir with err %v", err) return } diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index 8373f78..beb5b5b 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -26,5 +26,5 @@ func Sender(ctx context.Context, db *gorm.DB, conf config.Config) { filereader.CreateFileReader(ctx, db, conf.ChunkSize, conf.ChunkFecRequired, queue_chan, chunks_chan, maxprocs) fecencoder.CreateFecEncoder(ctx, conf.ChunkSize, conf.ChunkFecRequired, conf.ChunkFecTotal, chunks_chan, shares_chan, maxprocs) bandwidthlimiter.CreateBandwidthLimiter(ctx, conf.BandwidthLimit/conf.ChunkSize, shares_chan, bw_limited_chunks) - udpsender.CreateSender(ctx, conf.ReceiverIP, conf.ReceiverPort, bw_limited_chunks, maxprocs) + udpsender.CreateUdpSender(ctx, conf.ReceiverIP, conf.ReceiverPort, bw_limited_chunks, maxprocs) } diff --git a/pkg/shareassembler/shareassembler.go b/pkg/shareassembler/shareassembler.go index 5c79f83..3328f9a 100644 --- a/pkg/shareassembler/shareassembler.go +++ b/pkg/shareassembler/shareassembler.go @@ -14,36 +14,37 @@ import ( // Since we need at least shares to create the original data we have to cache them somewhere // After we get shares we can pull them and create the data but then up to (-) will continue coming in // The LastUpdated is a field which we can time out based upon and -type CacheKey struct { - Hash [structs.HASHSIZE]byte - DataOffset int64 +type cacheKey struct { 
+ hash [structs.HASHSIZE]byte + dataOffset int64 } -type CacheValue struct { - Shares chan *structs.Chunk - LastUpdated atomic.Int64 - Lock sync.Mutex +type cacheValue struct { + shares chan *structs.Chunk + lastUpdated atomic.Int64 + lock sync.Mutex } -type ShareAssembler struct { +type shareAssemblerConfig struct { required int total int input chan *structs.Chunk output chan []*structs.Chunk - cache utils.RWMutexMap[CacheKey, *CacheValue] + cache utils.RWMutexMap[cacheKey, *cacheValue] } // The manager acts as a "Garbage collector" // every chunk that didn't get any new shares for the past 10 seconds can be // assumed to never again receive more shares and deleted -func Manager(ctx context.Context, conf *ShareAssembler) { +func manager(ctx context.Context, conf *shareAssemblerConfig) { ticker := time.NewTicker(5 * time.Second) for { select { case <-ctx.Done(): return case <-ticker.C: - conf.cache.Range(func(key CacheKey, value *CacheValue) bool { - if (time.Now().Unix() - value.LastUpdated.Load()) > 10 { + conf.cache.Range(func(key cacheKey, value *cacheValue) bool { + lastUpdated := value.lastUpdated.Load() + if lastUpdated != 0 && (time.Now().Unix()-lastUpdated) > 10 { conf.cache.Delete(key) } return true @@ -52,31 +53,29 @@ func Manager(ctx context.Context, conf *ShareAssembler) { } } -func Worker(ctx context.Context, conf *ShareAssembler) { +func worker(ctx context.Context, conf *shareAssemblerConfig) { for { select { case <-ctx.Done(): return case chunk := <-conf.input: - timenow := atomic.Int64{} - timenow.Store(time.Now().Unix()) value, _ := conf.cache.LoadOrStore( - CacheKey{Hash: chunk.Hash, DataOffset: chunk.DataOffset}, - &CacheValue{Shares: make(chan *structs.Chunk, conf.total*2), LastUpdated: timenow}) - value.Shares <- chunk - value.LastUpdated.Store(timenow.Load()) + cacheKey{hash: chunk.Hash, dataOffset: chunk.DataOffset}, + &cacheValue{shares: make(chan *structs.Chunk, conf.total*2)}) + value.shares <- chunk + 
value.lastUpdated.Store(time.Now().Unix()) - aquired := value.Lock.TryLock() + aquired := value.lock.TryLock() if aquired { - if len(value.Shares) >= conf.required { + if len(value.shares) >= conf.required { var shares []*structs.Chunk for i := 0; i < conf.required; i++ { - shares = append(shares, <-value.Shares) + shares = append(shares, <-value.shares) } - value.Lock.Unlock() + value.lock.Unlock() conf.output <- shares } else { - value.Lock.Unlock() + value.lock.Unlock() } } } @@ -84,15 +83,15 @@ func Worker(ctx context.Context, conf *ShareAssembler) { } func CreateShareAssembler(ctx context.Context, required int, total int, input chan *structs.Chunk, output chan []*structs.Chunk, workercount int) { - conf := ShareAssembler{ + conf := shareAssemblerConfig{ required: required, total: total, input: input, output: output, - cache: utils.RWMutexMap[CacheKey, *CacheValue]{}, + cache: utils.RWMutexMap[cacheKey, *cacheValue]{}, } for i := 0; i < workercount; i++ { - go Worker(ctx, &conf) + go worker(ctx, &conf) } - go Manager(ctx, &conf) + go manager(ctx, &conf) } diff --git a/pkg/udpreceiver/udpreceiver.go b/pkg/udpreceiver/udpreceiver.go index 1a7c02d..e7af94f 100644 --- a/pkg/udpreceiver/udpreceiver.go +++ b/pkg/udpreceiver/udpreceiver.go @@ -5,45 +5,34 @@ import ( "errors" "net" "oneway-filesync/pkg/structs" + "oneway-filesync/pkg/utils" "runtime" - "syscall" "time" "github.com/sirupsen/logrus" - "golang.org/x/sys/unix" ) -type UdpReceiver struct { +type udpReceiverConfig struct { conn *net.UDPConn chunksize int output chan *structs.Chunk } -func Manager(ctx context.Context, conf *UdpReceiver) { - var FIONREAD uint = 0 - if runtime.GOOS == "linux" { - FIONREAD = 0x541B - } else if runtime.GOOS == "darwin" { - FIONREAD = 0x4004667f - } else { +func manager(ctx context.Context, conf *udpReceiverConfig) { + if runtime.GOOS != "linux" && runtime.GOOS != "darwin" { logrus.Infof("Buffers fill detection not supported on the current OS") return } + ticker := 
time.NewTicker(200 * time.Millisecond) rawconn, err := conf.conn.SyscallConn() if err != nil { logrus.Errorf("Error getting raw socket: %v", err) return } - var bufsize int - err2 := rawconn.Control(func(fd uintptr) { - bufsize, err = unix.GetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF) - }) - if err2 != nil { - logrus.Errorf("Error running Control for FIONREAD: %v", err2) - } + bufsize, err := utils.GetReadBuffer(rawconn) if err != nil { - logrus.Errorf("Error getting FIONREAD: %v", err) + logrus.Errorf("Error getting read buffer size: %v", err) } for { @@ -51,15 +40,9 @@ func Manager(ctx context.Context, conf *UdpReceiver) { case <-ctx.Done(): return case <-ticker.C: - var toread int = 0 - err2 := rawconn.Control(func(fd uintptr) { - toread, err = unix.IoctlGetInt(int(fd), FIONREAD) - }) - if err2 != nil { - logrus.Errorf("Error running Control for FIONREAD: %v", err2) - } + toread, err := utils.GetAvailableBytes(rawconn) if err != nil { - logrus.Errorf("Error getting FIONREAD: %v", err) + logrus.Errorf("Error getting available bytes on socket: %v", err) } if float64(toread)/float64(bufsize) > 0.8 { @@ -69,7 +52,7 @@ func Manager(ctx context.Context, conf *UdpReceiver) { } } -func Worker(ctx context.Context, conf *UdpReceiver) { +func worker(ctx context.Context, conf *udpReceiverConfig) { buf := make([]byte, conf.chunksize) for { @@ -113,13 +96,13 @@ func CreateUdpReceiver(ctx context.Context, ip string, port int, chunksize int, conn.Close() }() - conf := UdpReceiver{ + conf := udpReceiverConfig{ conn: conn, chunksize: chunksize, output: output, } for i := 0; i < workercount; i++ { - go Worker(ctx, &conf) + go worker(ctx, &conf) } - go Manager(ctx, &conf) + go manager(ctx, &conf) } diff --git a/pkg/udpsender/udpsender.go b/pkg/udpsender/udpsender.go index 388a940..c5be83c 100644 --- a/pkg/udpsender/udpsender.go +++ b/pkg/udpsender/udpsender.go @@ -9,13 +9,13 @@ import ( "github.com/sirupsen/logrus" ) -type UdpSender struct { +type udpSenderConfig 
struct { ip string port int input chan *structs.Chunk } -func Worker(ctx context.Context, conf *UdpSender) { +func worker(ctx context.Context, conf *udpSenderConfig) { conn, err := net.Dial("udp", fmt.Sprintf("%s:%d", conf.ip, conf.port)) if err != nil { logrus.Errorf("Error creating udp socket: %v", err) @@ -35,18 +35,25 @@ func Worker(ctx context.Context, conf *UdpSender) { }).Errorf("Error encoding share: %v", err) continue } - conn.Write(buf) + _, err = conn.Write(buf) + if err != nil { + logrus.WithFields(logrus.Fields{ + "Path": share.Path, + "Hash": fmt.Sprintf("%x", share.Hash), + }).Errorf("Error sending share: %v", err) + continue + } } } } -func CreateSender(ctx context.Context, ip string, port int, input chan *structs.Chunk, workercount int) { - conf := UdpSender{ +func CreateUdpSender(ctx context.Context, ip string, port int, input chan *structs.Chunk, workercount int) { + conf := udpSenderConfig{ ip: ip, port: port, input: input, } for i := 0; i < workercount; i++ { - go Worker(ctx, &conf) + go worker(ctx, &conf) } } diff --git a/pkg/utils/unix_ioctl.go b/pkg/utils/unix_ioctl.go new file mode 100644 index 0000000..54d5e8d --- /dev/null +++ b/pkg/utils/unix_ioctl.go @@ -0,0 +1,50 @@ +//go:build linux || darwin + +package utils + +import ( + "errors" + "runtime" + "syscall" + + "golang.org/x/sys/unix" +) + +func GetReadBuffer(rawconn syscall.RawConn) (int, error) { + var err error + var bufsize int + err2 := rawconn.Control(func(fd uintptr) { + bufsize, err = unix.GetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF) + }) + if err2 != nil { + return 0, err2 + } + if err != nil { + return 0, err + } + return bufsize, nil +} + +func GetAvailableBytes(rawconn syscall.RawConn) (int, error) { + var FIONREAD uint = 0 + if runtime.GOOS == "linux" { + FIONREAD = 0x541B + } else if runtime.GOOS == "darwin" { + FIONREAD = 0x4004667f + } else { + return 0, errors.New("unsupported OS") + } + var err error + var avail int + err2 := rawconn.Control(func(fd 
uintptr) { + avail, err = unix.IoctlGetInt(int(fd), FIONREAD) + }) + if err2 != nil { + return 0, err2 + } + if err != nil { + return 0, err + } + return avail, nil + +} diff --git a/pkg/utils/windows_ioctl.go b/pkg/utils/windows_ioctl.go new file mode 100644 index 0000000..e643e4e --- /dev/null +++ b/pkg/utils/windows_ioctl.go @@ -0,0 +1,16 @@ +//go:build windows + +package utils + +import ( + "errors" + "syscall" +) + +func GetReadBuffer(rawconn syscall.RawConn) (int, error) { + return 0, errors.New("unsupported OS") +} + +func GetAvailableBytes(rawconn syscall.RawConn) (int, error) { + return 0, errors.New("unsupported OS") +} diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go new file mode 100644 index 0000000..ef23174 --- /dev/null +++ b/pkg/watcher/watcher.go @@ -0,0 +1,58 @@ +package watcher + +import ( + "context" + "oneway-filesync/pkg/database" + "path/filepath" + "time" + + "github.com/rjeczalik/notify" + "github.com/sirupsen/logrus" + "gorm.io/gorm" +) + +type watcherConfig struct { + db *gorm.DB + input chan notify.EventInfo + cache map[string]time.Time +} + +// To save up on resources we only send files that haven't changed for the past 60 seconds +// otherwise many consecutive small changes will cause a large overhead on the sender/receiver +func worker(ctx context.Context, conf *watcherConfig) { + ticker := time.NewTicker(15 * time.Second) + for { + select { + case <-ctx.Done(): + notify.Stop(conf.input) + return + case ei := <-conf.input: + conf.cache[ei.Path()] = time.Now() + case <-ticker.C: + for path, lastupdated := range conf.cache { + if time.Since(lastupdated).Seconds() > 60 { + delete(conf.cache, path) + err := database.QueueFileForSending(conf.db, path) + if err != nil { + logrus.Errorf("%v", err) + } else { + logrus.Infof("File '%s' queued for sending", path) + } + } + } + } + } +} + +func CreateWatcher(ctx context.Context, db *gorm.DB, watchdir string, input chan notify.EventInfo) { + if err := 
notify.Watch(filepath.Join(watchdir, "..."), input, notify.Write, notify.Create); err != nil { + logrus.Errorf("%v", err) + return + } + conf := watcherConfig{ + db: db, + input: input, + cache: make(map[string]time.Time), + } + go worker(ctx, &conf) +} diff --git a/tests/system_test.go b/tests/system_test.go index 08307ff..c10a833 100644 --- a/tests/system_test.go +++ b/tests/system_test.go @@ -59,6 +59,85 @@ func getDiff(t *testing.T, path1 string, path2 string) int { } return diff } + +func waitForFinishedFile(t *testing.T, db *gorm.DB, path string, timeout time.Duration, outdir string) { + start := time.Now() + ticker := time.NewTicker(1 * time.Second) + for { + <-ticker.C + if time.Since(start) > timeout { + t.Fatalf("File '%s' did not transfer in time", path) + } + var file database.File + err := db.Where("Path = ?", path).First(&file).Error + if err != nil { + continue + } + if !file.Finished || !file.Success { + tmpfilepath := filepath.Join(outdir, "tempfiles", fmt.Sprintf("%s___%x.tmp", strings.ReplaceAll(file.Path, "/", "_"), file.Hash)) + diff := getDiff(t, path, tmpfilepath) + t.Fatalf("File '%s' transferred but not successfully %d different bytes", path, diff) + } else { + return + } + } +} + +func tempFile(t *testing.T, size int) string { + file, err := os.CreateTemp("", "") + if err != nil { + log.Fatal(err) + } + defer file.Close() + _, err = io.CopyN(file, rand.Reader, int64(size)) + if err != nil { + log.Fatal(err) + } + return file.Name() +} + +func setupTest(t *testing.T, conf config.Config) (*gorm.DB, *gorm.DB, func()) { + senderdb, err := database.OpenDatabase("t_s_") + if err != nil { + t.Fatalf("Failed setting up db with err: %v\n", err) + } + if err := database.ConfigureDatabase(senderdb); err != nil { + t.Fatalf("Failed setting up db with err: %v\n", err) + } + + receiverdb, err := database.OpenDatabase("t_r_") + if err != nil { + t.Fatalf("Failed setting up db with err: %v\n", err) + } + if err := database.ConfigureDatabase(receiverdb); 
err != nil { + t.Fatalf("Failed setting up db with err: %v\n", err) + } + + if err := os.MkdirAll(conf.OutDir, os.ModePerm); err != nil { + t.Fatalf("Failed creating outdir with err: %v\n", err) + } + + ctx, cancel := context.WithCancel(context.Background()) // Create a cancelable context and pass it to all goroutines, allows us to gracefully shut down the program + receiver.Receiver(ctx, receiverdb, conf) + sender.Sender(ctx, senderdb, conf) + + return senderdb, receiverdb, func() { + cancel() + if err := os.RemoveAll(conf.OutDir); err != nil { + t.Log(err) + } + if err := database.ClearDatabase(receiverdb); err != nil { + t.Log(err) + } + if err := database.ClearDatabase(senderdb); err != nil { + t.Log(err) + } + if err := os.Remove(database.DBFILE); err != nil { + t.Log(err) + } + } +} + func TestSetup(t *testing.T) { _, _, teardowntest := setupTest(t, config.Config{ ReceiverIP: "127.0.0.1", @@ -88,7 +167,10 @@ func TestSmallFile(t *testing.T) { testfile := tempFile(t, 500) defer os.Remove(testfile) - database.QueueFileForSending(senderdb, testfile) + err := database.QueueFileForSending(senderdb, testfile) + if err != nil { + t.Fatal(err) + } waitForFinishedFile(t, receiverdb, testfile, time.Minute, conf.OutDir) } @@ -108,7 +190,10 @@ func TestLargeFile(t *testing.T) { testfile := tempFile(t, 50*1024*1024) defer os.Remove(testfile) - database.QueueFileForSending(senderdb, testfile) + err := database.QueueFileForSending(senderdb, testfile) + if err != nil { + t.Fatal(err) + } waitForFinishedFile(t, receiverdb, testfile, time.Minute*2, conf.OutDir) } @@ -116,7 +201,7 @@ func TestVeryLargeFile(t *testing.T) { conf := config.Config{ ReceiverIP: "127.0.0.1", ReceiverPort: 5000, - BandwidthLimit: 25 * 1024 * 1024, + BandwidthLimit: 10 * 1024 * 1024, ChunkSize: 8192, ChunkFecRequired: 5, ChunkFecTotal: 10, @@ -128,76 +213,9 @@ func TestVeryLargeFile(t *testing.T) { testfile := tempFile(t, 1*1024*1024*1024) defer os.Remove(testfile) - 
database.QueueFileForSending(senderdb, testfile) - waitForFinishedFile(t, receiverdb, testfile, time.Minute*20, conf.OutDir) -} - -func waitForFinishedFile(t *testing.T, db *gorm.DB, path string, timeout time.Duration, outdir string) { - start := time.Now() - ticker := time.NewTicker(1 * time.Second) - for { - <-ticker.C - if time.Since(start) > timeout { - t.Fatalf("File '%s' did not transfer in time", path) - } - var file database.File - err := db.Where("Path = ?", path).First(&file).Error - if err != nil { - continue - } - if !file.Finished || !file.Success { - tmpfilepath := filepath.Join(outdir, "tempfiles", fmt.Sprintf("%s___%x.tmp", strings.ReplaceAll(file.Path, "/", "_"), file.Hash)) - diff := getDiff(t, path, tmpfilepath) - t.Fatalf("File '%s' transferred but not successfully %d different bytes", path, diff) - } else { - return - } - } -} - -func tempFile(t *testing.T, size int) string { - file, err := os.CreateTemp("", "") - if err != nil { - log.Fatal(err) - } - defer file.Close() - _, err = io.CopyN(file, rand.Reader, int64(size)) - if err != nil { - log.Fatal(err) - } - return file.Name() -} - -func setupTest(t *testing.T, conf config.Config) (*gorm.DB, *gorm.DB, func()) { - senderdb, err := database.OpenDatabase("t_s_") - if err != nil { - t.Fatalf("Failed setting up db with err: %v\n", err) - } - if err := database.ConfigureDatabase(senderdb); err != nil { - t.Fatalf("Failed setting up db with err: %v\n", err) - } - - receiverdb, err := database.OpenDatabase("t_r_") + err := database.QueueFileForSending(senderdb, testfile) if err != nil { - t.Fatalf("Failed setting up db with err: %v\n", err) - } - if err := database.ConfigureDatabase(receiverdb); err != nil { - t.Fatalf("Failed setting up db with err: %v\n", err) - } - - if err := os.MkdirAll(conf.OutDir, os.ModePerm); err != nil { - t.Fatalf("Failed creating outdir with err: %v\n", err) - } - - ctx, cancel := context.WithCancel(context.Background()) // Create a cancelable context and pass it to all 
goroutines, allows us to gracefully shut down the program - receiver.Receiver(ctx, receiverdb, conf) - sender.Sender(ctx, senderdb, conf) - - return senderdb, receiverdb, func() { - cancel() - os.RemoveAll(conf.OutDir) - database.ClearDatabase(receiverdb) - database.ClearDatabase(senderdb) - os.Remove(database.DBFILE) + t.Fatal(err) } + waitForFinishedFile(t, receiverdb, testfile, time.Minute*20, conf.OutDir) }