Skip to content

Commit

Permalink
universal converter
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 22, 2025
1 parent 257889f commit f96c187
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 97 deletions.
40 changes: 17 additions & 23 deletions cmd/slackdump/internal/convertcmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc"
"github.com/rusq/slackdump/v3/internal/convert"
"github.com/rusq/slackdump/v3/internal/source"
Expand All @@ -35,19 +34,16 @@ Currently only "export" format is supported.

type tparams struct {
storageType fileproc.StorageType
inputfmt datafmt
outputfmt datafmt
}

var params = tparams{
storageType: fileproc.STmattermost,
inputfmt: Fchunk,
outputfmt: Fexport,
}

func init() {
CmdConvert.Flag.Var(&params.storageType, "storage", "storage type")
CmdConvert.Flag.Var(&params.inputfmt, "input", "input format")
CmdConvert.Flag.Var(&params.outputfmt, "output", "output format")
}

Expand All @@ -56,14 +52,14 @@ func runConvert(ctx context.Context, cmd *base.Command, args []string) error {
base.SetExitStatus(base.SInvalidParameters)
return errors.New("source and destination are required")
}
fn, exist := converter(params.inputfmt, params.outputfmt)
fn, exist := converter(params.outputfmt)
if !exist {
base.SetExitStatus(base.SInvalidParameters)
return errors.New("unsupported conversion type")
}

lg := cfg.Log
lg.InfoContext(ctx, "converting", "input_format", params.inputfmt, "source", args[0], "output_format", params.outputfmt, "output", cfg.Output)
lg.InfoContext(ctx, "converting", "source", args[0], "output_format", params.outputfmt, "output", cfg.Output)

