Skip to content

Commit

Permalink
Unify dump converter, partial.
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 27, 2025
1 parent 31ac30d commit 60aa1fd
Show file tree
Hide file tree
Showing 12 changed files with 446 additions and 38 deletions.
39 changes: 39 additions & 0 deletions cmd/slackdump/internal/bootstrap/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package bootstrap

import (
"os"
"path/filepath"
"strings"
"time"

"github.com/jmoiron/sqlx"

"github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v3/internal/chunk/dbproc"
"github.com/rusq/slackdump/v3/internal/chunk/dbproc/repository"
)

const defFilename = "slackdump.sqlite"

// Database returns the initialised database connection open for writing.
func Database(dir string, mode string) (*sqlx.DB, dbproc.SessionInfo, error) {
dbfile := filepath.Join(dir, defFilename)
// wconn is the writer connection
wconn, err := sqlx.Open(repository.Driver, dbfile)
if err != nil {
return nil, dbproc.SessionInfo{}, err
}
return wconn, sessionInfo(mode), nil
}

func sessionInfo(mode string) dbproc.SessionInfo {
si := dbproc.SessionInfo{
FromTS: (*time.Time)(&cfg.Oldest),
ToTS: (*time.Time)(&cfg.Latest),
FilesEnabled: cfg.DownloadFiles,
AvatarsEnabled: cfg.DownloadAvatars,
Mode: mode,
Args: strings.Join(os.Args, "|"),
}
return si
}
128 changes: 127 additions & 1 deletion cmd/slackdump/internal/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"flag"
"fmt"
"log/slog"
"os"
"runtime/trace"
"strings"
Expand All @@ -19,6 +20,8 @@ import (
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/control"
"github.com/rusq/slackdump/v3/internal/chunk/dbproc"
"github.com/rusq/slackdump/v3/internal/chunk/dirproc"
"github.com/rusq/slackdump/v3/internal/chunk/transform"
"github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc"
Expand Down Expand Up @@ -165,6 +168,14 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump
return err
}

if cfg.UseChunkFiles {
return dumpv3(ctx, sess, fsa, p)
} else {
return dumpv31(ctx, sess, fsa, p)
}
}

