Skip to content

Commit

Permalink
generalising
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 21, 2025
1 parent 203fa0f commit f3cc5df
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 98 deletions.
4 changes: 3 additions & 1 deletion cmd/slackdump/internal/convertcmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error
return err
}
defer cd.Close()

fsa, err := fsadapter.New(trg)
if err != nil {
return err
Expand All @@ -131,8 +132,9 @@ func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error
includeAvatars = cflg.withAvatars && (st&source.FAvatars != 0)
)

s := source.NewChunkDir(cd, true)
cvt := convert.NewChunkToExport(
cd,
s,
fsa,
convert.WithIncludeFiles(includeFiles),
convert.WithIncludeAvatars(includeAvatars),
Expand Down
17 changes: 12 additions & 5 deletions cmd/slackdump/internal/diag/hydrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"iter"
"log/slog"
"net/http"
"net/url"
Expand Down Expand Up @@ -179,8 +180,8 @@ func download(ctx context.Context, archive, target string, dry bool) error {
//go:generate mockgen -destination=hydrate_mock_test.go -package=diag -source hydrate.go sourcer
type sourcer interface {
Channels(ctx context.Context) ([]slack.Channel, error)
AllMessages(ctx context.Context, channelID string) ([]slack.Message, error)
AllThreadMessages(ctx context.Context, channelID, threadTimestamp string) ([]slack.Message, error)
AllMessages(ctx context.Context, channelID string) (iter.Seq2[slack.Message, error], error)
AllThreadMessages(ctx context.Context, channelID, threadTimestamp string) (iter.Seq2[slack.Message, error], error)
}

func downloadFiles(ctx context.Context, d downloader.GetFiler, trg fsadapter.FS, src sourcer) error {
Expand All @@ -202,18 +203,24 @@ func downloadFiles(ctx context.Context, d downloader.GetFiler, trg fsadapter.FS,
if err != nil {
return fmt.Errorf("error reading messages in channel %s: %w", ch.ID, err)
}
for _, m := range msgs {
for m, err := range msgs {
if err != nil {
return fmt.Errorf("error reading message in channel %s: %w", ch.ID, err)
}
if len(m.Files) > 0 {
if err := proc.Files(ctx, &ch, m, m.Files); err != nil {
return fmt.Errorf("error processing files in message %s: %w", m.Timestamp, err)
}
}
if structures.IsThreadStart(&m) {
tm, err := src.AllThreadMessages(ctx, ch.ID, m.ThreadTimestamp)
itTm, err := src.AllThreadMessages(ctx, ch.ID, m.ThreadTimestamp)
if err != nil {
return fmt.Errorf("error reading thread messages for message %s in channel %s: %w", m.Timestamp, ch.ID, err)
}
for _, tm := range tm {
for tm, err := range itTm {
if err != nil {
return fmt.Errorf("error reading thread message %s in channel %s: %w", tm.Timestamp, ch.ID, err)
}
if len(tm.Files) > 0 {
if err := proc.Files(ctx, &ch, tm, tm.Files); err != nil {
return fmt.Errorf("error processing files in thread message %s: %w", tm.Timestamp, err)
Expand Down
9 changes: 5 additions & 4 deletions cmd/slackdump/internal/diag/hydrate_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/rusq/slackdump/v3/internal/chunk/control"
"github.com/rusq/slackdump/v3/internal/chunk/transform"
"github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc"
"github.com/rusq/slackdump/v3/internal/source"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/stream"
)
Expand Down Expand Up @@ -48,7 +49,8 @@ func export(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, list
return fn(m)
}
}
conv := transform.NewExpConverter(chunkdir, fsa, transform.ExpWithMsgUpdateFunc(updFn()))
src := source.NewChunkDir(chunkdir, false)
conv := transform.NewExpConverter(src, fsa, transform.ExpWithMsgUpdateFunc(updFn()))
tf := transform.NewExportCoordinator(ctx, conv, transform.WithBufferSize(1000))
defer tf.Close()

Expand Down
18 changes: 18 additions & 0 deletions internal/chunk/db_compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package chunk

import (
"context"

"github.com/rusq/slack"
)

// db source compatibility layer

func (d *Directory) ChannelInfo(ctx context.Context, id string) (*slack.Channel, error) {
f, err := d.Open(ToFileID(id, "", false))
if err != nil {
return nil, err
}
defer f.Close()
return f.ChannelInfo(id)
}
24 changes: 20 additions & 4 deletions internal/chunk/dbproc/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ type valuer[T any] interface {
Val() (T, error)
}

func valueIter[T any, D valuer[T]](it iter.Seq2[D, error]) iter.Seq2[T, error] {
iterFn := func(yield func(T, error) bool) {
for c, err := range it {
if err != nil {
var t T
yield(t, err)
return
}
if !yield(c.Val()) {
return
}
}
}
return iterFn
}

func collect[T any, D valuer[T]](it iter.Seq2[D, error], sz int) ([]T, error) {
vs := make([]T, 0, sz)
for c, err := range it {
Expand All @@ -89,7 +105,7 @@ func collect[T any, D valuer[T]](it iter.Seq2[D, error], sz int) ([]T, error) {
return vs, nil
}

func (s *Source) AllMessages(ctx context.Context, channelID string) ([]slack.Message, error) {
func (s *Source) AllMessages(ctx context.Context, channelID string) (iter.Seq2[slack.Message, error], error) {
tx, err := s.conn.BeginTxx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, err
Expand All @@ -101,10 +117,10 @@ func (s *Source) AllMessages(ctx context.Context, channelID string) ([]slack.Mes
if err != nil {
return nil, err
}
return collect(it, preallocSz)
return valueIter(it), nil
}

func (s *Source) AllThreadMessages(ctx context.Context, channelID, threadID string) ([]slack.Message, error) {
func (s *Source) AllThreadMessages(ctx context.Context, channelID, threadID string) (iter.Seq2[slack.Message, error], error) {
tx, err := s.conn.BeginTxx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, err
Expand All @@ -116,7 +132,7 @@ func (s *Source) AllThreadMessages(ctx context.Context, channelID, threadID stri
if err != nil {
return nil, err
}
return collect(it, preallocSz)
return valueIter(it), nil
}

func (s *Source) ChannelInfo(ctx context.Context, channelID string) (*slack.Channel, error) {
Expand Down
1 change: 1 addition & 0 deletions internal/chunk/dbproc/source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package dbproc
55 changes: 32 additions & 23 deletions internal/chunk/transform/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
"runtime/trace"
"sort"
"sync/atomic"
"time"

"github.com/rusq/fsadapter"
"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/export"
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/fasttime"
"github.com/rusq/slackdump/v3/internal/source"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/types"
)
Expand All @@ -39,15 +39,15 @@ func ExpWithUsers(users []slack.User) ExpCvtOption {
}

type ExpConverter struct {
cd *chunk.Directory
src source.Sourcer
fsa fsadapter.FS
users atomic.Value
msgFunc []msgUpdFunc
}

func NewExpConverter(cd *chunk.Directory, fsa fsadapter.FS, opt ...ExpCvtOption) *ExpConverter {
func NewExpConverter(src source.Sourcer, fsa fsadapter.FS, opt ...ExpCvtOption) *ExpConverter {
e := &ExpConverter{
cd: cd,
src: src,
fsa: fsa,
}
for _, o := range opt {
Expand Down Expand Up @@ -83,35 +83,39 @@ func (e *ExpConverter) Convert(ctx context.Context, id chunk.FileID) error {
lg.DebugContext(ctx, "transforming channel", "id", id, "user_count", userCnt)
}

// load the chunk file
cf, err := e.cd.Open(id)
if err != nil {
return fmt.Errorf("error opening chunk file %q: %w", id, err)
}
defer cf.Close()

channelID, _ := id.Split()
ci, err := cf.ChannelInfo(channelID)
ci, err := e.src.ChannelInfo(ctx, channelID)
if err != nil {
return fmt.Errorf("error reading channel info for %q: %w", id, err)
}

if err := e.writeMessages(ctx, cf, ci); err != nil {
if err := e.writeMessages(ctx, ci); err != nil {
return err
}

return nil
}

func (e *ExpConverter) writeMessages(ctx context.Context, pl *chunk.File, ci *slack.Channel) error {
func (e *ExpConverter) writeMessages(ctx context.Context, ci *slack.Channel) error {
lg := slog.With("in", "writeMessages", "channel", ci.ID)
uidx := types.Users(e.getUsers()).IndexByID()
trgdir := ExportChanName(ci)

mm := make([]export.ExportMessage, 0, 100)
var prevDt string
var currDt string
if err := pl.Sorted(ctx, false, func(ts time.Time, m *slack.Message) error {
it, err := e.src.AllMessages(ctx, ci.ID)
if err != nil {
return fmt.Errorf("error getting messages for %q: %w", ci.ID, err)
}
for m, err := range it {
if err != nil {
return fmt.Errorf("error reading message: %w", err)
}
ts, err := structures.ParseSlackTS(m.Timestamp)
if err != nil {
return fmt.Errorf("error parsing timestamp: %w", err)
}
currDt = ts.Format("2006-01-02")
if currDt != prevDt || prevDt == "" {
if prevDt != "" {
Expand All @@ -126,10 +130,10 @@ func (e *ExpConverter) writeMessages(ctx context.Context, pl *chunk.File, ci *sl
// the "thread" is only used to collect statistics. Thread messages
// are passed by Sorted and written as a normal course of action.
var thread []slack.Message
if structures.IsThreadStart(m) && m.LatestReply != structures.LatestReplyNoReplies {
if structures.IsThreadStart(&m) && m.LatestReply != structures.LatestReplyNoReplies {
// get the thread for the initial thread message only.
var err error
thread, err = pl.AllThreadMessages(ci.ID, m.ThreadTimestamp)
itTm, err := e.src.AllThreadMessages(ctx, ci.ID, m.ThreadTimestamp)
if err != nil {
if !errors.Is(err, chunk.ErrNotFound) {
return fmt.Errorf("error getting thread messages for %q: %w", ci.ID+":"+m.ThreadTimestamp, err)
Expand All @@ -139,19 +143,24 @@ func (e *ExpConverter) writeMessages(ctx context.Context, pl *chunk.File, ci *sl
lg.Warn("not an error, possibly deleted thread not found in chunk file", "slack_link", ci.ID+":"+m.ThreadTimestamp)
}
}
thread = make([]slack.Message, 0, 10)
for tm, err := range itTm {
if err != nil {
return fmt.Errorf("error reading thread message: %w", err)
}
thread = append(thread, tm)
}
}

// apply all message functions.
for _, fn := range e.msgFunc {
if err := fn(ci, m); err != nil {
if err := fn(ci, &m); err != nil {
return fmt.Errorf("error updating message: %w", err)
}
}

mm = append(mm, *toExportMessage(m, thread, uidx[m.User]))
mm = append(mm, *toExportMessage(&m, thread, uidx[m.User]))
return nil
}); err != nil {
return fmt.Errorf("sorted callback error: %w", err)
}

// flush the last day.
Expand Down Expand Up @@ -267,11 +276,11 @@ func ExportChanName(ch *slack.Channel) string {
// once all transformations are done, because it might require to read channel
// files.
func (t *ExpConverter) WriteIndex(ctx context.Context) error {
wsp, err := t.cd.WorkspaceInfo()
wsp, err := t.src.WorkspaceInfo(ctx)
if err != nil {
return fmt.Errorf("failed to get the workspace info: %w", err)
}
chans, err := t.cd.Channels(ctx)
chans, err := t.src.Channels(ctx)
if err != nil {
return fmt.Errorf("error indexing channels: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/chunk/transform/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/fixtures"
"github.com/rusq/slackdump/v3/internal/source"
)

func Test_transform(t *testing.T) {
Expand Down Expand Up @@ -53,8 +54,9 @@ func Test_transform(t *testing.T) {
t.Fatal(err)
}
defer cd.Close()
src := source.NewChunkDir(cd, true)
cvt := ExpConverter{
cd: cd,
src: src,
fsa: tt.args.fsa,
}
if err := cvt.Convert(tt.args.ctx, chunk.FileID(tt.args.id)); (err != nil) != tt.wantErr {
Expand All @@ -71,7 +73,6 @@ func TestExpConverter_getUsers(t *testing.T) {
return &v
}
type fields struct {
cd *chunk.Directory
fsa fsadapter.FS
users atomic.Value
msgFunc []msgUpdFunc
Expand Down Expand Up @@ -99,7 +100,6 @@ func TestExpConverter_getUsers(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &ExpConverter{
cd: tt.fields.cd,
fsa: tt.fields.fsa,
users: tt.fields.users,
msgFunc: tt.fields.msgFunc,
Expand Down
Loading

0 comments on commit f3cc5df

Please sign in to comment.