cflg := convertflags{
withFiles: cfg.DownloadFiles,
Expand All @@ -80,23 +76,17 @@ func runConvert(ctx context.Context, cmd *base.Command, args []string) error {
return nil
}

func converter(input datafmt, output datafmt) (convertFunc, bool) {
if _, ok := converters[input]; !ok {
return nil, false
}
if cvt, ok := converters[input][output]; ok {
func converter(output datafmt) (convertFunc, bool) {
if cvt, ok := converters[output]; ok {
return cvt, true
}
return nil, false
}

type convertFunc func(ctx context.Context, input, output string, cflg convertflags) error

// ..................input.......output..............
var converters = map[datafmt]map[datafmt]convertFunc{
Fchunk: {
Fexport: chunk2export,
},
var converters = map[datafmt]convertFunc{
Fexport: toExport,
}

type convertflags struct {
Expand All @@ -105,16 +95,16 @@ type convertflags struct {
stt fileproc.StorageType
}

func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error {
func toExport(ctx context.Context, src, trg string, cflg convertflags) error {
// detect source type
st, err := source.Type(src)
if err != nil {
return err
}
cd, err := chunk.OpenDir(src)
if err != nil {
return err

if st == source.FUnknown {
return errors.New("unknown source type")
}
defer cd.Close()

fsa, err := fsadapter.New(trg)
if err != nil {
Expand All @@ -132,8 +122,12 @@ func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error
includeAvatars = cflg.withAvatars && (st&source.FAvatars != 0)
)

s := source.NewChunkDir(cd, true)
cvt := convert.NewChunkToExport(
s, err := source.Load(ctx, src)
if err != nil {
return err
}

cvt := convert.NewToExport(
s,
fsa,
convert.WithIncludeFiles(includeFiles),
Expand Down
16 changes: 11 additions & 5 deletions internal/chunk/transform/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@ func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) err
uidx := types.Users(e.getUsers()).IndexByID()
trgdir := ExportChanName(ci)

mm := make([]export.ExportMessage, 0, 100)
var prevDt string
var currDt string
it, err := e.src.AllMessages(ctx, ci.ID)
if err != nil {
return fmt.Errorf("error getting messages for %q: %w", ci.ID, err)
}
// loop variables
var (
prevDt string
currDt string
mm []export.ExportMessage
)
for m, err := range it {
if err != nil {
return fmt.Errorf("error reading message: %w", err)
Expand All @@ -119,6 +122,7 @@ func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) err
currDt = ts.Format("2006-01-02")
if currDt != prevDt || prevDt == "" {
if prevDt != "" {
// flush the previous day.
if err := e.writeout(filepath.Join(trgdir, prevDt+".json"), mm); err != nil {
return err
}
Expand Down Expand Up @@ -160,14 +164,16 @@ func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) err
}

mm = append(mm, *toExportMessage(&m, thread, uidx[m.User]))
return nil
}

// flush the last day.
if len(mm) > 0 {
// flush the last day.
lg.DebugContext(ctx, "writing last day", "date", prevDt)
if err := e.writeout(filepath.Join(trgdir, prevDt+".json"), mm); err != nil {
return err
}
} else {
lg.DebugContext(ctx, "no messages for the channel", "channel", ci.ID)
}

return nil
Expand Down
75 changes: 46 additions & 29 deletions internal/convert/chunkexp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"log/slog"
"os"
"path/filepath"
"path"
"runtime/trace"
"sync"

Expand Down Expand Up @@ -38,7 +38,6 @@ type ChunkToExport struct {
// trg is the target FS for the export
trg fsadapter.FS
opts options
lg *slog.Logger

workers int // number of workers to use to convert channels

Expand All @@ -47,7 +46,8 @@ type ChunkToExport struct {
avtrresult chan copyresult
}

func NewChunkToExport(src source.Sourcer, trg fsadapter.FS, opt ...C2EOption) *ChunkToExport {
// NewToExport returns the converter to export format.
func NewToExport(src source.Sourcer, trg fsadapter.FS, opt ...Option) *ChunkToExport {
c := &ChunkToExport{
src: src,
trg: trg,
Expand Down Expand Up @@ -127,6 +127,8 @@ func (c *ChunkToExport) Convert(ctx context.Context) error {
// 1. generator
chC := sliceToChan(channels)

lg := c.opts.lg

errC := make(chan error, c.workers)
{
// 2. workers
Expand Down Expand Up @@ -170,7 +172,7 @@ func (c *ChunkToExport) Convert(ctx context.Context) error {
go func() {
defer msgwg.Done()
for ch := range chC {
lg := c.lg.With("channel", ch.ID)
lg := lg.With("channel", ch.ID)
lg.Debug("processing channel")
if err := conv.Convert(ctx, chunk.ToFileID(ch.ID, "", false)); err != nil {
errC <- fmt.Errorf("converter: failed to process %q: %w", ch.ID, err)
Expand All @@ -183,18 +185,18 @@ func (c *ChunkToExport) Convert(ctx context.Context) error {
msgwg.Add(1)
go func() {
defer msgwg.Done()
c.lg.DebugContext(ctx, "writing index", "name", c.src.Name())
lg.DebugContext(ctx, "writing index", "name", c.src.Name())
if err := conv.WriteIndex(ctx); err != nil {
errC <- err
}
}()
// 2.3. workers sentinels
go func() {
msgwg.Wait()
c.lg.Debug("messages wait group done, closing file requests")
lg.Debug("messages wait group done, closing file requests")
close(c.filerequest)
filewg.Wait()
c.lg.Debug("file workers done, finalising")
lg.Debug("file workers done, finalising")
close(errC)
}()
}
Expand All @@ -204,9 +206,9 @@ func (c *ChunkToExport) Convert(ctx context.Context) error {
for res := range fileresults {
if res.err != nil {
if res.fr.message != nil {
c.lg.Error("file converter: error processing message", "ts", res.fr.message.Timestamp, "err", res.err)
lg.Error("file converter: error processing message", "ts", res.fr.message.Timestamp, "err", res.err)
} else {
c.lg.Error("file converter", "err", res.err)
lg.Error("file converter", "err", res.err)
}
errC <- res.err
}
Expand All @@ -224,7 +226,7 @@ LOOP:
break LOOP
}
if err != nil {
slog.Error("worker error", "err", err)
lg.ErrorContext(ctx, "worker", "error", err)
failed = true
}
}
Expand Down Expand Up @@ -280,30 +282,38 @@ func (c *ChunkToExport) fileCopy(ch *slack.Channel, msg *slack.Message) error {
}
if msg == nil {
return errors.New("convert: internal error: callback: nil message")
}
if len(msg.Files) == 0 {
} else if len(msg.Files) == 0 {
// no files to process
return nil
}

var (
fsys = c.src.Files().FS()
lg = c.opts.lg.With("channel", ch.ID, "ts", msg.Timestamp)
)
for _, f := range msg.Files {
if err := fileproc.IsValidWithReason(&f); err != nil {
c.lg.Info("skipping", "file", f.ID, "error", err)
lg.Info("skipping file", "file", f.ID, "error", err)
continue
}

srcpath := filepath.Join(c.src.Name(), c.opts.srcFileLoc(ch, &f))
srcpath, err := c.src.Files().File(f.ID, f.Name)
if err != nil {
return &copyerror{f.ID, err}
}
// srcpath := c.opts.srcFileLoc(ch, &f)
trgpath := c.opts.trgFileLoc(ch, &f)

sfi, err := os.Stat(srcpath)
sfi, err := fs.Stat(fsys, srcpath)
if err != nil {
return &copyerror{f.ID, err}
}
if sfi.Size() == 0 {
c.lg.Warn("skipping", "file", f.ID, "reason", "empty file")
lg.Warn("skipping", "file", f.ID, "reason", "empty")
continue
}
c.lg.Debug("copying", "srcpath", srcpath, "trgpath", trgpath)
if err := copy2trg(c.trg, trgpath, srcpath); err != nil {
lg.Debug("copying", "srcpath", srcpath, "trgpath", trgpath)
if err := copy2trg(c.trg, trgpath, fsys, srcpath); err != nil {
return &copyerror{f.ID, err}
}
}
Expand All @@ -312,8 +322,8 @@ func (c *ChunkToExport) fileCopy(ch *slack.Channel, msg *slack.Message) error {

// copy2trg copies the file from the source path to the target path. Source
// path is absolute, target path is relative to the target FS adapter root.
func copy2trg(trgfs fsadapter.FS, trgpath, srcpath string) error {
in, err := os.Open(srcpath)
func copy2trg(trgfs fsadapter.FS, trgpath string, srcfs fs.FS, srcpath string) error {
in, err := srcfs.Open(srcpath)
if err != nil {
return err
}
Expand Down Expand Up @@ -349,36 +359,43 @@ func (cr copyresult) Unwrap() error {

func (c *ChunkToExport) copyworker(req <-chan copyrequest) {
defer close(c.fileresult)
c.lg.Debug("copy worker started")
c.opts.lg.Debug("copy worker started")
for r := range req {
c.fileresult <- copyresult{
fr: r,
err: c.fileCopy(r.channel, r.message),
}
}
c.lg.Debug("copy worker done")
c.opts.lg.Debug("copy worker done")
}

func (c *ChunkToExport) avatarWorker(users []slack.User) {
c.lg.Debug("avatar worker started")
lg := c.opts.lg
lg.Debug("avatar worker started")
defer close(c.avtrresult)
for _, u := range users {
if u.Profile.ImageOriginal == "" {
continue
}
c.lg.Debug("processing avatar", "user", u.ID)
lg.Debug("processing avatar", "user", u.ID)
loc := c.opts.avtrFileLoc(&u)
err := copy2trg(c.trg, loc, filepath.Join(c.src.Name(), loc))
fsys := c.src.Avatars().FS()
srcLoc, err := c.src.Avatars().File(u.ID, path.Base(u.Profile.ImageOriginal))
if err != nil {
err = fmt.Errorf("error copying avatar for user %s: %w", u.ID, err)
err = fmt.Errorf("error getting avatar for user %s: %w", u.ID, err)
} else {
err = copy2trg(c.trg, loc, fsys, srcLoc)
if err != nil {
err = fmt.Errorf("error copying avatar for user %s: %w", u.ID, err)
}
}
c.avtrresult <- copyresult{
err: err,
}
if err != nil {
continue
}
c.lg.Debug("avatar processed", "user", u.ID)
lg.Debug("avatar processed", "user", u.ID)
}
c.lg.Debug("avatar worker done")
lg.Debug("avatar worker done")
}
10 changes: 6 additions & 4 deletions internal/convert/chunkexp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ func TestChunkToExport_Convert(t *testing.T) {
}
defer fsa.Close()
src := source.NewChunkDir(cd, true)
c := NewChunkToExport(src, fsa, WithIncludeFiles(true))
c := NewToExport(src, fsa, WithIncludeFiles(true))

ctx := context.Background()
c.lg = testLogger
c.opts.lg = testLogger
if err := c.Convert(ctx); err != nil {
t.Fatal(err)
}
Expand All @@ -138,8 +138,9 @@ func Test_copy2trg(t *testing.T) {
t.Fatal(err)
}
trgfs := fsadapter.NewDirectory(trgdir)
srcfs := os.DirFS(srcdir)

if err := copy2trg(trgfs, "test-copy.txt", filepath.Join(srcdir, "test.txt")); err != nil {
if err := copy2trg(trgfs, "test-copy.txt", srcfs, "test.txt"); err != nil {
t.Fatal(err)
}
// validate
Expand All @@ -155,9 +156,10 @@ func Test_copy2trg(t *testing.T) {
srcdir := t.TempDir()
trgdir := t.TempDir()

srcfs := os.DirFS(srcdir)
trgfs := fsadapter.NewDirectory(trgdir)
// source file does not exist.
if err := copy2trg(trgfs, "test-copy.txt", filepath.Join(srcdir, "test.txt")); err == nil {
if err := copy2trg(trgfs, "test-copy.txt", srcfs, "test.txt"); err == nil {
t.Fatal("expected error, but got nil")
}
})
Expand Down
Loading

0 comments on commit f96c187

Please sign in to comment.