func dumpv3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dumpparams) error {
dir, err := os.MkdirTemp("", "slackdump-*")
if err != nil {
return fmt.Errorf("failed to create temp directory: %w", err)
Expand Down Expand Up @@ -194,7 +205,7 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump
defer cd.Close()
src := source.NewChunkDir(cd, true)

tf, err := transform.NewStandard(fsa, src, opts...)
tf, err := transform.NewStdConverter(fsa, src, opts...)
if err != nil {
return fmt.Errorf("failed to create transform: %w", err)
}
Expand Down Expand Up @@ -246,3 +257,118 @@ func helpDump(cmd *base.Command) string {
}
return buf.String()
}

func dumpv31(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dumpparams) error {
lg := cfg.Log

tmpdir, err := os.MkdirTemp("", "slackdump-*")
if err != nil {
return fmt.Errorf("failed to create temp directory: %w", err)
}
if !lg.Enabled(ctx, slog.LevelDebug) {
defer func() { _ = os.RemoveAll(tmpdir) }()
}

wconn, si, err := bootstrap.Database(tmpdir, "dump")
if err != nil {
return err
}
defer wconn.Close()
tmpdbp, err := dbproc.New(ctx, wconn, si)
if err != nil {
return err
}
defer tmpdbp.Close()

lg.DebugContext(ctx, "using database in ", "dir", tmpdir)

defer func() {
if err := tmpdbp.Close(); err != nil {
lg.ErrorContext(ctx, "unable to close database processor", "error", err)
}
}()
src := source.OpenDatabaseConn(wconn)

// files subprocessor
sdl := fileproc.NewDownloader(ctx, p.downloadFiles, sess.Client(), fsa, cfg.Log)
subproc := fileproc.NewDump(sdl)
defer subproc.Close()

opts := []transform.StdOption{
transform.StdWithTemplate(p.tmpl),
transform.StdWithLogger(lg),
}
if p.updatePath && p.downloadFiles {
opts = append(opts, transform.StdWithPipeline(subproc.PathUpdateFunc))
}

tf, err := transform.NewStdConverter(fsa, src, opts...)
if err != nil {
return fmt.Errorf("failed to create transform: %w", err)
}
coord := transform.NewCoordinator(ctx, tf)

stream := sess.Stream(
stream.OptOldest(time.Time(cfg.Oldest)),
stream.OptLatest(time.Time(cfg.Latest)),
stream.OptResultFn(func(sr stream.Result) error {
if sr.Err != nil {
return sr.Err
}
if sr.IsLast {
lg.InfoContext(ctx, "dumped", "sr", sr.String())
}
return nil
}),
)

ctrl, err := control.NewDB(
ctx,
stream,
tmpdbp,
control.WithLogger(lg),
control.WithFlags(control.Flags{RecordFiles: cfg.DownloadFiles}),
control.WithTransformer(coord),
control.WithFiler(subproc),
)
if err != nil {
return fmt.Errorf("error creating db controller: %w", err)
}
defer ctrl.Close()

if err := ctrl.Run(ctx, p.list); err != nil {
return fmt.Errorf("error running db controller: %w", err)
}
return nil
}

/*
coord := transform.NewCoordinator(ctx, tf)
proc := chunk.NewCustomRecorder("db", tmpdbp)
defer proc.Close()
if err := sess.Stream(
stream.OptOldest(time.Time(cfg.Oldest)),
stream.OptLatest(time.Time(cfg.Latest)),
stream.OptResultFn(func(sr stream.Result) error {
if sr.Err != nil {
return sr.Err
}
if sr.IsLast {
lg.InfoContext(ctx, "dumped", "sr", sr.String())
}
return nil
}),
).Conversations(ctx, proc, p.list.C(ctx)); err != nil {
return fmt.Errorf("failed to dump conversations: %w", err)
}
lg.DebugContext(ctx, "stream complete, waiting for all goroutines to finish")
if err := coord.Wait(); err != nil {
return err
}
return nil
}
*/
17 changes: 2 additions & 15 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ import (
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"

"github.com/jmoiron/sqlx"
"github.com/rusq/fsadapter"
"github.com/schollz/progressbar/v3"

Expand All @@ -20,7 +17,6 @@ import (
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/control"
"github.com/rusq/slackdump/v3/internal/chunk/dbproc"
"github.com/rusq/slackdump/v3/internal/chunk/dbproc/repository"
"github.com/rusq/slackdump/v3/internal/chunk/transform"
"github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc"
"github.com/rusq/slackdump/v3/internal/source"
Expand All @@ -38,20 +34,11 @@ func exportv31(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, l
}

lg.InfoContext(ctx, "temporary directory in use", "tmpdir", tmpdir)
dbfile := filepath.Join(tmpdir, "slackdump.sqlite")
// wconn is the writer connection
wconn, err := sqlx.Open(repository.Driver, dbfile)
wconn, si, err := bootstrap.Database(tmpdir, "export")
if err != nil {
return err
}
si := dbproc.SessionInfo{
FromTS: (*time.Time)(&cfg.Oldest),
ToTS: (*time.Time)(&cfg.Latest),
FilesEnabled: cfg.DownloadFiles,
AvatarsEnabled: cfg.DownloadAvatars,
Mode: "export",
Args: strings.Join(os.Args, "|"),
}
defer wconn.Close()

tmpdbp, err := dbproc.New(ctx, wconn, si)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions internal/chunk/control/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/dirproc"
"github.com/rusq/slackdump/v3/processor"
)

Expand Down Expand Up @@ -46,35 +47,35 @@ func (u *userCollector) Close() error {
// processor.
type conversationTransformer struct {
ctx context.Context
tf ExportTransformer
tf dirproc.Transformer
rc ReferenceChecker
}

var _ processor.Messenger = (*conversationTransformer)(nil)

func (ct *conversationTransformer) Messages(ctx context.Context, channelID string, numThreads int, isLast bool, mm []slack.Message) error {
if isLast {
return ct.mbeTransform(ctx, channelID)
return ct.mbeTransform(ctx, channelID, "", false)
}
return nil
}

func (ct *conversationTransformer) ThreadMessages(ctx context.Context, channelID string, parent slack.Message, threadOnly bool, isLast bool, tm []slack.Message) error {
if isLast {
return ct.mbeTransform(ctx, channelID)
return ct.mbeTransform(ctx, channelID, parent.ThreadTimestamp, threadOnly)
}
return nil
}

func (ct *conversationTransformer) mbeTransform(ctx context.Context, channelID string) error {
func (ct *conversationTransformer) mbeTransform(ctx context.Context, channelID, threadID string, threadOnly bool) error {
finalised, err := ct.rc.IsFinalised(ctx, channelID)
if err != nil {
return fmt.Errorf("error checking if finalised: %w", err)
}
if !finalised {
return nil
}
if err := ct.tf.Transform(ctx, chunk.FileID(channelID)); err != nil {
if err := ct.tf.Transform(ctx, chunk.ToFileID(channelID, threadID, threadOnly)); err != nil {
return fmt.Errorf("error transforming: %w", err)
}
return nil
Expand Down
15 changes: 10 additions & 5 deletions internal/chunk/dbproc/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ const preallocSz = 100

type Source struct {
conn *sqlx.DB
// canClose set to false when the connection is passed to the source
// and should not be closed by the source.
canClose bool
}

// Connect uses existing connection to the database.
func Connect(conn *sqlx.DB) *Source {
return &Source{conn: conn}
}

// Open attempts to open the database at given path.
func Open(path string) (*Source, error) {
conn, err := sqlx.Open(repository.Driver, "file:"+path+"?mode=ro")
if err != nil {
Expand All @@ -34,17 +38,18 @@ func Open(path string) (*Source, error) {
if err := conn.Ping(); err != nil {
return nil, err
}
return &Source{conn: conn}, nil
return &Source{conn: conn, canClose: true}, nil
}

// Close closes the database connection. It is a noop
// if the [Source] was created with [Connect].
func (r *Source) Close() error {
if !r.canClose {
return nil
}
return r.conn.Close()
}

func OpenDB(conn *sqlx.DB) *Source {
return &Source{conn: conn}
}

func (s *Source) Channels(ctx context.Context) ([]slack.Channel, error) {
cr := repository.NewChannelRepository()

Expand Down
6 changes: 3 additions & 3 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

// file extensions
const (
chunkExt = ".json.gz"
ChunkExt = ".json.gz"
extIdx = ".idx"
)

Expand Down Expand Up @@ -237,7 +237,7 @@ func (d *Directory) Walk(fn func(name string, f *File, err error) error) error {
if err != nil {
return err
}
if !strings.HasSuffix(path, chunkExt) || de.IsDir() {
if !strings.HasSuffix(path, ChunkExt) || de.IsDir() {
return nil
}
f, err := d.openRAW(path)
Expand Down Expand Up @@ -350,7 +350,7 @@ func openfile(filename string) (*os.File, error) {

// filename returns the full path of the chunk file with the given fileID.
func (d *Directory) filename(id FileID) string {
return filepath.Join(d.dir, string(id)+chunkExt)
return filepath.Join(d.dir, string(id)+ChunkExt)
}

// Create creates the chunk file with the given name. Extension is appended
Expand Down
4 changes: 3 additions & 1 deletion internal/chunk/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ func allForOffsets[T any](p *File, offsets []int64, fn func(c *Chunk) []T) ([]T,
return items, nil
}

var ErrNoChannelUsers = errors.New("no channel users")

// ChannelInfo returns the information for the given channel.
func (f *File) ChannelInfo(channelID string) (*slack.Channel, error) {
info, err := f.channelInfo(channelID)
Expand All @@ -327,7 +329,7 @@ func (f *File) ChannelInfo(channelID string) (*slack.Channel, error) {
if !info.IsArchived && info.NumMembers > 0 {
users, err := f.ChannelUsers(channelID)
if err != nil {
return nil, fmt.Errorf("failed getting channel users for %q: %w", channelID, err)
return info, fmt.Errorf("failed getting channel users for %q: %w", channelID, ErrNoChannelUsers)
}
info.Members = users
}
Expand Down
7 changes: 7 additions & 0 deletions internal/chunk/transform/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"log/slog"

"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/internal/chunk"
)

Expand Down Expand Up @@ -65,6 +67,11 @@ func (s *Coordinator) Wait() (err error) {
return err
}

func (s *Coordinator) StartWithUsers(context.Context, []slack.User) error {
// noop
return nil
}

func (s *Coordinator) Transform(ctx context.Context, id chunk.FileID) error {
select {
case err := <-s.errC:
Expand Down
Loading

0 comments on commit 60aa1fd

Please sign in to comment.