diff --git a/pkg/database/database_test.go b/pkg/database/database_test.go new file mode 100644 index 0000000..f5d7344 --- /dev/null +++ b/pkg/database/database_test.go @@ -0,0 +1,103 @@ +package database + +import ( + "os" + "testing" + + "gorm.io/driver/sqlite" + "gorm.io/gorm" + gormlogger "gorm.io/gorm/logger" +) + +func TestOpenDatabase(t *testing.T) { + type args struct { + tableprefix string + } + tests := []struct { + name string + args args + wantErr bool + }{ + {"test-works", args{"t_"}, false}, + {"test-nullbyte", args{"t_\x00"}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := OpenDatabase(tt.args.tableprefix) + if (err != nil) != tt.wantErr { + t.Errorf("OpenDatabase() error = %v, wantErr %v", err, tt.wantErr) + return + } + _ = os.Remove(DBFILE) + }) + } +} + +func TestClearDatabase(t *testing.T) { + db1, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{Logger: gormlogger.Discard}) + if err != nil { + t.Fatal(err) + } + if err = configureDatabase(db1); err != nil { + t.Fatal(err) + } + db2, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{Logger: gormlogger.Discard}) + if err != nil { + t.Fatal(err) + } + type args struct { + db *gorm.DB + } + tests := []struct { + name string + args args + wantErr bool + }{ + {"test-working", args{db1}, false}, + {"test-test-no-such-table", args{db2}, true}, + // {"test-test-null-db", args{&gorm.DB{}}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := ClearDatabase(tt.args.db); (err != nil) != tt.wantErr { + t.Errorf("ClearDatabase() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestQueueFileForSending(t *testing.T) { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{Logger: gormlogger.Discard}) + if err != nil { + t.Fatal(err) + } + if err = configureDatabase(db); err != nil { + t.Fatal(err) + } + type args struct { + db *gorm.DB + path string + encrypted bool + } + tests := []struct { + name string + args args + wantErr bool + }{ + {"test-works", args{db, "a", false}, false}, + {"test-no-such-file", args{db, "a", false}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.name != "test-no-such-file" && tt.name != "test-patherror" { + if err := os.WriteFile(tt.args.path, make([]byte, 4), os.ModePerm); err != nil { + t.Fatal(err) + } + } + defer os.Remove(tt.args.path) + if err := QueueFileForSending(tt.args.db, tt.args.path, tt.args.encrypted); (err != nil) != tt.wantErr { + t.Errorf("QueueFileForSending() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/fecdecoder/fecdecoder.go b/pkg/fecdecoder/fecdecoder.go index cc90cd4..7742ebf 100644 --- a/pkg/fecdecoder/fecdecoder.go +++ b/pkg/fecdecoder/fecdecoder.go @@ -19,7 +19,8 @@ type fecDecoderConfig struct { func worker(ctx context.Context, conf *fecDecoderConfig) { fec, err := reedsolomon.New(conf.required, conf.total-conf.required) if err != nil { - logrus.Fatalf("Error creating fec object: %v", err) + logrus.Errorf("Error creating fec object: %v", err) + return } for { select { diff --git a/pkg/fecencoder/fecencoder.go b/pkg/fecencoder/fecencoder.go index e5b7dc9..8609afb 100644 --- a/pkg/fecencoder/fecencoder.go +++ b/pkg/fecencoder/fecencoder.go @@ -25,33 +25,33 @@ type fecEncoderConfig struct { func worker(ctx context.Context, conf *fecEncoderConfig) { fec, err := reedsolomon.New(conf.required, conf.total-conf.required) if err != nil { - logrus.Fatalf("Error creating fec object: %v", err) + logrus.Errorf("Error creating fec object: %v", err) + return } for { select { case <-ctx.Done(): return case chunk := <-conf.input: + l := logrus.WithFields(logrus.Fields{ + "Path": chunk.Path, + "Hash": fmt.Sprintf("%x", chunk.Hash), + }) + padding := (conf.required - (len(chunk.Data) % conf.required)) % conf.required chunk.Data = append(chunk.Data, make([]byte, padding)...) // Split the data into shares shares, err := fec.Split(chunk.Data) if err != nil { - logrus.WithFields(logrus.Fields{ - "Path": chunk.Path, - "Hash": fmt.Sprintf("%x", chunk.Hash), - }).Errorf("Error splitting chunk: %v", err) + l.Errorf("Error splitting chunk: %v", err) continue } // Encode the parity set err = fec.Encode(shares) if err != nil { - logrus.WithFields(logrus.Fields{ - "Path": chunk.Path, - "Hash": fmt.Sprintf("%x", chunk.Hash), - }).Errorf("Error FEC encoding chunk: %v", err) + l.Errorf("Error FEC encoding chunk: %v", err) continue } diff --git a/pkg/filecloser/filecloser.go b/pkg/filecloser/filecloser.go index 970ef86..6ea1650 100644 --- a/pkg/filecloser/filecloser.go +++ b/pkg/filecloser/filecloser.go @@ -2,7 +2,6 @@ package filecloser import ( "context" - "errors" "fmt" "oneway-filesync/pkg/database" "oneway-filesync/pkg/structs" @@ -24,31 +23,19 @@ func normalizePath(path string) string { } } func closeFile(file *structs.OpenTempFile, outdir string) error { - l := logrus.WithFields(logrus.Fields{ - "TempFile": file.TempFile, - "Path": file.Path, - "Hash": fmt.Sprintf("%x", file.Hash), - }) - f, err := os.Open(file.TempFile) if err != nil { - l.Errorf("Error opening tempfile: %v", err) - return err + return fmt.Errorf("error opening tempfile: %v", err) } hash, err := structs.HashFile(f, false) - err2 := f.Close() + _ = f.Close() // Ignoring error on purpose if err != nil { - l.Errorf("Error hashing tempfile: %v", err) - return err - } - if err2 != nil { - l.Errorf("Error closing tempfile: %v", err2) - // Not returning error on purpose + return fmt.Errorf("error hashing tempfile: %v", err) } + if hash != file.Hash { - l.WithField("TempFileHash", fmt.Sprintf("%x", hash)).Errorf("Hash mismatch") - return errors.New("hash mismatch") + return fmt.Errorf("hash mismatch '%v'!='%v'", fmt.Sprintf("%x", hash), fmt.Sprintf("%x", file.Hash)) } newpath := filepath.Join(outdir, normalizePath(file.Path)) @@ -57,17 +44,14 @@ func closeFile(file *structs.OpenTempFile, outdir string) error { } err = os.MkdirAll(filepath.Dir(newpath), os.ModePerm) if err != nil { - l.Errorf("Failed creating directory path: %v", err) - return err + return fmt.Errorf("failed creating directory path: %v", err) } err = os.Rename(file.TempFile, newpath) if err != nil { - l.Errorf("Failed moving tempfile to new location: %v", err) - return err + return fmt.Errorf("failed moving tempfile to new location: %v", err) } - l.WithField("NewPath", newpath).Infof("Successfully finished writing file") return nil } @@ -83,6 +67,11 @@ func worker(ctx context.Context, conf *fileCloserConfig) { case <-ctx.Done(): return case file := <-conf.input: + l := logrus.WithFields(logrus.Fields{ + "TempFile": file.TempFile, + "Path": file.Path, + "Hash": fmt.Sprintf("%x", file.Hash), + }) dbentry := database.File{ Path: file.Path, Hash: file.Hash[:], @@ -94,15 +83,13 @@ func worker(ctx context.Context, conf *fileCloserConfig) { err := closeFile(file, conf.outdir) if err != nil { dbentry.Success = false + l.Error(err) } else { dbentry.Success = true + l.Infof("Successfully finished writing file") } if err := conf.db.Save(&dbentry).Error; err != nil { - logrus.WithFields(logrus.Fields{ - "TempFile": file.TempFile, - "Path": file.Path, - "Hash": fmt.Sprintf("%x", file.Hash), - }).Errorf("Failed committing to db: %v", err) + l.Errorf("Failed committing to db: %v", err) } } } diff --git a/pkg/filecloser/filecloser_test.go b/pkg/filecloser/filecloser_test.go new file mode 100644 index 0000000..b2d9493 --- /dev/null +++ b/pkg/filecloser/filecloser_test.go @@ -0,0 +1,129 @@ +package filecloser + +import ( + "bytes" + "context" + "oneway-filesync/pkg/structs" + "os" + "runtime" + "strings" + "testing" + "time" + + "github.com/sirupsen/logrus" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + gormlogger "gorm.io/gorm/logger" +) + +func Test_normalizePath(t *testing.T) { + tests := []struct { + name string + path string + want string + }{ + {"test1", "/tmp/out/check", "tmp/out/check"}, + {"test2", "c:\\tmp\\out\\check", "c/tmp/out/check"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if runtime.GOOS == "windows" { + tt.want = strings.ReplaceAll(tt.want, "/", "\\") + if got := normalizePath(tt.path); got != tt.want { + t.Errorf("normalizePath() = %v, want %v", got, tt.want) + } + } else { + if got := normalizePath(tt.path); got != tt.want { + t.Errorf("normalizePath() = %v, want %v", got, tt.want) + } + } + }) + } +} + +func Test_closeFile(t *testing.T) { + data := []byte{1, 2, 3, 4} + hash := [32]byte{0x9f, 0x64, 0xa7, 0x47, 0xe1, 0xb9, 0x7f, 0x13, 0x1f, 0xab, 0xb6, 0xb4, 0x47, 0x29, 0x6c, 0x9b, 0x6f, 0x02, 0x01, 0xe7, 0x9f, 0xb3, 0xc5, 0x35, 0x6e, 0x6c, 0x77, 0xe8, 0x9b, 0x6a, 0x80, 0x6a} + wronghash := hash + wronghash[0] = 0 + + type args struct { + file *structs.OpenTempFile + outdir string + } + tests := []struct { + name string + args args + wantErr bool + }{ + {"test-works", args{&structs.OpenTempFile{TempFile: "a", Path: "b", Hash: hash, Encrypted: false, LastUpdated: time.Now()}, "out"}, false}, + {"test-hash-mismsatch", args{&structs.OpenTempFile{TempFile: "a", Path: "b", Hash: wronghash, Encrypted: false, LastUpdated: time.Now()}, "out"}, true}, + {"test-no-such-file", args{&structs.OpenTempFile{TempFile: "/tmp/adsasdasdsadas/adadsada/a", Path: "b", Hash: hash, Encrypted: false, LastUpdated: time.Now()}, "out"}, true}, + {"test-rename-fail", args{&structs.OpenTempFile{TempFile: "a", Path: "b\x00", Hash: hash, Encrypted: false, LastUpdated: time.Now()}, "out"}, true}, + {"test-mkdirall-fail", args{&structs.OpenTempFile{TempFile: "a", Path: "b", Hash: hash, Encrypted: false, LastUpdated: time.Now()}, "out\x00"}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer os.RemoveAll(tt.args.outdir) + if tt.name != "test-no-such-file" { + if err := os.WriteFile(tt.args.file.TempFile, data, os.ModePerm); err != nil { + t.Fatal(err) + } + } + defer os.Remove(tt.args.file.TempFile) + + if err := closeFile(tt.args.file, tt.args.outdir); (err != nil) != tt.wantErr { + t.Errorf("closeFile() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_worker(t *testing.T) { + data := []byte{1, 2, 3, 4} + hash := [32]byte{0x9f, 0x64, 0xa7, 0x47, 0xe1, 0xb9, 0x7f, 0x13, 0x1f, 0xab, 0xb6, 0xb4, 0x47, 0x29, 0x6c, 0x9b, 0x6f, 0x02, 0x01, 0xe7, 0x9f, 0xb3, 0xc5, 0x35, 0x6e, 0x6c, 0x77, 0xe8, 0x9b, 0x6a, 0x80, 0x6a} + wronghash := hash + wronghash[0] = 0 + + type args struct { + file *structs.OpenTempFile + outdir string + } + tests := []struct { + name string + args args + expected string + }{ + {"test-dberror", args{&structs.OpenTempFile{TempFile: "a", Path: "b", Hash: hash, Encrypted: false, LastUpdated: time.Now()}, "out"}, "Failed committing to db"}, + {"test-hash-mismsatch", args{&structs.OpenTempFile{TempFile: "a", Path: "b", Hash: wronghash, Encrypted: false, LastUpdated: time.Now()}, "out"}, "hash mismatch"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var memLog bytes.Buffer + logrus.SetOutput(&memLog) + + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{Logger: gormlogger.Discard}) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tt.args.outdir) + if err := os.WriteFile(tt.args.file.TempFile, data, os.ModePerm); err != nil { + t.Fatal(err) + } + defer os.Remove(tt.args.file.TempFile) + ch := make(chan *structs.OpenTempFile, 5) + conf := fileCloserConfig{db: db, outdir: tt.args.outdir, input: ch} + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + cancel() + }() + ch <- tt.args.file + worker(ctx, &conf) + + if !strings.Contains(memLog.String(), tt.expected) { + t.Fatalf("Expected not in log, '%v' not in '%vs'", tt.expected, memLog.String()) + } + }) + } +} diff --git a/pkg/filereader/filereader.go b/pkg/filereader/filereader.go index 434a439..39e8548 100644 --- a/pkg/filereader/filereader.go +++ b/pkg/filereader/filereader.go @@ -21,42 +21,30 @@ type chunkWriter struct { sendchunk func(data []byte, offset int64) } -func (w *chunkWriter) dumpChunk() error { +func (w *chunkWriter) dumpChunk() { b := make([]byte, w.chunksize) - n, err := w.buf.Read(b) - if err != nil { - return err + n, _ := w.buf.Read(b) // err means EOF + if n > 0 { + w.sendchunk(b[:n], w.offset) + w.offset += int64(n) } - w.sendchunk(b[:n], w.offset) - w.offset += int64(n) - return nil } func (w *chunkWriter) Write(p []byte) (int, error) { - _, err := w.buf.Write(p) - if err != nil { - return 0, err - } + _, _ = w.buf.Write(p) // bytes.Buffer.Write never returns error if w.buf.Len() > w.chunksize { - err := w.dumpChunk() - if err != nil { - return 0, err - } + w.dumpChunk() } return len(p), nil } -func (w *chunkWriter) Close() error { +func (w *chunkWriter) Close() { for { if w.buf.Len() == 0 { break } - err := w.dumpChunk() - if err != nil { - return err - } + w.dumpChunk() } - return nil } func sendfile(file *database.File, conf *fileReaderConfig) error { @@ -92,10 +80,7 @@ func sendfile(file *database.File, conf *fileReaderConfig) error { return err } - err = w.Close() - if err != nil { - return err - } + w.Close() return nil } diff --git a/pkg/filereader/filereader_test.go b/pkg/filereader/filereader_test.go new file mode 100644 index 0000000..ca5cc02 --- /dev/null +++ b/pkg/filereader/filereader_test.go @@ -0,0 +1,148 @@ +package filereader + +import ( + "bytes" + "context" + "oneway-filesync/pkg/database" + "oneway-filesync/pkg/structs" + "os" + "strings" + "testing" + "time" + + "github.com/sirupsen/logrus" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + gormlogger "gorm.io/gorm/logger" +) + +func Test_sendfile(t *testing.T) { + data := make([]byte, 4*8192) + hash := []byte{0x9f, 0x64, 0xa7, 0x47, 0xe1, 0xb9, 0x7f, 0x13, 0x1f, 0xab, 0xb6, 0xb4, 0x47, 0x29, 0x6c, 0x9b, 0x6f, 0x02, 0x01, 0xe7, 0x9f, 0xb3, 0xc5, 0x35, 0x6e, 0x6c, 0x77, 0xe8, 0x9b, 0x6a, 0x80, 0x6a} + type args struct { + file *database.File + conf *fileReaderConfig + } + tests := []struct { + name string + args args + expected int + wantErr bool + }{ + {"test-regular", args{ + file: &database.File{Path: "a", Hash: hash, Encrypted: false}, + conf: &fileReaderConfig{chunksize: 8192, required: 2}, + }, 3, false}, + {"test-encrypted", args{ + file: &database.File{Path: "a", Hash: hash, Encrypted: true}, + conf: &fileReaderConfig{chunksize: 8192, required: 2}, + }, 1, false}, + {"test-no-such-file", args{ + file: &database.File{Path: "b", Hash: hash, Encrypted: false}, + conf: &fileReaderConfig{chunksize: 8192, required: 2}, + }, 4, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out := make(chan *structs.Chunk, 5) + tt.args.conf.output = out + + if tt.name != "test-no-such-file" { + if err := os.WriteFile(tt.args.file.Path, data, os.ModePerm); err != nil { + t.Fatal(err) + } + } + defer os.Remove(tt.args.file.Path) + + if err := sendfile(tt.args.file, tt.args.conf); (err != nil) != tt.wantErr { + t.Fatalf("sendfile() error = %v, wantErr %v", err, tt.wantErr) + } + if !tt.wantErr { + if len(out) != tt.expected { + t.Fatalf("Got too many chunks %v!=%v", len(out), tt.expected) + } + } + }) + } +} + +func Test_worker(t *testing.T) { + data := []byte{1, 2, 3, 4} + hash := []byte{0x9f, 0x64, 0xa7, 0x47, 0xe1, 0xb9, 0x7f, 0x13, 0x1f, 0xab, 0xb6, 0xb4, 0x47, 0x29, 0x6c, 0x9b, 0x6f, 0x02, 0x01, 0xe7, 0x9f, 0xb3, 0xc5, 0x35, 0x6e, 0x6c, 0x77, 0xe8, 0x9b, 0x6a, 0x80, 0x6a} + type args struct { + file database.File + conf *fileReaderConfig + } + tests := []struct { + name string + args args + expected string + }{ + {"test-error-db", args{ + file: database.File{Path: "a", Hash: hash}, + conf: &fileReaderConfig{chunksize: 8192, required: 2}, + }, "Error updating Finished in database"}, + {"test-no-such-file", args{ + file: database.File{Path: "b", Hash: hash}, + conf: &fileReaderConfig{chunksize: 8192, required: 2}, + }, "File sending failed with err: error opening file:"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var memLog bytes.Buffer + logrus.SetOutput(&memLog) + + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{Logger: gormlogger.Discard}) + if err != nil { + t.Fatal(err) + } + tt.args.conf.db = db + in := make(chan database.File, 5) + tt.args.conf.input = in + out := make(chan *structs.Chunk, 5) + tt.args.conf.output = out + + if tt.name != "test-no-such-file" { + if err := os.WriteFile(tt.args.file.Path, data, os.ModePerm); err != nil { + t.Fatal(err) + } + } + defer os.Remove(tt.args.file.Path) + in <- tt.args.file + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + cancel() + }() + worker(ctx, tt.args.conf) + + if !strings.Contains(memLog.String(), tt.expected) { + t.Fatalf("Expected not in log, '%v' not in '%vs'", tt.expected, memLog.String()) + } + }) + } +} + +func TestCreateFileReader(t *testing.T) { + type args struct { + ctx context.Context + db *gorm.DB + chunksize int + required int + input chan database.File + output chan *structs.Chunk + workercount int + } + tests := []struct { + name string + args args + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + CreateFileReader(tt.args.ctx, tt.args.db, tt.args.chunksize, tt.args.required, tt.args.input, tt.args.output, tt.args.workercount) + }) + } +} diff --git a/pkg/filewriter/filewriter.go b/pkg/filewriter/filewriter.go index 3cef65e..796becb 100644 --- a/pkg/filewriter/filewriter.go +++ b/pkg/filewriter/filewriter.go @@ -57,35 +57,24 @@ func worker(ctx context.Context, conf *fileWriterConfig) { return case chunk := <-conf.input: tempfilepath := filepath.Join(conf.tempdir, fmt.Sprintf("%s___%x.tmp", pathReplace(chunk.Path), chunk.Hash)) + l := logrus.WithFields(logrus.Fields{ + "TempFile": tempfilepath, + "Path": chunk.Path, + "Hash": fmt.Sprintf("%x", chunk.Hash), + }) tempfile, err := os.OpenFile(tempfilepath, os.O_RDWR|os.O_CREATE, 0600) if err != nil { - logrus.WithFields(logrus.Fields{ - "TempFile": tempfilepath, - "Path": chunk.Path, - "Hash": fmt.Sprintf("%x", chunk.Hash), - }).Errorf("Error creating tempfile for chunk: %v", err) + l.Errorf("Error creating tempfile for chunk: %v", err) continue } _, err = tempfile.WriteAt(chunk.Data, chunk.DataOffset) - err2 := tempfile.Close() // Not using defer because of overhead concerns + _ = tempfile.Close() // Not using defer because of overhead concerns, ignoring error on purpose if err != nil { - logrus.WithFields(logrus.Fields{ - "TempFile": tempfilepath, - "Path": chunk.Path, - "Hash": fmt.Sprintf("%x", chunk.Hash), - }).Errorf("Error writing to tempfile: %v", err) + l.Errorf("Error writing to tempfile: %v", err) continue } - if err2 != nil { - logrus.WithFields(logrus.Fields{ - "TempFile": tempfilepath, - "Path": chunk.Path, - "Hash": fmt.Sprintf("%x", chunk.Hash), - }).Errorf("Error closing tempfile: %v", err) - } - conf.cache.Store(tempfilepath, &structs.OpenTempFile{ TempFile: tempfilepath, Path: chunk.Path, diff --git a/pkg/udpreceiver/udpreceiver.go b/pkg/udpreceiver/udpreceiver.go index cfe991d..bc8600c 100644 --- a/pkg/udpreceiver/udpreceiver.go +++ b/pkg/udpreceiver/udpreceiver.go @@ -27,6 +27,7 @@ func manager(ctx context.Context, conf *udpReceiverConfig) { bufsize, err := socketbuffer.GetReadBuffer(rawconn) if err != nil { logrus.Errorf("Error getting read buffer size: %v", err) + return } for { @@ -37,6 +38,7 @@ func manager(ctx context.Context, conf *udpReceiverConfig) { toread, err := socketbuffer.GetAvailableBytes(rawconn) if err != nil { logrus.Errorf("Error getting available bytes on socket: %v", err) + continue } if float64(toread)/float64(bufsize) > 0.8 { @@ -62,7 +64,7 @@ func worker(ctx context.Context, conf *udpReceiverConfig) { continue } logrus.Errorf("Error reading from socket: %v", err) - continue + return } chunk, err := structs.DecodeChunk(buf[:n]) if err != nil { @@ -82,7 +84,8 @@ func CreateUdpReceiver(ctx context.Context, ip string, port int, chunksize int, conn, err := net.ListenUDP("udp", &addr) if err != nil { - logrus.Fatalf("Error creating udp socket: %v", err) + logrus.Errorf("Error creating udp socket: %v", err) + return } go func() { <-ctx.Done() diff --git a/pkg/udpreceiver/udpreceiver_test.go b/pkg/udpreceiver/udpreceiver_test.go new file mode 100644 index 0000000..2843511 --- /dev/null +++ b/pkg/udpreceiver/udpreceiver_test.go @@ -0,0 +1,261 @@ +package udpreceiver + +import ( + "bytes" + "context" + "crypto/rand" + "fmt" + "math/big" + "net" + "reflect" + "strings" + "testing" + "time" + + "oneway-filesync/pkg/structs" + + "github.com/sirupsen/logrus" +) + +func randint(max int64) int { + nBig, err := rand.Int(rand.Reader, big.NewInt(max)) + if err != nil { + panic(err) + } + return int(nBig.Int64()) +} + +func Test_manager(t *testing.T) { + ip := "127.0.0.1" + port := randint(30000) + 30000 + addr := net.UDPAddr{ + IP: net.ParseIP(ip), + Port: port, + } + + receiving_conn, err := net.ListenUDP("udp", &addr) + if err != nil { + t.Fatal(err) + } + defer receiving_conn.Close() + + sending_conn, err := net.Dial("udp", fmt.Sprintf("%s:%d", ip, port)) + if err != nil { + t.Fatal(err) + } + defer sending_conn.Close() + chunksize := 8192 + chunk := make([]byte, chunksize) + err = receiving_conn.SetReadBuffer(5 * chunksize) + if err != nil { + t.Fatal(err) + } + for i := 0; i < 5; i++ { + _, err := sending_conn.Write(chunk) + if err != nil { + t.Fatal(err) + } + } + + type args struct { + conf *udpReceiverConfig + } + tests := []struct { + name string + args args + expected string + }{ + {"test-invalid-socket", args{&udpReceiverConfig{&net.UDPConn{}, 8192, make(chan *structs.Chunk)}}, "Error getting raw socket"}, + {"test-buffers-full", args{&udpReceiverConfig{receiving_conn, 8192, make(chan *structs.Chunk)}}, "Buffers are filling up loss of data is probable"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var memLog bytes.Buffer + logrus.SetOutput(&memLog) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + cancel() + }() + manager(ctx, tt.args.conf) + + if !strings.Contains(memLog.String(), tt.expected) { + t.Fatalf("Expected not in log, '%v' not in '%v'", tt.expected, memLog.String()) + } + }) + } +} + +func Test_worker_close_conn(t *testing.T) { + ip := "127.0.0.1" + port := randint(30000) + 30000 + addr := net.UDPAddr{ + IP: net.ParseIP(ip), + Port: port, + } + + receiving_conn, err := net.ListenUDP("udp", &addr) + if err != nil { + t.Fatal(err) + } + defer receiving_conn.Close() + + chunksize := 8192 + + output := make(chan *structs.Chunk, 5) + type args struct { + conf *udpReceiverConfig + } + tests := []struct { + name string + args args + }{ + {"test1", args{&udpReceiverConfig{receiving_conn, chunksize, output}}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + tt.args.conf.conn.Close() + cancel() + }() + worker(ctx, tt.args.conf) + }) + } +} + +func Test_worker(t *testing.T) { + ip := "127.0.0.1" + port := randint(30000) + 30000 + addr := net.UDPAddr{ + IP: net.ParseIP(ip), + Port: port, + } + + receiving_conn, err := net.ListenUDP("udp", &addr) + if err != nil { + t.Fatal(err) + } + defer receiving_conn.Close() + + sending_conn, err := net.Dial("udp", fmt.Sprintf("%s:%d", ip, port)) + if err != nil { + t.Fatal(err) + } + defer sending_conn.Close() + + chunksize := 8192 + + output := make(chan *structs.Chunk, 5) + conf := &udpReceiverConfig{receiving_conn, chunksize, output} + chunk := structs.Chunk{Path: "a", Data: make([]byte, chunksize/2)} + + var memLog bytes.Buffer + logrus.SetOutput(&memLog) + + data, err := chunk.Encode() + if err != nil { + t.Fatal(err) + } + _, err = sending_conn.Write(data) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + conf.conn.Close() + cancel() + }() + worker(ctx, conf) + + got := <-output + if !reflect.DeepEqual(*got, chunk) { + t.Fatalf("DecodeChunk() = %v, want %v", got, chunk) + } +} + +func Test_worker_error_invalid_socket(t *testing.T) { + chunksize := 8192 + output := make(chan *structs.Chunk, 5) + conf := &udpReceiverConfig{&net.UDPConn{}, chunksize, output} + + var memLog bytes.Buffer + logrus.SetOutput(&memLog) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + conf.conn.Close() + cancel() + }() + worker(ctx, conf) + + if !strings.Contains(memLog.String(), "Error reading from socket") { + t.Fatalf("Expected not in log, '%v' not in '%v'", "Error reading from socket", memLog.String()) + } +} + +func Test_worker_error_decoding(t *testing.T) { + ip := "127.0.0.1" + port := randint(30000) + 30000 + addr := net.UDPAddr{ + IP: net.ParseIP(ip), + Port: port, + } + + receiving_conn, err := net.ListenUDP("udp", &addr) + if err != nil { + t.Fatal(err) + } + defer receiving_conn.Close() + + sending_conn, err := net.Dial("udp", fmt.Sprintf("%s:%d", ip, port)) + if err != nil { + t.Fatal(err) + } + defer sending_conn.Close() + + chunksize := 8192 + output := make(chan *structs.Chunk, 5) + conf := &udpReceiverConfig{receiving_conn, chunksize, output} + data := make([]byte, chunksize/2) + for i := range data { + data[i] = 0xff + } + var memLog bytes.Buffer + logrus.SetOutput(&memLog) + + _, err = sending_conn.Write(data) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(2 * time.Second) + conf.conn.Close() + cancel() + }() + worker(ctx, conf) + + if !strings.Contains(memLog.String(), "Error decoding chunk") { + t.Fatalf("Expected not in log, '%v' not in '%v'", "Error decoding chunk", memLog.String()) + } +} + +func TestCreateUdpReceiver(t *testing.T) { + // fail create socket test + var memLog bytes.Buffer + logrus.SetOutput(&memLog) + ctx, cancel := context.WithCancel(context.Background()) + CreateUdpReceiver(ctx, "127.0.0.1", 88888, 8192, make(chan *structs.Chunk), 1) + cancel() + if !strings.Contains(memLog.String(), "Error creating udp socket") { + t.Fatalf("Expected not in log, '%v' not in '%v'", "Error creating udp socket", memLog.String()) + } +} diff --git a/pkg/udpsender/udpsender.go b/pkg/udpsender/udpsender.go index 846c250..f26bc8e 100644 --- a/pkg/udpsender/udpsender.go +++ b/pkg/udpsender/udpsender.go @@ -18,7 +18,8 @@ type udpSenderConfig struct { func worker(ctx context.Context, conf *udpSenderConfig) { conn, err := net.Dial("udp", fmt.Sprintf("%s:%d", conf.ip, conf.port)) if err != nil { - logrus.Fatalf("Error creating udp socket: %v", err) + logrus.Errorf("Error creating udp socket: %v", err) + return } defer conn.Close() for { @@ -26,20 +27,18 @@ func worker(ctx context.Context, conf *udpSenderConfig) { case <-ctx.Done(): return case share := <-conf.input: + l := logrus.WithFields(logrus.Fields{ + "Path": share.Path, + "Hash": fmt.Sprintf("%x", share.Hash), + }) buf, err := share.Encode() if err != nil { - logrus.WithFields(logrus.Fields{ - "Path": share.Path, - "Hash": fmt.Sprintf("%x", share.Hash), - }).Errorf("Error encoding share: %v", err) + l.Errorf("Error encoding share: %v", err) continue } _, err = conn.Write(buf) if err != nil { - logrus.WithFields(logrus.Fields{ - "Path": share.Path, - "Hash": fmt.Sprintf("%x", share.Hash), - }).Errorf("Error sending share: %v", err) + l.Errorf("Error sending share: %v", err) continue } } diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go index 15817f3..1e858da 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/utils/utils_test.go @@ -2,7 +2,11 @@ package utils import ( "os" + "runtime" + "strings" "testing" + + "github.com/sirupsen/logrus" ) func Test_formatFilePath(t *testing.T) { @@ -31,31 +35,100 @@ func TestInitializeLogging(t *testing.T) { logFile string } tests := []struct { - name string - args args + name string + args args + wantErr bool }{ - {"test1", args{"logfile.txt"}}, + {"test1", args{"logfile.txt"}, false}, + {"test-invalid-path", args{"/tmp/asuhdaiusfa/asdada/logfile.txt"}, true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { InitializeLogging(tt.args.logFile) - if _, err := os.Stat(tt.args.logFile); os.IsExist(err) { + if tt.wantErr { + return + } + defer os.Remove(tt.args.logFile) + + logrus.Info("Test") + data, err := os.ReadFile(tt.args.logFile) + if err != nil { t.Fatal("logfile did not create") } - os.Remove(tt.args.logFile) + if !(strings.Contains(string(data), "Test") && strings.Contains(string(data), "utils_test.go")) { + t.Fatal("logging failed") + } }) } } +func TestMap(t *testing.T) { + m := RWMutexMap[int, string]{} + m.Store(1, "a") + if v, ok := m.Load(1); !ok || v != "a" { + t.Fatal("Store then load failed") + } + + if actual, loaded := m.LoadOrStore(1, "b"); !loaded || actual != "a" { + t.Fatal("LoadOrStore-Load failed") + } + if actual, loaded := m.LoadOrStore(2, "b"); loaded || actual != "b" { + t.Fatal("LoadOrStore-Store failed") + } + + if value, loaded := m.LoadAndDelete(2); !loaded || value != "b" { + t.Fatal("LoadAndDelete-Exists failed") + } + if _, loaded := m.LoadAndDelete(2); loaded { + t.Fatal("LoadAndDelete-NotExists failed") + } + + m.Delete(1) + m.Store(1, "a") + m.Store(2, "b") + m2 := map[int]string{} + m2[1] = "a" + m2[2] = "b" + m.Range(func(key int, value string) bool { + if m2[key] != value { + t.Fatal("mismating value in range") + } + if key == 1 { + return true + } else { + return false + } + }) + + if m.Len() != 2 { + t.Fatal("Len failed") + } + + m.Store(1, "a") + m.Store(2, "b") + m.Range(func(key int, value string) bool { + m.Delete(1) + m.Delete(2) + return true + }) + +} + // Removed CtrlC test due to: https://github.com/golang/go/issues/46354 -// func TestCtrlC(t *testing.T) { -// ch := CtrlC() -// err := sendCtrlC(os.Getpid()) -// if err != nil { -// t.Fatal(err) -// } -// _, ok := <-ch -// if !ok { -// t.Fatal("Ctrl c not caught") -// } -// } +func TestCtrlC(t *testing.T) { + if runtime.GOOS == "linux" || runtime.GOOS == "darwin" { + ch := CtrlC() + p, err := os.FindProcess(os.Getpid()) + if err != nil { + t.Fatal(err) + } + err = p.Signal(os.Interrupt) + if err != nil { + t.Fatal(err) + } + _, ok := <-ch + if !ok { + t.Fatal("Ctrl c not caught") + } + } +} diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index ef3a048..d0baf4e 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -62,7 +62,8 @@ func worker(ctx context.Context, conf *watcherConfig) { func CreateWatcher(ctx context.Context, db *gorm.DB, watchdir string, encrypted bool, input chan notify.EventInfo) { if err := notify.Watch(filepath.Join(watchdir, "..."), input, notify.Write, notify.Create); err != nil { - logrus.Fatalf("%v", err) + logrus.Errorf("Failed to watch dir with error: %v", err) + return } conf := watcherConfig{ db: db, diff --git a/tests/system_test.go b/tests/system_test.go index 739c350..38721f4 100644 --- a/tests/system_test.go +++ b/tests/system_test.go @@ -6,7 +6,7 @@ import ( "fmt" "io" "log" - mathrand "math/rand" + "math/big" "oneway-filesync/pkg/config" "oneway-filesync/pkg/database" "oneway-filesync/pkg/receiver" @@ -21,6 +21,14 @@ import ( "gorm.io/gorm" ) +func randint(max int64) int { + nBig, err := rand.Int(rand.Reader, big.NewInt(max)) + if err != nil { + panic(err) + } + return int(nBig.Int64()) +} + func getDiff(t *testing.T, path1 string, path2 string) int { diff := 0 buf1 := make([]byte, 64*1024) @@ -168,7 +176,7 @@ func setupTest(t *testing.T, conf config.Config) (*gorm.DB, *gorm.DB, func()) { func TestSetup(t *testing.T) { _, _, teardowntest := setupTest(t, config.Config{ ReceiverIP: "127.0.0.1", - ReceiverPort: mathrand.Intn(30000) + 30000, + ReceiverPort: randint(30000) + 30000, BandwidthLimit: 10000, ChunkSize: 8192, EncryptedOutput: true, @@ -195,7 +203,7 @@ func TestFileTransfer(t *testing.T) { []int{500, 1024 * 1024}, config.Config{ ReceiverIP: "127.0.0.1", - ReceiverPort: mathrand.Intn(30000) + 30000, + ReceiverPort: randint(30000) + 30000, BandwidthLimit: 100 * 1024, ChunkSize: 8192, EncryptedOutput: false, @@ -212,7 +220,7 @@ func TestFileTransfer(t *testing.T) { []int{500, 1024 * 1024}, config.Config{ ReceiverIP: "127.0.0.1", - ReceiverPort: mathrand.Intn(30000) + 30000, + ReceiverPort: randint(30000) + 30000, BandwidthLimit: 100 * 1024, ChunkSize: 8192, EncryptedOutput: true, @@ -248,7 +256,7 @@ func TestFileTransfer(t *testing.T) { func TestWatcherFiles(t *testing.T) { conf := config.Config{ ReceiverIP: "127.0.0.1", - ReceiverPort: mathrand.Intn(30000) + 30000, + ReceiverPort: randint(30000) + 30000, BandwidthLimit: 1024 * 1024, ChunkSize: 8192, EncryptedOutput: true, @@ -292,5 +300,4 @@ func TestWatcherFiles(t *testing.T) { defer os.Remove(tempfile) defer waitForFinishedFile(t, receiverdb, tempfile, time.Now().Add(time.Minute*5), conf.OutDir) } - }